RxPy читает файлы csv и строки процесса

Я хочу использовать RxPy для открытия файла (csv) и обработки файла построчно. Я точно предполагаю следующие шаги

  1. указать имя файла для потока
  2. открыть файл
  3. читать файл построчно
  4. удалить строки, начинающиеся с комментария (например, # ...)
  5. применить CSV-ридер
  6. отфильтровать записи, соответствующие некоторым критериям

Пока у меня есть:

def to_file(filename):
f = open(filename)
return Observable.using(
    lambda: AnonymousDisposable(lambda: f.close()),
    lambda d: Observable.just(f)
)

def to_reader(f):
    return csv.reader(f)

def print_rows(reader):
    for row in reader:
        print(row)

Это работает

Observable.from_(["filename.csv", "filename2.csv"])
   .flat_map(to_file).**map**(to_reader).subscribe(print_rows)

Это не так: ValueError: операция ввода-вывода в закрытом файле

Observable.from_(["filename.csv", "filename2.csv"])
   .flat_map(to_file).**flat_map**(to_rows).subscribe(print)

Второй не работает, потому что (см. https://github.com/ReactiveX/RxPY/issues/69)

Когда наблюдаемые из первой плоской карты объединяются второй плоской картой, внутренние подписки будут удалены после их завершения. Таким образом, файлы будут закрыты, даже если дескрипторы файлов on_next'ed в новую наблюдаемую, установленную второй плоской картой.

Любая идея, как я могу достичь: что-то вроде:

Observable.from_(["filename.csv", "filename2.csv"]
   ).flat_map(to_file
   ).filter(comment_lines
   ).filter(empty_lines
   ).map(to_csv_reader
   ).filter(filter_by.. )
   ).do whatever

Большое спасибо за твою помощь

Юрген


person Juergen    schedule 29.12.2016    source источник


Ответы (1)


Я только недавно начал работать с RxPy, и мне нужно было сделать то же самое. Удивлен, что кто-то еще не ответил на ваш вопрос, но решил ответить на всякий случай, если кому-то еще нужно знать. Предполагая, что у вас есть файл CSV, подобный этому:

$ cat datafile.csv
"iata","airport","city","state","country","lat","long"
"00M","Thigpen ","Bay Springs","MS","USA",31.95376472,-89.23450472
"00R","Livingston Municipal","Livingston","TX","USA",30.68586111,-95.01792778
"00V","Meadow Lake","Colorado Springs","CO","USA",38.94574889,-104.5698933
"01G","Perry-Warsaw","Perry","NY","USA",42.74134667,-78.05208056
"01J","Hilliard Airpark","Hilliard","FL","USA",30.6880125,-81.90594389

Вот решение:

from rx import Observable
from csv import DictReader

Observable.from_(DictReader(open('datafile.csv', 'r'))) \
          .subscribe(lambda row: 
                     print("{0:3}\t{1:<35}".format(row['iata'], row['airport'][:35]))
          )
person Tony Piazza    schedule 22.12.2017