Понимание поведения памяти распределенного Dask

Подобно этому вопросу, у меня возникают проблемы с памятью при распространении Dask. Однако в моем случае объяснение не в том, что клиент пытается собрать большой объем данных.

Проблему можно проиллюстрировать на очень простом графике задач: список из delayed операций генерирует несколько случайных DataFrames фиксированного размера ~ 500 МБ (для имитации загрузки множества разделов из файлов). Следующая операция в графе задач - получить размер каждого DataFrame. Наконец, все размеры уменьшаются до одного общего размера, т. Е. Данные, которые должны быть возвращены клиенту, имеют небольшой размер.

В целях тестирования я использую однопоточный локальный планировщик / рабочий, ограниченный 2 ГБ памяти, то есть:

$ dask-scheduler
$ dask-worker localhost:8786 --nthreads 1 --memory-limit 2000000000

Мое ожидание от графа задач состоит в том, что работнику никогда не потребуется больше 500 МБ ОЗУ, потому что выполнение «получить размер данных» непосредственно после «сгенерировать данные» должно привести к данные небольшие сразу. Однако я заметил, что рабочему требуется гораздо больше памяти, чем это:

«Использование

Коэффициент 2 означает, что данные должны быть дублированы внутри. Поэтому любые попытки приблизить размер раздела к физической памяти узла приводят к MemoryErrors или интенсивному свопингу.

Мы высоко ценим любую информацию, проливающую свет на это. Особенно:

  • Могу ли я контролировать дублирование данных и можно ли этого избежать? Или есть общее эмпирическое правило: сохранять полезную нагрузку значительно ниже 50% для учета дублирования данных?
  • Как рабочий memory-limit влияет на это поведение? Судя по моим тестам, использование более низкого порога, похоже, запускает сборщик мусора раньше (и / или перенос на диск?), Но, с другой стороны, есть другие пики памяти, которые даже превышают пиковую память при использовании более высокого порога.

Обратите внимание, что я знаю, что могу решить эту конкретную проблему, взяв размер в первой операции, и, вероятно, для этой проблемы лучше подходит одиночный компьютерный исполнитель Dask, но я прошу в образовательных целях .


Приложение 1: тестовый код

from __future__ import division, print_function
import pandas as pd
import numpy as np
from dask import delayed
from dask.distributed import Client, Executor


def simulate_df_partition_load(part_id):
    """
    Creates a random DataFrame of ~500 MB
    """
    num_rows = 5000000
    num_cols = 13

    df = pd.DataFrame()
    for i in xrange(num_cols):
        data_col = np.random.uniform(0, 1, num_rows)
        df["col_{}".format(i)] = data_col
        del data_col    # for max GC-friendliness

    print("[Partition {}] #rows: {}, #cols: {}, memory: {} MB".format(
        part_id, df.shape[0], df.shape[1],
        df.memory_usage().sum() / (2 ** 20)
    ))
    return df


e = Executor('127.0.0.1:8786', set_as_default=True)

num_partitions = 2

lazy_dataframes = [
    delayed(simulate_df_partition_load)(part_id)
    for part_id in xrange(num_partitions)
]

length_partitions = [df.shape[0] for df in lazy_dataframes]
dag = delayed(sum)(length_partitions)

length_total = dag.compute()

Приложение 2: иллюстрация DAG

DAG


person bluenote10    schedule 03.06.2017    source источник
comment
Как у вас получился график, показывающий память? Плюс это график на одного рабочего или на всех рабочих?   -  person AmyChodorowski    schedule 08.01.2021
comment
@AmyChodorowski Это было только с панели мониторинга Dask, и я думаю, что это относилось только к одному рабочему. Обратите внимание, что этому вопросу несколько лет, и, возможно, панель мониторинга Dask немного изменилась.   -  person bluenote10    schedule 08.01.2021


Ответы (1)


Здесь есть несколько вопросов:

  1. Почему я вижу, что в два раза больше памяти используется по сравнению с одним элементом данных?
  2. Рекомендуется ли поддерживать размер раздела ниже общего объема памяти?
  3. Что происходит, когда я выхожу за пределы значения --memory-limit

Почему я вижу, что память используется вдвое больше?

Рабочий, скорее всего, выполняет две задачи создания данных, прежде чем он перейдет к первой задаче размера вычислений. Это связано с тем, что планировщик назначает работникам все выполняемые в данный момент задачи, возможно, больше, чем они могут выполнить за один раз. Рабочий завершает первый и сообщает планировщику. В то время как планировщик определяет, какую новую задачу отправить работнику (задача размера вычислений), работник немедленно запускает другую задачу создания данных.

Рекомендуется ли поддерживать размер раздела ниже общего объема памяти?

да.

Что произойдет, если я выйду за пределы --memory-limit?

Рабочий начнет записывать на диск наименее используемые элементы данных. Это происходит, когда по умолчанию используется около 60% памяти (по данным протокола __sizeof__).

Примечание: спасибо за хорошо поставленный вопрос

person MRocklin    schedule 03.06.2017
comment
Большое спасибо, это проясняет. Я добавил журналы в генератор + картограф, и могу подтвердить, что это именно то поведение, которое я получаю: первые две задачи - это задачи генератора, а оттуда задачи сопоставления и генератора чередуются при использовании более двух разделов. - person bluenote10; 03.06.2017