Python: неблокирующее чтение из стандартного вывода многопоточного подпроцесса

У меня есть скрипт (worker.py), который выводит небуферизованный вывод в виде...

1
2
3
.
.
.
n

где n — некоторое постоянное количество итераций, которое будет выполнять цикл в этом скрипте. В другом скрипте (service_controller.py) я запускаю несколько потоков, каждый из которых запускает подпроцесс, используя subprocess.Popen(stdout=subprocess.PIPE, ...); Теперь в моем основном потоке (service_controller.py) я хочу прочитать вывод подпроцесса worker.py каждого потока и использовать его для расчета оценки времени, оставшегося до завершения.

У меня работает вся логика, которая считывает стандартный вывод из worker.py и определяет последнее напечатанное число. Проблема в том, что я не могу понять, как это сделать неблокирующим способом. Если я прочитаю постоянный размер буфера, то каждое чтение будет заканчиваться ожиданием одних и тех же данных от каждого из рабочих. Я пробовал множество способов, включая использование fcntl, select + os.read и т. д. Какой здесь лучший вариант? Я могу опубликовать свой источник, если это необходимо, но я решил, что объяснение описывает проблему достаточно хорошо.

Спасибо за любую помощь здесь.

ИЗМЕНИТЬ
Добавление примера кода

У меня есть рабочий, который запускает подпроцесс.

class WorkerThread(threading.Thread):
    def __init__(self):
        self.completed = 0
        self.process = None
        self.lock = threading.RLock()
        threading.Thread.__init__(self)

    def run(self):
        cmd = ["/path/to/script", "arg1", "arg2"]
        self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1, shell=False)
        #flags = fcntl.fcntl(self.process.stdout, fcntl.F_GETFL)
        #fcntl.fcntl(self.process.stdout.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def get_completed(self):
        self.lock.acquire();
        fd = select.select([self.process.stdout.fileno()], [], [], 5)[0]
        if fd:
            self.data += os.read(fd, 1)
            try:
                self.completed = int(self.data.split("\n")[-2])
            except IndexError:
                pass
        self.lock.release()
        return self.completed

Затем у меня есть ThreadManager.

class ThreadManager():
    def __init__(self):
        self.pool = []
        self.running = []
        self.lock = threading.Lock()

    def clean_pool(self, pool):
        for worker in [x for x in pool is not x.isAlive()]:
            worker.join()
            pool.remove(worker)
            del worker
        return pool

    def run(self, concurrent=5):
        while len(self.running) + len(self.pool) > 0:
            self.clean_pool(self.running)
            n = min(max(concurrent - len(self.running), 0), len(self.pool))
            if n > 0:
                for worker in self.pool[0:n]:
                    worker.start()
                self.running.extend(self.pool[0:n])
                del self.pool[0:n]
            time.sleep(.01)
         for worker in self.running + self.pool:
             worker.join()

и некоторый код для его запуска.

threadManager = ThreadManager()
for i in xrange(0, 5):
    threadManager.pool.append(WorkerThread())
threadManager.run()

Я удалил журнал другого кода в надежде попытаться точно определить проблему.


person sberry    schedule 18.03.2010    source источник
comment
Вы используете Linux или другой Unix? Если это так, select + os.read 1 byte должен работать нормально — можете ли вы показать нам код, который у вас есть в этой строке, и какую ошибку или неправильное поведение он вам дает?   -  person Alex Martelli    schedule 18.03.2010
comment
На самом деле это работает на Windoze для разработки, будет либо на Fedora, либо на OS X для производства.   -  person sberry    schedule 18.03.2010


Ответы (2)


Вместо того, чтобы ваш service_controller был заблокирован доступом к вводу-выводу, только цикл потока должен считывать свой собственный управляемый вывод процесса.

затем у вас может быть метод в объекте с потоком, управляющий процессом, чтобы получить последний опрашиваемый вывод.

конечно, не забудьте в этом случае использовать некий механизм блокировки для защиты буфера, который будет использоваться как потоком для его заполнения, так и методом, вызываемым контроллером для его получения.

person dweeves    schedule 18.03.2010
comment
Я далек от того, что вы предлагаете? У меня есть объект с резьбой, управляющий процессом, чтобы получить последний опрашиваемый вывод... - person sberry; 18.03.2010
comment
ваш метод get_completed заполняет только self.completed , я бы предложил переименовать его в update_completed. затем добавление метода get_completed, возвращающего self.completed (добавление threading.RLock для защиты доступа к нему). Затем в вашем диспетчере потоков вы можете периодически вызывать get_completed для своих воркеров. - person dweeves; 18.03.2010
comment
На самом деле метод get_completed должен был возвращать self.completed (я пропустил его по ошибке при повторном вводе). Я добавил RLock вокруг чтения, но у меня все еще та же проблема. - person sberry; 18.03.2010
comment
Я просто не могу уложить это в голове. Возможно, немного сна сегодня вечером поможет мне совершить прорыв, который я ищу. - person sberry; 19.03.2010

Ваш метод WorkerThread.run() запускает подпроцесс, а затем немедленно завершает работу. Run() должен выполнять опрос и обновлять WorkerThread.completed до тех пор, пока подпроцесс не завершится.

person Jeff Younker    schedule 21.09.2011