Использование наборов данных HDF5 (не путать с HDFS) в качестве целей в luigi

Судя по тому, что я прочитал в документации, luigi предназначен для работы с текстовыми файлами или необработанными двоичными файлами в качестве целей. Я пытаюсь создать рабочий процесс luigi для существующего конвейера обработки, который использует файлы HDF5 (из-за их многочисленных преимуществ) с использованием h5py в обычной файловой системе. Некоторые задачи в этом рабочем процессе не создают полностью новый файл, а добавляют новые наборы данных в существующий файл HDF. Используя h5py, я бы прочитал набор данных с помощью:

hdf = h5py.File('filepath','r')
hdf['internal/path/to/dataset'][...]

написать набор данных с:

hdf['internal/path/to/dataset'] = np.array(data)

и проверьте, существует ли набор данных в файле HDF с помощью этой строки:

'internal/path/to/dataset' in hdf

Мой вопрос: есть ли способ адаптировать luigi для работы с этими типами файлов? Мое чтение документации Луиджи заставляет меня думать, что я могу создать подкласс luigi.format.Format или, возможно, подкласс LocalTarget и создать собственный «открытый» метод. Но я не могу найти примеры того, как это реализовать. Большое спасибо за любые предложения!


person user2611761    schedule 20.12.2018    source источник
comment
h5py — это интерфейс к HDF5 файлам. HDFS - это нечто совершенно другое (файловая система на основе JAVA). Я вижу элементы luigi документов о HDFS, но ничего о HDF5.   -  person hpaulj    schedule 21.12.2018
comment
Как насчет моделирования на одном из модулей базы данных?   -  person hpaulj    schedule 21.12.2018


Ответы (1)


d6tflow имеет реализацию pandas HDF5 и может быть легко расширен для сохранения данных, отличных от фреймов данных pandas.

import d6tflow
from d6tflow.tasks.h5 import TaskH5Pandas
import pandas as pd

class Task1(TaskH5Pandas):
    def run(self):
        df = pd.DataFrame({'a':range(10)})
        self.save(df)

class Task2(d6tflow.tasks.TaskCachePandas):

    def requires(self):
        return Task1()

    def run(self):
        df = self.input().load()
        # use dataframe from HDF5

d6tflow.run([Task2])

Чтобы увидеть https://d6tflow.readthedocs.io/en/latest/targets.html#writing-your-own-targets о том, как расширить.

person citynorman    schedule 20.01.2019
comment
Мне еще предстоит реализовать это, но это именно то, что я ищу, спасибо! - person user2611761; 23.01.2019