Как я могу асинхронно обрабатывать xml в python?

У меня есть большой файл данных XML (> 160M) для обработки, и кажется, что синтаксический анализ SAX/expat/pulldom - это то, что нужно. Я хотел бы иметь поток, который просеивает узлы и помещает узлы для обработки в очередь, а затем другие рабочие потоки извлекают из очереди следующий доступный узел и обрабатывают его.

У меня есть следующее (у него должны быть замки, я знаю - это будет позже)

import sys, time
import xml.parsers.expat
import threading

q = []

def start_handler(name, attrs):
    q.append(name)

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")


t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    print(q)
    time.sleep(1)

Проблема в том, что тело блока while вызывается только один раз, и тогда я даже не могу его прервать ctrl-C. В файлах меньшего размера вывод соответствует ожидаемому, но это, по-видимому, указывает на то, что обработчик вызывается только тогда, когда документ полностью проанализирован, что, по-видимому, противоречит цели синтаксического анализатора SAX.

Я уверен, что это мое собственное невежество, но я не понимаю, где я делаю ошибку.

PS: я также пытался изменить start_handler таким образом:

def start_handler(name, attrs):
    def app():
        q.append(name)
    u = threading.Thread(group=None, target=app)
    u.start()

Хотя любви нет.


person decitrig    schedule 18.01.2010    source источник


Ответы (4)


ParseFile, как мы заметили, просто "проглатывает" все - это не годится для инкрементного синтаксического анализа, который вы хотите сделать! Таким образом, просто загружайте файл в синтаксический анализатор понемногу, убедившись, что условно передаете управление другим потокам по мере продвижения, например:

while True:
  data = f.read(BUFSIZE)
  if not data:
    p.Parse('', True)
    break
  p.Parse(data, False)
  time.sleep(0.0)

вызов time.sleep(0.0) — это способ Python сказать «уступать другим потокам, если они готовы и ожидают»; метод Parse задокументирован здесь.

Второй момент: забудьте о блокировках для этого использования! -- вместо этого используйте Queue.Queue, это по своей сути потокобезопасный и почти всегда лучший и самый простой способ координировать несколько потоков в Python. Просто создайте экземпляр Queue q, q.put(name) на нем и заблокируйте рабочие потоки на q.get(), ожидая выполнения дополнительной работы — это НАСТОЛЬКО просто!

(Существует несколько вспомогательных стратегий, которые вы можете использовать для координации завершения рабочих потоков, когда им больше нечего делать, но самый простой, при отсутствии особых требований, — просто сделать их потоками-демонами, чтобы все они завершались, когда основной поток — см. документацию).

person Alex Martelli    schedule 19.01.2010
comment
Проголосовали за предложения Queue, но уверены ли вы в том, что ParseFile проглотит все за один раз? Он обращается к обработчикам Python для обработки тегов по мере их поступления, в этом вся цель синтаксического анализа SAX ... или вы говорите, что этого недостаточно, чтобы вызвать переключение потока в Python? - person Bandi-T; 19.01.2010
comment
Если вам нужен SAX, вы можете использовать xml.sax, см. docs.python.org/library/ ; OP не использует SAX, а использует xml.parsers.expat, интерфейс с более низкой абстракцией, который не навязывает инкрементную стратегию (он поддерживает ее, но не < i>наложить его, оставив выбор на уровне кода Python). - person Alex Martelli; 19.01.2010
comment
Выбор expat был несколько произвольным, я не мог найти хорошего объяснения разницы между expat и sax. Модуль sax работает так же хорошо — возможно, даже лучше, поскольку он кажется настолько асинхронным, насколько мне было нужно. В конце концов, я все равно принял метод подачи по частям, так как это дает мне возможность стерилизовать строки, которые я загружаю, до того, как синтаксический анализатор доберется до них. Очень полезный ответ, спасибо. - person decitrig; 19.01.2010
comment
@slide_rule, добро пожаловать — и да, SAX обеспечивает асинхронное (управляемое событиями) использование и позволяет вам использовать различные базовые синтаксические анализаторы (тот, который поставляется с Python, но вы можете установить другие, например, те, которые проверяют относительно XML-схема и т. д.). - person Alex Martelli; 19.01.2010

Я не слишком уверен в этой проблеме. Я предполагаю, что вызов ParseFile блокируется, и из-за GIL выполняется только поток синтаксического анализа. Вместо этого можно было бы использовать multiprocessing. В любом случае, он предназначен для работы с очередями.

Вы создаете Process и можете передать ему Queue:

import sys, time
import xml.parsers.expat
import multiprocessing
import Queue

def do_expat(q):
    p = xml.parsers.expat.ParserCreate()

    def start_handler(name, attrs):
        q.put(name)

    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")

if __name__ == '__main__':
    q = multiprocessing.Queue()
    process = multiprocessing.Process(target=do_expat, args=(q,))
    process.start()

    elements = []
    while True:
        while True:
            try:
                elements.append(q.get_nowait())
            except Queue.Empty:
                break

        print elements
        time.sleep(1)

Я включил список элементов, просто чтобы воспроизвести ваш оригинальный сценарий. Ваше окончательное решение, вероятно, будет использовать get_nowait и Pool или что-то похожее.

person Brian McKenna    schedule 19.01.2010
comment
Да, это хороший путь - как вы сказали, вы все равно захотите использовать очереди. - person Bandi-T; 19.01.2010
comment
Я попробовал этот код; это позволяет избежать блокировки, но ParseFile по-прежнему ничего не выводит, пока не прочитает весь ввод. - person decitrig; 19.01.2010

Единственное, что я вижу неправильно, это то, что вы одновременно обращаетесь к q из разных потоков - без блокировки, как вы действительно пишете. Это напрашивается на неприятности - и вы, вероятно, получаете проблемы в виде блокировки интерпретатора Python. :)

Попробуйте заблокировать, это действительно не очень сложно:

import sys, time
import xml.parsers.expat
import threading

q = []
q_lock = threading.Lock() <---

def start_handler(name, attrs):
    q_lock.acquire() <---
    q.append(name)
    q_lock.release() <---

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")


t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    q_lock.acquire() <---
    print(q)
    q_lock.release() <---
    time.sleep(1)

Видите ли, это было очень просто, мы просто создали переменную блокировки для защиты нашего объекта, и получали эту блокировку каждый раз, прежде чем использовать объект, и освобождали каждый раз после того, как мы закончили нашу задачу над объектом. Таким образом мы гарантировали, что q.append(name) никогда не перекроется с print(q).


(В более новых версиях Python также есть синтаксис «with ....», который помогает вам не снимать блокировки, не закрывать файлы или другие очистки, о которых часто забывают.)

person Bandi-T    schedule 19.01.2010

Я не очень много знаю о реализации, но если синтаксический анализ представляет собой код C, который выполняется до завершения, другие потоки Python не будут выполняться. Если синтаксический анализатор обращается к коду Python, GIL может быть выпущен для запуска других потоков, но я не уверен в этом. Возможно, вы захотите проверить эти детали.

person Noctis Skytower    schedule 19.01.2010