Как эффективно отправлять задачи с большими аргументами в распределенном Dask?

Я хочу отправлять функции с Dask, которые имеют большие (гигабайтные) аргументы. Как лучше всего это сделать? Я хочу запускать эту функцию много раз с разными (маленькими) параметрами.

Пример (плохой)

Здесь используется интерфейс concurrent.futures. Мы могли бы так же легко использовать интерфейс dask.delayed.

x = np.random.random(size=100000000)  # 800MB array
params = list(range(100))             # 100 small parameters

def f(x, param):
    pass

from dask.distributed import Client
c = Client()

futures = [c.submit(f, x, param) for param in params]

Но это медленнее, чем я ожидал, или приводит к ошибкам памяти.


person MRocklin    schedule 04.01.2017    source источник


Ответы (1)


Итак, что не так, это то, что каждая задача содержит массив numpy x, который велик. Для каждой из 100 задач, которые мы отправляем, нам необходимо сериализовать x, отправить его планировщику, отправить его рабочему и т. Д.

Вместо этого мы отправим массив в кластер один раз:

[future] = c.scatter([x])

Теперь future - это токен, который указывает на массив x, который находится в кластере. Теперь мы можем отправлять задачи, относящиеся к этому удаленному будущему, вместо массива numpy на нашем локальном клиенте.

# futures = [c.submit(f, x, param) for param in params]  # sends x each time
futures = [c.submit(f, future, param) for param in params]  # refers to remote x already on cluster

Теперь это намного быстрее и позволяет Dask более эффективно контролировать перемещение данных.

Разбросить данные по всем воркерам

Если вы ожидаете, что в конечном итоге вам понадобится переместить массив x всем рабочим, вы можете захотеть транслировать массив для запуска

[future] = c.scatter([x], broadcast=True)

Использовать Dask с задержкой

Futures также отлично работает с dask.delayed. Здесь нет преимущества в производительности, но некоторые люди предпочитают этот интерфейс:

# futures = [c.submit(f, future, param) for param in params]

from dask import delayed
lazy_values = [delayed(f)(future, param) for param in params]
futures = c.compute(lazy_values)
person MRocklin    schedule 04.01.2017
comment
Спасибо, очень полезно! Использование будущего, возвращаемого командой scatter (в качестве аргумента функции в вашем примере), не объясняется в документации. - person PierreE; 12.04.2018
comment
в рамках задачи (f выше) можно ли ссылаться на future (широковещательную кластерную версию x) без передачи ее в качестве аргумента в submit? Например, в Spark вы можете транслировать переменную, а затем просто ссылаться на нее в своих задачах, используя глобальный контекст. - person eggie5; 04.05.2020