Итак, что не так, это то, что каждая задача содержит массив numpy x
, который велик. Для каждой из 100 задач, которые мы отправляем, нам необходимо сериализовать x
, отправить его планировщику, отправить его рабочему и т. Д.
Вместо этого мы отправим массив в кластер один раз:
[future] = c.scatter([x])
Теперь future
- это токен, который указывает на массив x
, который находится в кластере. Теперь мы можем отправлять задачи, относящиеся к этому удаленному будущему, вместо массива numpy на нашем локальном клиенте.
# futures = [c.submit(f, x, param) for param in params] # sends x each time
futures = [c.submit(f, future, param) for param in params] # refers to remote x already on cluster
Теперь это намного быстрее и позволяет Dask более эффективно контролировать перемещение данных.
Разбросить данные по всем воркерам
Если вы ожидаете, что в конечном итоге вам понадобится переместить массив x всем рабочим, вы можете захотеть транслировать массив для запуска
[future] = c.scatter([x], broadcast=True)
Использовать Dask с задержкой
Futures также отлично работает с dask.delayed. Здесь нет преимущества в производительности, но некоторые люди предпочитают этот интерфейс:
# futures = [c.submit(f, future, param) for param in params]
from dask import delayed
lazy_values = [delayed(f)(future, param) for param in params]
futures = c.compute(lazy_values)
person
MRocklin
schedule
04.01.2017