Я хочу использовать RxPy для открытия файла (csv) и обработки файла построчно. Я точно предполагаю следующие шаги
- указать имя файла для потока
- открыть файл
- читать файл построчно
- удалить строки, начинающиеся с комментария (например, # ...)
- применить CSV-ридер
- отфильтровать записи, соответствующие некоторым критериям
Пока у меня есть:
def to_file(filename):
f = open(filename)
return Observable.using(
lambda: AnonymousDisposable(lambda: f.close()),
lambda d: Observable.just(f)
)
def to_reader(f):
return csv.reader(f)
def print_rows(reader):
for row in reader:
print(row)
Это работает
Observable.from_(["filename.csv", "filename2.csv"])
.flat_map(to_file).**map**(to_reader).subscribe(print_rows)
Это не так: ValueError: операция ввода-вывода в закрытом файле
Observable.from_(["filename.csv", "filename2.csv"])
.flat_map(to_file).**flat_map**(to_rows).subscribe(print)
Второй не работает, потому что (см. https://github.com/ReactiveX/RxPY/issues/69)
Когда наблюдаемые из первой плоской карты объединяются второй плоской картой, внутренние подписки будут удалены после их завершения. Таким образом, файлы будут закрыты, даже если дескрипторы файлов on_next'ed в новую наблюдаемую, установленную второй плоской картой.
Любая идея, как я могу достичь: что-то вроде:
Observable.from_(["filename.csv", "filename2.csv"]
).flat_map(to_file
).filter(comment_lines
).filter(empty_lines
).map(to_csv_reader
).filter(filter_by.. )
).do whatever
Большое спасибо за твою помощь
Юрген