Асинхронный подпроцесс в Windows

Прежде всего, общая проблема, которую я решаю, немного сложнее, чем я показываю здесь, поэтому, пожалуйста, не говорите мне «использовать потоки с блокировкой», поскольку это не решит мою реальную ситуацию без честного, ЧЕСТНОГО переписывания и рефакторинг.

У меня есть несколько приложений, которые я не могу модифицировать, которые берут данные из стандартного ввода и выводят их на стандартный вывод после выполнения своей магии. Моя задача состоит в том, чтобы связать несколько таких программ. Проблема в том, что иногда они задыхаются, и поэтому мне нужно отслеживать их прогресс, который выводится в STDERR.

pA = subprocess.Popen(CommandA,  shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# ... some more processes make up the chain, but that is irrelevant to the problem
pB = subprocess.Popen(CommandB, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=pA.stdout )

Теперь прямое чтение через pA.stdout.readline() и pB.stdout.readline() или обычные функции read() блокирует. Поскольку разные приложения выводят данные с разной скоростью и в разных форматах, блокировка невозможна. (И, как я писал выше, многопоточность не вариант, если только в крайнем, крайнем случае.) pA.communicate() безопасен для взаимоблокировок, но, поскольку мне нужна информация в реальном времени, это тоже не вариант.

Таким образом, Google привел меня к этому асинхронному фрагмент подпроцесса в ActiveState.

Сначала все хорошо, пока не реализую. Сравнивая вывод cmd.exe pA.exe | pB.exe, игнорируя тот факт, что оба вывода выводятся в одно и то же окно, создающее беспорядок, я вижу очень мгновенные обновления. Однако я реализую то же самое, используя приведенный выше фрагмент и объявленную там функцию read_some(), и уведомление об обновлениях одного канала занимает более 10 секунд. Но когда это происходит, у него есть обновления, ведущие, например, к 40% прогресса.

Таким образом, я провел еще несколько исследований и увидел множество тем, касающихся PeekNamedPipe, анонимных дескрипторов и возврата 0 доступных байтов, даже если в канале есть информация. Поскольку эта тема оказалась за пределами моего опыта по исправлению или написанию кода, я пришел в Stack Overflow, чтобы найти руководство. :)

Моя платформа — 64-битная W7 с Python 2.6, приложения — 32-битные, если это имеет значение, и совместимость с Unix не вызывает беспокойства. Я даже могу иметь дело с полным решением ctypes или pywin32, которое полностью разрушает подпроцесс, если это единственное решение, если я могу асинхронно читать из каждого канала stderr с немедленной производительностью и без взаимоблокировок. :)


person Stigma    schedule 31.03.2010    source источник
comment
Посмотрите, как реализован communicate. В Windows он использует потоки, в то время как на других платформах используется выбор или опрос. Я не знаю, почему автор не использовал select в окнах, я не знаю окон. Попробуйте изучить и реализовать его с помощью select.   -  person mg.    schedule 31.03.2010
comment
В Windows select работает только с сокетами, поэтому для communicate он совершенно непригоден.   -  person Alex Martelli    schedule 31.03.2010
comment
Да, если бы select работал, это было бы относительно легко, так как это единственная команда, которую я знаю. :)   -  person Stigma    schedule 31.03.2010


Ответы (3)


Насколько плохо использовать потоки? Я столкнулся с той же проблемой и в конце концов решил использовать потоки для сбора всех данных в stdout и stderr подпроцесса и поместить их в потокобезопасную очередь, которую основной поток может читать блокирующим образом, без необходимости беспокоиться о потоках, происходящих за кулисами.

Непонятно, какие проблемы вы ожидаете от решения, основанного на потоках и блокировке. Вы беспокоитесь о необходимости сделать остальную часть вашего кода потокобезопасной? Это не должно быть проблемой, поскольку поток ввода-вывода не должен взаимодействовать ни с какой остальной частью вашего кода или данных. Если у вас очень строгие требования к памяти или ваш конвейер особенно длинный, возможно, вы недовольны созданием такого количества потоков. Я недостаточно знаю о вашей ситуации, поэтому я не могу сказать, может ли это быть проблемой, но мне кажется, что, поскольку вы уже порождаете дополнительные процессы, несколько потоков для взаимодействия с ними не должны быть ужасное бремя. В моей ситуации я не нашел эти потоки ввода-вывода особенно проблематичными.

