Как можно применить некоторую функцию параллельно к частям разреженного массива CSR, сохраненному на диске, с помощью Python? Последовательно это может быть сделано, например. сохраняя массив CSR с помощью joblib.dump
, открывая его с помощью joblib.load(.., mmap_mode="r")
и обрабатывая фрагменты строк один за другим. Можно ли сделать это более эффективно с помощью dask?
В частности, если предположить, что вам не нужны все возможные операции вне ядра над разреженными массивами, а нужна только возможность параллельной загрузки фрагментов строк (каждый фрагмент представляет собой массив CSR) и применение к ним некоторой функции (в моем случае это было бы быть, например, estimator.predict(X)
из scikit-learn).
Кроме того, есть ли на диске формат файла, подходящий для этой задачи? Joblib работает, но я не уверен в (параллельной) производительности массивов CSR, загруженных как карты памяти; spark.mllib
, по-видимому, использует либо какой-то пользовательский разреженный формат хранения (который, похоже, не имеет чистого синтаксического анализатора Python), либо формат LIBSVM (анализатор в scikit-learn, по моему опыту, намного медленнее, чем joblib.dump
)...
Примечание. Я прочитал документацию, https://github.com/dask/dask/search?q=sparse&type=Issues&utf8=%E2%9C%93 но я до сих пор не знаю, как лучше решить эту проблему.
Изменить: чтобы дать более практический пример, ниже приведен код, который работает в dask для плотных массивов, но не работает при использовании разреженных массивов с эта ошибка,
import numpy as np
import scipy.sparse
import joblib
import dask.array as da
from sklearn.utils import gen_batches
np.random.seed(42)
joblib.dump(np.random.rand(100000, 1000), 'X_dense.pkl')
joblib.dump(scipy.sparse.random(10000, 1000000, format='csr'), 'X_csr.pkl')
fh = joblib.load('X_dense.pkl', mmap_mode='r')
# computing the results without dask
results = np.vstack((fh[sl, :].sum(axis=1)) for sl in gen_batches(fh.shape[0], batch_size))
# computing the results with dask
x = da.from_array(fh, chunks=(2000))
results = x.sum(axis=1).compute()
Edit2: после обсуждения ниже в приведенном ниже примере устраняется предыдущая ошибка, но появляются ошибки примерно IndexError: tuple index out of range
в dask/array/core.py:L3413
,
import dask
# +imports from the example above
dask.set_options(get=dask.get) # disable multiprocessing
fh = joblib.load('X_csr.pkl', mmap_mode='r')
def func(x):
if x.ndim == 0:
# dask does some heuristics with dummy data, if the x is a 0d array
# the sum command would fail
return x
res = np.asarray(x.sum(axis=1, keepdims=True))
return res
Xd = da.from_array(fh, chunks=(2000))
results_new = Xd.map_blocks(func).compute()
NumpyPickler
(github.com/ joblib/joblib/blob/ ), в котором все хранится в одном большом двоичном объекте. Я думаю, что для разреженных массивов CSR это должно быть эквивалентно применению массивовnp.save
к массивамX.data
,X.indices
иX.indptr
. Фактически, предыдущие версии joblib.dump приводили к одному файлу на массив numpy. Преимущество в том, чтоjoblib.load("<sparse array pickled file>", mmap_mode="r")[slice, :]
уже загружает только один фрагмент массива. - person rth   schedule 17.07.2017scipy
естьsparse.savenz
. Для форматаcsr
он используетnp.savez
для сохраненияdict(data=matrix.data, indices=matrix.indices, indptr=matrix.indptr)
. То есть ключевые атрибуты матрицы сохраняются в отдельныеzip
файлов архива. «Разбитая» загрузка должна будет считываться из всех трех массивов. - person hpaulj   schedule 17.07.2017np.savez
тоже может сработать. Однако я хочу сказать, что я до сих пор не уверен, как заставить memmap разреженного массива работать с dask (см. отредактированный пример выше), и я ищу некоторые предложения о наилучшем подходе, чтобы заставить его работать. В частности, @MRocklin dask.pydata.org/en/latest/array- sparse.html подходит для этого варианта использования? - person rth   schedule 18.07.2017fh[...].sum()
,axis 1 is out of bounds for array of dimension 0
предполагает, что что-то, возможно,fh
, является массивом 0d, возможно, оболочкой массива объектов вокруг разреженной матрицы. Я думаю, вам нужно изучитьfh
, прежде чем пытаться использовать его в вычислениях. - person hpaulj   schedule 18.07.2017memmap of a sparse array
. - person hpaulj   schedule 18.07.2017fh
- этоscipy.sparse.csr.csr_matrix
, гдеfh.data
,fh.indices
иfh.indptr
- этоnp.memmap
. Если вы предпочитаете не использовать joblib, я думаю, это чем-то похоже на то, чтобы взять массив csr, сохранить его с помощьюnp.savez
, а затем загрузить с помощьюnp.load(..., memmap='r')
... Хорошо fh не является массивом 0d, возможно, это как-то связано с неудовлетворением требований в dask.pydata.org/en/latest/array-sparse.html#requirements (например, нет функцииconcatenate
вscipy.sparse
)... - person rth   schedule 18.07.2017vstack
иhstack
, но они сильно отличаются от версий numpy. Они строят новую матрицу, используяcoo
атрибутов. - person hpaulj   schedule 18.07.2017np.load('test.npz',mmap_mode='r')
не вызывает ошибку, но значениеmmap_mode
игнорируется при создании объектаNpzFile
и, таким образом, ничего не делает. - person hpaulj   schedule 25.03.2018