Я понимаю, что даск хорошо работает в пакетном режиме, как это
def load(filename):
...
def clean(data):
...
def analyze(sequence_of_data):
...
def store(result):
with open(..., 'w') as f:
f.write(result)
dsk = {'load-1': (load, 'myfile.a.data'),
'load-2': (load, 'myfile.b.data'),
'load-3': (load, 'myfile.c.data'),
'clean-1': (clean, 'load-1'),
'clean-2': (clean, 'load-2'),
'clean-3': (clean, 'load-3'),
'analyze': (analyze, ['clean-%d' % i for i in [1, 2, 3]]),
'store': (store, 'analyze')}
from dask.multiprocessing import get
get(dsk, 'store') # executes in parallel
- Можем ли мы использовать dask для обработки потокового канала, где количество чанков неизвестно или даже бесконечно?
- Может ли он выполнять вычисления инкрементным способом. например, может ли описанный выше шаг «анализировать» обрабатывать текущие фрагменты?
- должны ли мы вызывать операцию «получить» только после того, как все фрагменты данных известны, можем ли мы добавить новые фрагменты после вызова «получить»