Переполнение очереди между процессами Python

У меня есть производитель с N потребителями.

Производитель прослушивает сокет, который получает большое количество сообщений TCP (10 000 в минуту), считывает эти данные и помещает их в очередь для рабочих.

Рабочие, которые я настроил для чтения из очереди, следующим образом:

iterations = 0
work_iterations = 0
while True:
  try:
    iterations += 1
    data = queue.get(block=False)
    work_iterations +=1
    do_work(data)
  except Queue.Empty:
    time.sleep(0.001) #avoid high CPU usage


  if iterations == 100:
    load = float(work_iterations/iterations)
    print load
    iterations = 0
    work_iterations = 0

Это упрощенный код, но вы можете видеть, что я пытаюсь увидеть загрузку рабочего процесса, но посмотреть, сколько итераций из 100 рабочий процесс фактически смог вытащить работу из очереди. Если нагрузка постоянно 100/100, то я знаю, что очередь производителей/потребителей становится невыполненной. Теоретически это должно работать.

То, что я вижу на выходе, это много 0,97, 0,99 и очень мало 1,0. Но очередь заполняется в течение нескольких минут (у нее ограничение по размеру 10 000), и мне приходится начинать сбрасывать данные на стороне производителя. Кто-нибудь может пролить свет на то, почему это происходит? Если рабочий процесс выполняет в среднем 97/100 итераций, это означает, что очередь должна быть близка к пустой, нет?


person Martin Konecny    schedule 29.07.2013    source источник


Ответы (2)


Когда вы вызываете queue.get(block=False), Queue.Empty может быть поднят, даже если очередь на самом деле не пуста. В случае, если ваш текущий процесс не может получить блокировку для доступа к очереди, Queue.Empty будет поднят независимо от того, сколько элементов фактически находится в очереди.

Быстрый взгляд на код Queue.get() в multiprocessing/queues.py:

126    if not self._rlock.acquire(block, timeout):
127        raise Empty

Обратите внимание, что перед возбуждением исключения не проверяется, насколько заполнена очередь. Поскольку у вас так много информации в очереди, я подозреваю, что несколько раз, когда Queue.Empty поднимался, это было фактически вызвано тем, что производитель удерживал блокировку во время постановки в очередь, что приводило к сбою попытки вашего работника получить доступ к очереди.

Вы можете проверить это, внеся небольшое изменение в свой код:

except Queue.Empty:
    print queue.qsize() # returns the approximate number of elements in the queue

Как говорится в документации, это число не совсем надежно. Однако, поскольку вы имеете дело с таким большим количеством элементов в своей очереди, она должна быть достаточно близкой, чтобы сказать вам, ближе ли ваша очередь к 0 или 10 000.

person skrrgwasme    schedule 14.07.2014

Что если удалить block=Flase и time.sleep()? Вы не сможете считать рабочих.

person eri    schedule 29.07.2013