Как прочитать CSV-файл из потока и обработать каждую строку по мере ее написания?

Я хотел бы прочитать файл CSV со стандартного ввода и обработать каждую строку по мере ее поступления. Мой код вывода CSV записывает строки одну за другой, но мой читатель ждет завершения потока, прежде чем повторять строки. Это ограничение модуля csv? Я делаю что-то неправильно?

Мой код для чтения:

import csv
import sys
import time


reader = csv.reader(sys.stdin)
for row in reader:
    print "Read: (%s) %r" % (time.time(), row)

Мой код писателя:

import csv
import sys
import time


writer = csv.writer(sys.stdout)
for i in range(8):
    writer.writerow(["R%d" % i, "$" * (i+1)])
    sys.stdout.flush()
    time.sleep(0.5)

Выход python test_writer.py | python test_reader.py:

Read: (1309597426.3) ['R0', '$']
Read: (1309597426.3) ['R1', '$$']
Read: (1309597426.3) ['R2', '$$$']
Read: (1309597426.3) ['R3', '$$$$']
Read: (1309597426.3) ['R4', '$$$$$']
Read: (1309597426.3) ['R5', '$$$$$$']
Read: (1309597426.3) ['R6', '$$$$$$$']
Read: (1309597426.3) ['R7', '$$$$$$$$']

Как видите, все операторы печати выполняются одновременно, но я ожидаю, что между ними будет промежуток в 500 мс.


person muhuk    schedule 02.07.2011    source источник
comment
Что произойдет, если вы запустите только python test_writer.py?   -  person Roland Illig    schedule 02.07.2011


Ответы (3)


Как говорится в документации,

Чтобы сделать цикл for наиболее эффективным способом обхода строк файла (очень распространенная операция), метод next() использует скрытый буфер упреждающего чтения.

И вы можете увидеть, посмотрев на реализация модуля csv (строка 784), что csv.reader вызывает метод next() базового итератора (через PyIter_Next).

Поэтому, если вы действительно хотите небуферизованное чтение CSV-файлов, вам нужно преобразовать файловый объект (здесь sys.stdin) в итератор, чей метод next() на самом деле вместо этого вызывает readline(). Это легко сделать с помощью двухаргументной формы функции iter. . Поэтому измените код в test_reader.py на что-то вроде этого:

for row in csv.reader(iter(sys.stdin.readline, '')):
    print("Read: ({}) {!r}".format(time.time(), row))

Например,

$ python test_writer.py | python test_reader.py
Read: (1388776652.964925) ['R0', '$']
Read: (1388776653.466134) ['R1', '$$']
Read: (1388776653.967327) ['R2', '$$$']
Read: (1388776654.468532) ['R3', '$$$$']
[etc]

Можете ли вы объяснить, почему вам нужно небуферизованное чтение файлов CSV? Может быть лучшее решение для того, что вы пытаетесь сделать.

person Gareth Rees    schedule 02.07.2011
comment
Отличный ответ, спасибо. Причина, по которой мне это было нужно, заключалась в том, что обработка результатов по мере их поступления обеспечивала бы мне скорость. 1-я операция — это чтение из сети, а 2-я операция — запись на диск, и обе они требуют определенных трансляций с интенсивным использованием ЦП. Также мне нужно было, чтобы они были связаны (через каналы), чтобы можно было повторно использовать сценарии (а-ля unix). - person muhuk; 04.07.2011

Возможно, это ограничение. Прочтите этот http://docs.python.org/using/cmdline.html#cmdoption-unittest-discover-u

Обратите внимание, что существует внутренняя буферизация в file.readlines() и файловых объектах (для строки в sys.stdin), на которую эта опция не влияет. Чтобы обойти это, вы захотите использовать file.readline() внутри цикла while 1:.

Я изменил test_reader.py следующим образом:

import csv, sys, time

while True:
    print "Read: (%s) %r" % (time.time(), sys.stdin.readline())

Вывод

python test_writer.py | python  test_reader.py
Read: (1309600865.84) 'R0,$\r\n'
Read: (1309600865.84) 'R1,$$\r\n'
Read: (1309600866.34) 'R2,$$$\r\n'
Read: (1309600866.84) 'R3,$$$$\r\n'
Read: (1309600867.34) 'R4,$$$$$\r\n'
Read: (1309600867.84) 'R5,$$$$$$\r\n'
Read: (1309600868.34) 'R6,$$$$$$$\r\n'
Read: (1309600868.84) 'R7,$$$$$$$$\r\n'
person user    schedule 02.07.2011
comment
Ты прав. Но как заставить csv.reader воспользоваться этим взломом? - person muhuk; 02.07.2011

Вы очищаете стандартный вывод, но не стандартный ввод.

Sys.stdin также имеет метод flush(), попробуйте использовать его после чтения каждой строки, если вы действительно хотите отключить буферизацию.

person RoundTower    schedule 02.07.2011
comment
Было бы круто, если бы downvoter оставил небольшое пояснение. Я действительно хотел бы знать, почему вызов stdin.flush() не помогает. - person muhuk; 07.07.2011