dask out-of-core матрица многократного планирования

Я пытаюсь вычислить матричное произведение Y=XX^T для матрицы X размером 10 000 * 800 000. Матрица X хранится на диске в файле h5py. Результирующий Y должен быть матрицей 10 000 * 10 000, хранящейся в том же файле h5py. Вот воспроизводимый пример кода.

import dask.array as da
from blaze import into

into("h5py:///tmp/dummy::/X", da.ones((10**4, 8*10**5), chunks=(10**4,10**4)))
x = into(da.Array, "h5py:///tmp/dummy::/X", chunks=(10**4,10**4)))
y = x.dot(x.T)
into("h5py:///tmp/dummy::/Y", y)

Я ожидал, что это вычисление пройдет гладко, поскольку каждый фрагмент (10 000 * 10 000) должен быть индивидуально транспонирован, за ним следует скалярное произведение, а затем суммируется окончательный результат. Однако выполнение этого вычисления заполняет как мою оперативную память, так и память подкачки, пока процесс в конечном итоге не будет уничтожен.

Вот пример графика вычислений, построенного с помощью dot_graph: Пример графика вычислений

В соответствии с документом по расписанию http://dask.pydata.org/en/latest/scheduling-policy.html я ожидаю, что промежуточные результаты верхнего тензорда будут суммированы один за другим в последний результат суммы, как только они были рассчитаны индивидуально. Это освободило бы память от этих промежуточных результатов тензордотов, чтобы мы не столкнулись с ошибками памяти.

Играем с меньшим игрушечным примером:

from dask.diagnostics import Profiler, CacheProfiler, ResourceProfiler

# Experiment on a (1,0000 * 5,000) matrix X split into 500 chunks of size (1,000 * 10)
x = into(da.Array, "h5py:///tmp/dummy::/X", chunks=(10**3,10)))[:10**3,5000]
y = x.T.dot(x)
with Profiler() as prof, CacheProfiler() as cprof, ResourceProfiler() as rprof:
    into("h5py:///tmp/dummy::/X", y)
rprof.visualize()

Я получаю следующий экран: Профилировщик ресурсов

Где зеленая полоса представляет собой операцию суммы, а желтая и фиолетовая полосы представляют соответственно операции get_array и tensordot. Это, по-видимому, указывает на то, что операция суммирования ожидает выполнения всех промежуточных операций tensordot перед их суммированием. Это также объяснило бы, что мой процесс исчерпал память и был убит.

Итак, мои вопросы:

  • Это нормальное поведение операции суммы?
  • Есть ли способ заставить его вычислять промежуточные суммы до того, как все промежуточные продукты тензордота будут вычислены и сохранены в памяти?
  • Если нет, есть ли обходной путь, который не включает сброс на диск?

Любая помощь очень ценится!


person Grin    schedule 01.05.2017    source источник


Ответы (1)


Вообще говоря, выполнение плотного матричного умножения в небольшом пространстве затруднено. Это связано с тем, что каждый промежуточный фрагмент будет использоваться несколькими выходными фрагментами.

В соответствии с документом по расписанию http://dask.pydata.org/en/latest/scheduling-policy.html я ожидаю, что промежуточные результаты верхнего тензорда будут суммированы один за другим в последний результат суммы, как только они были рассчитаны индивидуально.

График, который вы показали, имеет много входных данных для функции суммы. Dask будет ждать, пока все эти входные данные будут завершены, прежде чем запускать функцию суммирования. Планировщик задач понятия не имеет, что сумма ассоциативна и может выполняться по частям. Это отсутствие семантической информации — цена, которую вы платите за использование общей системы планирования задач, такой как Dask, а не специальной библиотеки линейной алгебры. Если ваша цель состоит в том, чтобы выполнить плотную линейную алгебру как можно эффективнее, вы можете поискать в другом месте; это хорошо покрытое поле.

Таким образом, как написано, ваши требования к памяти составляют не менее 8e5 * 1e4 * dtype.itemsize, при условии, что Dask работает в точно правильном порядке (что он и должен в основном делать).

Вы можете попробовать следующее:

  1. Уменьшите размер фрагмента по несокращающемуся измерению.
  2. Используйте версию Dask более позднюю, чем 0.14.1 (версия 0.14.2 должна быть выпущена к 5 мая 2017 г.), где мы разбиваем эти вызовы с большой суммой на множество более мелких явно на графике.
  3. Используйте распределенный планировщик, который более эффективно обрабатывает запись данных на диск.

    from dask.distributed import Client
    client = Client(processes=False)  # create a local cluster in this process
    
person MRocklin    schedule 01.05.2017