Как составлять задачи в dask-distribution

Я пытаюсь запустить параллельный цикл joblib внутри кластера, распределенного по потокам (см. Причину ниже), но я не могу добиться ускорения из-за блокировки GIL. Вот пример:

def task(x):
    """ Sample single-process task that takes between 2 and 5 seconds """
    import time
    import random
    dt = random.uniform(2,5)
    time.sleep(dt)
    return x+dt

def composite_task(np=8):
    """ Composite task that runs multiple single-process runs in parallel """
    from functools import partial
    from joblib import Parallel, delayed, parallel_backend
    with parallel_backend('loky', n_jobs=np):
        out=Parallel()(delayed(task)(i) for i in list(range(0, np)))
    return out

Задача с одним ЦП в среднем занимает 3,5 секунды

%timeit -n7 -r1 task(0)
3.61 s ± 0 ns per loop (mean ± std. dev. of 1 run, 7 loops each)

Joblib работает как положено, 8 задач занимают не больше времени, чем одно самое долгое

%timeit -n1 -r1 composite_task(8)
5.03 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

Однако, когда я пытаюсь запустить этот код внутри dask LocalCluster с 8 потоками, я не получаю никакого ускорения.

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=1, threads_per_worker=8)
client = Client(cluster)

%timeit -n1 -r1 client.submit(composite_task,8).result()
25.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

Наверное, я неправильно понимаю, как работает GIL. Пожалуйста помоги. Полную записную книжку можно посмотреть здесь:

http://nbviewer.jupyter.org/gist/aeantipov/6d6370e13e13c


Причина попробовать это - необходимость решить> 10 тыс. Задач с заблокированным GIL примерно на 50 узлах с 32 процессорами. Легко создать кластер dask-jobqueue с 50 рабочими * 32 потоками, но не с 1600 рабочими. И, к сожалению, поскольку GIL заблокирован, используя этот пример http://matthewrocklin.com/blog/work/2018/06/26/dask-scaling-limits не дает значительного ускорения по сравнению с 50 рабочими.


dask                      0.19.1                
dask-core                 0.19.1                
dask-jobqueue             0.3.0             
python                    3.7.0
distributed               1.23.1

person A32167    schedule 21.10.2018    source источник


Ответы (1)


Я бы просто использовал решение dask-joblib

cluster = LocalCluster()
client = Client(cluster)

with joblib.parallel_backend('dask'):
    out=Parallel()(delayed(task)(i) for i in range(0, np))

Ваши опасения по поводу GIL здесь не касаются. Ваша функция вызывает sleep, который освобождает GIL во время выполнения. Если ваша фактическая функция представляет собой чистый код Python и не выпускает GIL, я рекомендую запустить кластер Dask с множеством однопоточных процессов. Если вы используете dask-jobqueue, вы хотите использовать ключевое слово processes= для управления процессами для каждого задания.

У вас может быть намного больше задач, чем процессов.

person MRocklin    schedule 21.10.2018