Я пытаюсь запустить параллельный цикл joblib внутри кластера, распределенного по потокам (см. Причину ниже), но я не могу добиться ускорения из-за блокировки GIL. Вот пример:
def task(x):
""" Sample single-process task that takes between 2 and 5 seconds """
import time
import random
dt = random.uniform(2,5)
time.sleep(dt)
return x+dt
def composite_task(np=8):
""" Composite task that runs multiple single-process runs in parallel """
from functools import partial
from joblib import Parallel, delayed, parallel_backend
with parallel_backend('loky', n_jobs=np):
out=Parallel()(delayed(task)(i) for i in list(range(0, np)))
return out
Задача с одним ЦП в среднем занимает 3,5 секунды
%timeit -n7 -r1 task(0)
3.61 s ± 0 ns per loop (mean ± std. dev. of 1 run, 7 loops each)
Joblib работает как положено, 8 задач занимают не больше времени, чем одно самое долгое
%timeit -n1 -r1 composite_task(8)
5.03 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
Однако, когда я пытаюсь запустить этот код внутри dask LocalCluster с 8 потоками, я не получаю никакого ускорения.
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=1, threads_per_worker=8)
client = Client(cluster)
%timeit -n1 -r1 client.submit(composite_task,8).result()
25.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
Наверное, я неправильно понимаю, как работает GIL. Пожалуйста помоги. Полную записную книжку можно посмотреть здесь:
http://nbviewer.jupyter.org/gist/aeantipov/6d6370e13e13c
Причина попробовать это - необходимость решить> 10 тыс. Задач с заблокированным GIL примерно на 50 узлах с 32 процессорами. Легко создать кластер dask-jobqueue с 50 рабочими * 32 потоками, но не с 1600 рабочими. И, к сожалению, поскольку GIL заблокирован, используя этот пример http://matthewrocklin.com/blog/work/2018/06/26/dask-scaling-limits не дает значительного ускорения по сравнению с 50 рабочими.
dask 0.19.1
dask-core 0.19.1
dask-jobqueue 0.3.0
python 3.7.0
distributed 1.23.1