Может ли даск работать с бесконечным потоковым вводом

Я понимаю, что даск хорошо работает в пакетном режиме, как это

def load(filename):
    ...

def clean(data):
    ...

def analyze(sequence_of_data):
    ...

def store(result):
    with open(..., 'w') as f:
        f.write(result)

dsk = {'load-1': (load, 'myfile.a.data'),
       'load-2': (load, 'myfile.b.data'),
       'load-3': (load, 'myfile.c.data'),
       'clean-1': (clean, 'load-1'),
       'clean-2': (clean, 'load-2'),
       'clean-3': (clean, 'load-3'),
       'analyze': (analyze, ['clean-%d' % i for i in [1, 2, 3]]),
       'store': (store, 'analyze')}

from dask.multiprocessing import get
get(dsk, 'store')  # executes in parallel
  1. Можем ли мы использовать dask для обработки потокового канала, где количество чанков неизвестно или даже бесконечно?
  2. Может ли он выполнять вычисления инкрементным способом. например, может ли описанный выше шаг «анализировать» обрабатывать текущие фрагменты?
  3. должны ли мы вызывать операцию «получить» только после того, как все фрагменты данных известны, можем ли мы добавить новые фрагменты после вызова «получить»

person sami    schedule 27.11.2015    source источник


Ответы (1)


Изменить: см. новый ответ ниже

No

Текущий планировщик задач в dask ожидает один вычислительный граф. Он не поддерживает динамическое добавление или удаление из этого графика. Планировщик предназначен для оценки больших графов в небольшом объеме памяти; знание всего графика заранее имеет решающее значение для этого.

Однако это не мешает создавать другие планировщики с другими свойствами. Одним из простых решений здесь является использование модуля типа conncurrent.futures на одной машине или distributed на нескольких машинах.

На самом деле да

Распределенный планировщик теперь работает полностью асинхронно, и вы можете отправлять задачи, ждать несколько из них, отправлять больше, отменять задачи, добавлять/удалять рабочих и т. д. во время вычислений. Есть несколько способов сделать это, но самым простым, вероятно, является новый интерфейс concurrent.futures, кратко описанный здесь:

http://dask.pydata.org/en/latest/futures.html

person MRocklin    schedule 27.11.2015