Семафоры в dask.distributed?

У меня есть кластер dask с n рабочими, и я хочу, чтобы рабочие выполняли запросы к базе данных. Но база данных способна обрабатывать только m запросов параллельно, где m ‹n. Как я могу смоделировать это в dask.distributed? Параллельно над такой задачей должны работать только m работников.

Я видел, что распределенный поддерживает блокировки (http://distributed.readthedocs.io/en/latest/api.html#distributed.Lock). Но с этим я мог выполнять только один запрос параллельно, а не m.

Также я видел, что могу определять ресурсы для каждого рабочего (https://distributed.readthedocs.io/en/latest/resources.html). Но это также не подходит, поскольку база данных не зависит от рабочих. Мне бы либо пришлось определять по одному ресурсу базы данных для каждого рабочего (что приводит к слишком большому количеству параллельных запросов). Или мне пришлось бы распределить m ресурсов базы данных среди n работников, что сложно при настройке кластера и неоптимально в исполнении.

Можно ли определить что-то вроде семафоров в dask, чтобы решить эту проблему?


person Christian Trebing    schedule 07.02.2018    source источник


Ответы (2)


Возможно, вы могли бы что-нибудь взломать вместе с блокировками и переменными.

Более чистым решением было бы просто реализовать семафоры так же, как реализованы блокировки. В зависимости от вашего опыта это может быть не так сложно (реализация блокировки составляет 150 строк) и будет желанным запросом на перенос.

https://github.com/dask/distributed/blob/master/distributed/lock.py

person MRocklin    schedule 07.02.2018

Вы можете использовать dask.distributed.Queue

class DDSemaphore(object):
    """Dask Distributed Semaphore"""

    def __init__(self, value=1):
        self._q = dask.distributed.Queue()
        for _ in range(value):
            self._q.put(42)

    def acquire():
        self._q.get()

    def release():
        self._q.put(42)
person Ngo    schedule 09.03.2018