Внешняя обработка разреженных массивов CSR

Как можно применить некоторую функцию параллельно к частям разреженного массива CSR, сохраненному на диске, с помощью Python? Последовательно это может быть сделано, например. сохраняя массив CSR с помощью joblib.dump, открывая его с помощью joblib.load(.., mmap_mode="r") и обрабатывая фрагменты строк один за другим. Можно ли сделать это более эффективно с помощью dask?

В частности, если предположить, что вам не нужны все возможные операции вне ядра над разреженными массивами, а нужна только возможность параллельной загрузки фрагментов строк (каждый фрагмент представляет собой массив CSR) и применение к ним некоторой функции (в моем случае это было бы быть, например, estimator.predict(X) из scikit-learn).

Кроме того, есть ли на диске формат файла, подходящий для этой задачи? Joblib работает, но я не уверен в (параллельной) производительности массивов CSR, загруженных как карты памяти; spark.mllib, по-видимому, использует либо какой-то пользовательский разреженный формат хранения (который, похоже, не имеет чистого синтаксического анализатора Python), либо формат LIBSVM (анализатор в scikit-learn, по моему опыту, намного медленнее, чем joblib.dump)...

Примечание. Я прочитал документацию, https://github.com/dask/dask/search?q=sparse&type=Issues&utf8=%E2%9C%93 но я до сих пор не знаю, как лучше решить эту проблему.

Изменить: чтобы дать более практический пример, ниже приведен код, который работает в dask для плотных массивов, но не работает при использовании разреженных массивов с эта ошибка,

import numpy as np
import scipy.sparse

import joblib
import dask.array as da
from sklearn.utils import gen_batches

np.random.seed(42)
joblib.dump(np.random.rand(100000, 1000), 'X_dense.pkl')
joblib.dump(scipy.sparse.random(10000, 1000000, format='csr'), 'X_csr.pkl')

fh = joblib.load('X_dense.pkl', mmap_mode='r')

# computing the results without dask
results = np.vstack((fh[sl, :].sum(axis=1)) for sl in gen_batches(fh.shape[0], batch_size))

# computing the results with dask
x = da.from_array(fh, chunks=(2000))
results = x.sum(axis=1).compute()

Edit2: после обсуждения ниже в приведенном ниже примере устраняется предыдущая ошибка, но появляются ошибки примерно IndexError: tuple index out of range в dask/array/core.py:L3413,

import dask
# +imports from the example above
dask.set_options(get=dask.get)  # disable multiprocessing

fh = joblib.load('X_csr.pkl', mmap_mode='r')

def func(x):
    if x.ndim == 0:
        # dask does some heuristics with dummy data, if the x is a 0d array
        # the sum command would fail
        return x
    res = np.asarray(x.sum(axis=1, keepdims=True))
    return res

Xd = da.from_array(fh, chunks=(2000))
results_new = Xd.map_blocks(func).compute()

