Подобно этому вопросу, у меня возникают проблемы с памятью при распространении Dask. Однако в моем случае объяснение не в том, что клиент пытается собрать большой объем данных.
Проблему можно проиллюстрировать на очень простом графике задач: список из delayed
операций генерирует несколько случайных DataFrames фиксированного размера ~ 500 МБ (для имитации загрузки множества разделов из файлов). Следующая операция в графе задач - получить размер каждого DataFrame. Наконец, все размеры уменьшаются до одного общего размера, т. Е. Данные, которые должны быть возвращены клиенту, имеют небольшой размер.
В целях тестирования я использую однопоточный локальный планировщик / рабочий, ограниченный 2 ГБ памяти, то есть:
$ dask-scheduler
$ dask-worker localhost:8786 --nthreads 1 --memory-limit 2000000000
Мое ожидание от графа задач состоит в том, что работнику никогда не потребуется больше 500 МБ ОЗУ, потому что выполнение «получить размер данных» непосредственно после «сгенерировать данные» должно привести к данные небольшие сразу. Однако я заметил, что рабочему требуется гораздо больше памяти, чем это:
Коэффициент 2 означает, что данные должны быть дублированы внутри. Поэтому любые попытки приблизить размер раздела к физической памяти узла приводят к MemoryErrors
или интенсивному свопингу.
Мы высоко ценим любую информацию, проливающую свет на это. Особенно:
- Могу ли я контролировать дублирование данных и можно ли этого избежать? Или есть общее эмпирическое правило: сохранять полезную нагрузку значительно ниже 50% для учета дублирования данных?
- Как рабочий
memory-limit
влияет на это поведение? Судя по моим тестам, использование более низкого порога, похоже, запускает сборщик мусора раньше (и / или перенос на диск?), Но, с другой стороны, есть другие пики памяти, которые даже превышают пиковую память при использовании более высокого порога.
Обратите внимание, что я знаю, что могу решить эту конкретную проблему, взяв размер в первой операции, и, вероятно, для этой проблемы лучше подходит одиночный компьютерный исполнитель Dask, но я прошу в образовательных целях .
Приложение 1: тестовый код
from __future__ import division, print_function
import pandas as pd
import numpy as np
from dask import delayed
from dask.distributed import Client, Executor
def simulate_df_partition_load(part_id):
"""
Creates a random DataFrame of ~500 MB
"""
num_rows = 5000000
num_cols = 13
df = pd.DataFrame()
for i in xrange(num_cols):
data_col = np.random.uniform(0, 1, num_rows)
df["col_{}".format(i)] = data_col
del data_col # for max GC-friendliness
print("[Partition {}] #rows: {}, #cols: {}, memory: {} MB".format(
part_id, df.shape[0], df.shape[1],
df.memory_usage().sum() / (2 ** 20)
))
return df
e = Executor('127.0.0.1:8786', set_as_default=True)
num_partitions = 2
lazy_dataframes = [
delayed(simulate_df_partition_load)(part_id)
for part_id in xrange(num_partitions)
]
length_partitions = [df.shape[0] for df in lazy_dataframes]
dag = delayed(sum)(length_partitions)
length_total = dag.compute()
Приложение 2: иллюстрация DAG