Моя функция потока выглядела примерно так:

def simple_io_thread(pipe, queue, tag, stop_event):
    """
    Read line-by-line from pipe, writing (tag, line) to the
    queue. Also checks for a stop_event to give up before
    the end of the stream.
    """
    while True:
        line = pipe.readline()

        while True:
            try:
                # Post to the queue with a large timeout in case the
                # queue is full.
                queue.put((tag, line), block=True, timeout=60)
                break
            except Queue.Full:
                if stop_event.isSet():
                    break
                continue
        if stop_event.isSet() or line=="":
            break
    pipe.close()

Когда я запускаю подпроцесс, я делаю это:

outputqueue = Queue.Queue(50)
stop_event = threading.Event()
process = subprocess.Popen(
    command,
    cwd=workingdir,
    env=env,
    shell=useshell,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)
stderr_thread = threading.Thread(
    target=simple_io_thread,
    args=(process.stderr, outputqueue, "STDERR", stop_event)
)
stdout_thread = threading.Thread(
    target=simple_io_thread,
    args=(process.stdout, outputqueue, "STDOUT", stop_event)
)
stderr_thread.daemon = True
stdout_thread.daemon = True
stderr_thread.start()
stdout_thread.start()

Затем, когда я хочу прочитать, я могу просто заблокировать outputqueue - каждый считанный из него элемент содержит либо строку, определяющую, из какого канала он пришел, либо строку текста из этого канала. Очень мало кода выполняется в отдельном потоке, и он взаимодействует с основным потоком только через потокобезопасную очередь (плюс событие на случай, если мне нужно сдаться раньше). Возможно, такой подход был бы полезен и позволил бы решить проблему с потоками и блокировками, но без переписывания большого количества кода?

(Мое решение усложнено, потому что иногда я хочу завершить подпроцессы досрочно и хочу быть уверенным, что все потоки закончатся. Если это не проблема, вы можете избавиться от всех вещей stop_event, и это станет довольно кратким.)

person Weeble    schedule 04.04.2010
comment
Я бы хотел улучшить свой ответ, если бы кто-нибудь сказал мне, почему они минусуют его. В вопросе говорилось, что использование потоков с блокировкой потребует много переписывания, и я предполагаю, что это может быть не тот случай, когда использование потоков потребует много переписывания. - person Weeble; 08.04.2010
comment
Приносим извинения за поздний ответ (у меня ошибочно сложилось впечатление, что мой вопрос был остановлен, чтобы привлечь внимание). Я попробую, когда позволит время. Недавно я отказался от этого вопроса в пользу более продуктивных дел, но я определенно хочу его решить. - person Stigma; 25.04.2010

Я предполагаю, что конвейер процесса не заблокируется, если он использует только стандартный ввод и стандартный вывод; и проблема, которую вы пытаетесь решить, заключается в том, как сделать так, чтобы она не блокировалась, если они пишут в stderr (и должны иметь дело с возможным резервным копированием stderr).

Если вы позволяете нескольким процессам писать в stderr, вы должны следить за тем, чтобы их выходные данные не смешивались. Я предполагаю, что вы как-то разобрались с этим; просто поместите это там, чтобы быть уверенным.

Помните о флаге -u для python; это полезно при тестировании, чтобы увидеть, не портит ли вас буферизация ОС.

Если вы хотите эмулировать select() для файловых дескрипторов в win32, ваш единственный выбор — использовать PeekNamedPipe() и подобные. У меня есть фрагмент кода, который одновременно считывает построчный вывод из нескольких процессов, который вы даже можете использовать напрямую — попробуйте передать ему список дескрипторов proc.stderr и вперед.