person rth    schedule 17.07.2017    source источник
comment
Это будет зависеть от того, как joblib хранит данные на диске. Я подозреваю, что они хранят его как непрозрачный блок, и в этом случае его будет трудно читать по частям.   -  person MRocklin    schedule 17.07.2017
comment
@MRocklin Ну да, у них есть NumpyPickler (github.com/ joblib/joblib/blob/ ), в котором все хранится в одном большом двоичном объекте. Я думаю, что для разреженных массивов CSR это должно быть эквивалентно применению массивов np.save к массивам X.data, X.indices и X.indptr. Фактически, предыдущие версии joblib.dump приводили к одному файлу на массив numpy. Преимущество в том, что joblib.load("<sparse array pickled file>", mmap_mode="r")[slice, :] уже загружает только один фрагмент массива.   -  person rth    schedule 17.07.2017
comment
В последней версии scipy есть sparse.savenz. Для формата csr он использует np.savez для сохранения dict(data=matrix.data, indices=matrix.indices, indptr=matrix.indptr). То есть ключевые атрибуты матрицы сохраняются в отдельные zip файлов архива. «Разбитая» загрузка должна будет считываться из всех трех массивов.   -  person hpaulj    schedule 17.07.2017
comment
@hpaulj Спасибо, np.savez тоже может сработать. Однако я хочу сказать, что я до сих пор не уверен, как заставить memmap разреженного массива работать с dask (см. отредактированный пример выше), и я ищу некоторые предложения о наилучшем подходе, чтобы заставить его работать. В частности, @MRocklin dask.pydata.org/en/latest/array- sparse.html подходит для этого варианта использования?   -  person rth    schedule 18.07.2017
comment
Похоже, ошибка возникает в fh[...].sum(), axis 1 is out of bounds for array of dimension 0 предполагает, что что-то, возможно, fh, является массивом 0d, возможно, оболочкой массива объектов вокруг разреженной матрицы. Я думаю, вам нужно изучить fh, прежде чем пытаться использовать его в вычислениях.   -  person hpaulj    schedule 18.07.2017
comment
Я понятия не имею, как будет выглядеть memmap of a sparse array.   -  person hpaulj    schedule 18.07.2017
comment
@hpaulj fh - это scipy.sparse.csr.csr_matrix, где fh.data, fh.indices и fh.indptr - это np.memmap. Если вы предпочитаете не использовать joblib, я думаю, это чем-то похоже на то, чтобы взять массив csr, сохранить его с помощью np.savez, а затем загрузить с помощью np.load(..., memmap='r')... Хорошо fh не является массивом 0d, возможно, это как-то связано с неудовлетворением требований в dask.pydata.org/en/latest/array-sparse.html#requirements (например, нет функции concatenate в scipy.sparse)...   -  person rth    schedule 18.07.2017
comment
В Sparse есть vstack и hstack, но они сильно отличаются от версий numpy. Они строят новую матрицу, используя coo атрибутов.   -  person hpaulj    schedule 18.07.2017
comment
Возможно, вы делаете это не так, как нужно. Я бы использовал h5py для этой задачи. Таким образом, данные в любом случае сжимаются.   -  person sdgaw erzswer    schedule 07.03.2018
comment
@sdgawerzswer Можно использовать hdf5 для хранения вместо numpy, это на самом деле не меняет проблему, заключающуюся в том, что вам нужно иметь возможность обрабатывать разреженный вывод в dask ..   -  person rth    schedule 07.03.2018
comment
np.load('test.npz',mmap_mode='r') не вызывает ошибку, но значение mmap_mode игнорируется при создании объекта NpzFile и, таким образом, ничего не делает.   -  person hpaulj    schedule 25.03.2018


Ответы (1)


Так что я ничего не знаю о joblib или dask, не говоря уже о формате данных вашего приложения. Но на самом деле можно читать разреженные матрицы с диска порциями, сохраняя при этом разреженную структуру данных.

В то время как статья Википедии о формате CSR отлично объясняет как это работает, я дам краткий обзор:

Некоторая разреженная матрица, например:

1 0 2
0 0 3
4 5 6

сохраняется путем запоминания каждого ненулевого значения и столбца, в котором оно находится:

sparse.data    = 1 2 3 4 5 6  # acutal value
sparse.indices = 0 2 2 0 1 2  # number of column (0-indexed)

Теперь нам все еще не хватает строк. В сжатом формате просто сохраняется количество ненулевых значений в каждой строке, а не каждая отдельная строка значений.

Обратите внимание, что ненулевое количество также накапливается, поэтому следующий массив содержит количество ненулевых значений до этой строки включительно. Чтобы еще больше усложнить ситуацию, массив всегда начинается с 0 и, таким образом, содержит num_rows+1 записей:

sparse.indptr = 0 2 3 6

поэтому до второй строки включительно есть 3 ненулевых значения, а именно 1, 2 и 3.

Поскольку мы с этим разобрались, мы можем начать «нарезать» матрицу. Цель состоит в том, чтобы построить массивы data, indices и indptr для некоторых фрагментов. Предположим, исходная огромная матрица хранится в трех бинарных файлах, которые мы будем постепенно читать. Мы используем генератор для повторного yield некоторого фрагмента.

Для этого нам нужно знать, сколько ненулевых значений в каждом фрагменте, и прочитать соответствующее количество значений и индексов столбцов. Ненулевой счетчик удобно считывать из массива indptr. Это достигается чтением некоторого количества записей из огромного файла indptr, соответствующего желаемому размеру фрагмента. Последняя запись этой части файла indptr за вычетом количества ненулевых значений перед дает количество ненулевых значений в этом фрагменте. Таким образом, массивы фрагментов data и indices просто вырезаны из больших файлов data и indices. Массив indptr должен быть искусственно добавлен нулем (это то, чего хочет формат, не спрашивайте меня: D).

Затем мы можем просто построить разреженную матрицу с фрагментами data, indices и indptr, чтобы получить новую разреженную матрицу.

Следует отметить, что фактический размер матрицы не может быть напрямую восстановлен только по трем массивам. Это либо максимальный индекс столбца матрицы, либо если вам не повезло и в чанке нет неопределенных данных. Поэтому нам также нужно передать количество столбцов.

Я, наверное, довольно сложно объяснил, так что просто прочитайте это просто как непрозрачный кусок кода, который реализует такой генератор:

import numpy as np
import scipy.sparse


def gen_batches(batch_size, sparse_data_path, sparse_indices_path, 
                sparse_indptr_path, dtype=np.float32, column_size=None):
    data_item_size = dtype().itemsize

    with open(sparse_data_path, 'rb') as data_file, \
            open(sparse_indices_path, 'rb') as indices_file, \
            open(sparse_indptr_path, 'rb') as indptr_file:
        nnz_before = np.fromstring(indptr_file.read(4), dtype=np.int32)

        while True:
            indptr_batch = np.frombuffer(nnz_before.tobytes() +
                              indptr_file.read(4*batch_size), dtype=np.int32)

            if len(indptr_batch) == 1:
                break

            batch_indptr = indptr_batch - nnz_before
            nnz_before = indptr_batch[-1]
            batch_nnz = np.asscalar(batch_indptr[-1])

            batch_data = np.frombuffer(data_file.read(
                                       data_item_size * batch_nnz), dtype=dtype)
            batch_indices = np.frombuffer(indices_file.read(
                                          4 * batch_nnz), dtype=np.int32)

            dimensions = (len(indptr_batch)-1, column_size)

            matrix = scipy.sparse.csr_matrix((batch_data, 
                           batch_indices, batch_indptr), shape=dimensions)

            yield matrix


if __name__ == '__main__':
    sparse = scipy.sparse.random(5, 4, density=0.1, format='csr', dtype=np.float32)

    sparse.data.tofile('sparse.data')        # dtype as specified above  ^^^^^^^^^^
    sparse.indices.tofile('sparse.indices')  # dtype=int32
    sparse.indptr.tofile('sparse.indptr')    # dtype=int32

    print(sparse.toarray())
    print('========')

    for batch in gen_batches(2, 'sparse.data', 'sparse.indices', 
                             'sparse.indptr', column_size=4):
        print(batch.toarray())

numpy.ndarray.tofile() просто хранит двоичные массивы, поэтому вам нужно запомнить формат данных. scipy.sparse представляет indices и indptr как int32, так что это ограничение общего размера матрицы.

Также я проверил код и обнаружил, что конструктор матрицы scipy csr является узким местом для небольших матриц. Ваш пробег может отличаться, это просто «доказательство принципа».

Если есть необходимость в более сложной реализации или что-то слишком запутанное, просто напишите мне :)

person Obay    schedule 19.06.2018