class NoLineError(Exception): pass
class NoMoreLineError(Exception): pass
class LineReader(object):
    """Helper class for multi_readlines."""
    def __init__(self, f):
        self.fd = f.fileno()
        self.osf = msvcrt.get_osfhandle(self.fd)
        self.buf = ''

    def getline(self):
        """Returns a line of text, or raises NoLineError, or NoMoreLineError."""
        try:
            _, avail, _ = win32pipe.PeekNamedPipe(self.osf, 0)
            bClosed = False
        except pywintypes.error:
            avail = 0
            bClosed = True

        if avail:
            self.buf += os.read(self.fd, avail)

        idx = self.buf.find('\n')
        if idx >= 0:
            ret, self.buf = self.buf[:idx+1], self.buf[idx+1:]
            return ret
        elif bClosed:
            if self.buf:
                ret, self.buf = self.buf, None
                return ret
            else:
                raise NoMoreLineError
        else:
            raise NoLineError


def multi_readlines(fs, timeout=0):
    """Read lines from |fs|, a list of file objects.
    The lines come out in arbitrary order, depending on which files
    have output available first."""
    if type(fs) not in (list, tuple):
        raise Exception("argument must be a list.")
    objs = [LineReader(f) for f in fs]
    for i,obj in enumerate(objs): obj._index = i
    while objs:
        yielded = 0
        for i,obj in enumerate(objs):
            try:
                yield (obj._index, obj.getline())
                yielded += 1
            except NoLineError:
                #time.sleep(timeout)
                pass
            except NoMoreLineError:
                del objs[i]
                break   # Because we mutated the array

        if not yielded:
            time.sleep(timeout)
            pass

Я никогда не видел проблему «Peek возвращает 0 байтов, даже если данные доступны». Если это произойдет с другими, держу пари, их libc буферизует их stdout/stderr перед отправкой данных в ОС; вы ничего не можете сделать с этим извне. Вы должны каким-то образом заставить приложение использовать небуферизованный вывод (-u для python; вызовы win32/libc для изменения дескриптора файла stderr,...)

Тот факт, что вы ничего не видите, а затем тонну обновлений, заставляет меня думать, что ваша проблема заключается в буферизации на стороне источника. win32 libc может буферизоваться по-разному, если пишет в канал, а не в консоль. Опять же, лучшее, что вы можете сделать вне этих программ, — это агрессивно истощать их вывод.

person Paul Du Bois    schedule 07.04.2010
comment
Приносим извинения за поздний ответ. Я поиграю с вашим кодом, но краткий обзор показывает, что внутри он немного отличается от фрагментов, которые я уже опубликовал. Возможно, опция -u сработает, но это относится только к Python STDOUT и STDERR, если я правильно понимаю. В моем случае это порожденные процессы с subprocess.PIPE настроенными STDERR и STDOUT, которые задерживают и выводят рывками. Пожалуйста, поправьте меня, если я неправильно это понимаю. В любом случае, я все равно попробую, когда время позволит мне снова. :) - person Stigma; 25.04.2010
comment
Да, прости; Я должен был более четко изложить свою точку зрения, что я не думаю, что ваш общий подход неверен (поскольку наш код практически идентичен), и что ИМО, вероятно, проблема в окружающих частях. - person Paul Du Bois; 28.04.2010

Как насчет использования Twisted FD? http://twistedmatrix.com/documents/8.1.0/api/twisted.internet.fdesc.html

Он не асинхронный, но неблокирующий. Для асинхронных вещей вы можете использовать Twisted?

person Gregory    schedule 01.04.2010
comment
Я смотрел Twisted раньше, но это в значительной степени отдельный мир, и из-за всего, что связано с Интернетом, у меня были проблемы с поиском вещей, которые имели отношение ко мне. Спасибо за ссылку, я посмотрю, что я могу сделать с этим. Однако я немного боюсь, что это может в конечном итоге страдать от той же проблемы PeekNamedPipe, о которой я писал в своем вопросе, поскольку все в Windows и неблокирующих каналах, похоже, каким-то образом ведет к этому. - person Stigma; 02.04.2010