zeromq: как предотвратить бесконечное ожидание?

Я только начал с ZMQ. Я разрабатываю приложение, рабочий процесс которого:

  1. один из многих клиентов (имеющих случайные PULL-адреса) PUSH-запрос на сервер по адресу 5555
  2. сервер всегда ждет клиентских PUSH-сообщений. Когда он приходит, для этого конкретного запроса создается рабочий процесс. Да, рабочие процессы могут существовать одновременно.
  3. Когда этот процесс завершает свою задачу, он отправляет результат клиенту.

Я предполагаю, что для этого подходит архитектура PUSH/PULL. Пожалуйста, поправьте меня в этом.


Но как мне справиться с этими сценариями?

  1. client_receiver.recv() будет ждать бесконечное время, когда сервер не сможет ответить.
  2. клиент может отправить запрос, но сразу после этого произойдет сбой, поэтому рабочий процесс навсегда останется на server_sender.send().

Итак, как мне настроить что-то вроде тайм-аута в модели PUSH/PULL?


РЕДАКТИРОВАТЬ: спасибо за предложения пользователя 938949, я получил рабочий ответ и делюсь им с потомками.


person Jesvin Jose    schedule 24.09.2011    source источник
comment
Я не эксперт по 0mq, но во многих подобных ситуациях лучше создать пул рабочих процессов при запуске, а не создавать рабочие процессы в ответ на сообщения. Может быть, я неправильно вас понимаю.   -  person wberry    schedule 24.09.2011
comment
Хорошая точка зрения. На самом деле я планирую предварительно разветвить рабочих. Я только что понял, что это может быть тривиально с 0mq.   -  person Jesvin Jose    schedule 24.09.2011


Ответы (4)


Если вы используете zeromq >= 3.0, вы можете установить параметр сокета RCVTIMEO:

client_receiver.RCVTIMEO = 1000 # in milliseconds

Но в целом можно использовать поллеры:

poller = zmq.Poller()
poller.register(client_receiver, zmq.POLLIN) # POLLIN for recv, POLLOUT for send

И poller.poll() берет тайм-аут:

evts = poller.poll(1000) # wait *up to* one second for a message to arrive.

evts будет пустым списком, если нечего получать.

Вы можете опросить с помощью zmq.POLLOUT, чтобы проверить, будет ли отправка успешной.

Или, чтобы обработать случай с узлом, который мог выйти из строя, a:

worker.send(msg, zmq.NOBLOCK)

может быть достаточно, что всегда будет возвращаться немедленно, вызывая ZMQError(zmq.EAGAIN), если отправка не может быть завершена.

person minrk    schedule 24.09.2011
comment
не могли бы вы подробнее рассказать о zmq.NOBLOCK? - person Jesvin Jose; 24.09.2011
comment
Привет, нам нужно перерегистрировать сокет (в опросчике) каждый раз, когда мы отключаем и снова подключаем его? - person mariolpantunes; 11.05.2014
comment
Нет, только если вы закрываете сокет и открываете новый, вам нужно перерегистрироваться. - person minrk; 12.05.2014
comment
Как сказано ниже @Adobri и @mknaf: при использовании zmq.RCVTIMEO вам также необходимо установить zmq.LINGER, иначе сокет все равно не закроется даже после истечения времени ожидания. В Python это socket.setsockopt(zmq.RCVTIMEO, 1000) socket.setsockopt(zmq.LINGER, 0) message = socket.recv() - person dthor; 02.11.2016
comment
Обе строки работают в питоне: results_receiver.RCVTIMEO = 1000 и results_receiver.setsockopt(zmq.RCVTIMEO, 1000) - person silgon; 23.08.2017
comment
Вы также должны поднять ошибку zmq.ZMQError, если вы используете схему try: except:. - person silgon; 23.08.2017

Это был быстрый хак, который я сделал после того, как сослался на ответ пользователя938949 и http://taotetek.wordpress.com/2011/02/02/python-multiprocessing-with-zeromq/. Если у вас получится лучше, опубликуйте свой ответ, я порекомендую ваш ответ.

Тем, кто ищет долговременные решения по надежности, см. http://zguide.zeromq.org/page:all#toc64

Zeromq версии 3.0 (бета-банкомат) поддерживает время ожидания в ZMQ_RCVTIMEO и ZMQ_SNDTIMEO. http://api.zeromq.org/3-0:zmq-setsockopt

Сервер

Zmq.NOBLOCK гарантирует, что если клиент не существует, send() не блокируется.

import time
import zmq
context = zmq.Context()

ventilator_send = context.socket(zmq.PUSH)
ventilator_send.bind("tcp://127.0.0.1:5557")

i=0

while True:
    i=i+1
    time.sleep(0.5)
    print ">>sending message ",i
    try:
        ventilator_send.send(repr(i),zmq.NOBLOCK)
        print "  succeed"
    except:
        print "  failed"

Клиент

Объект опроса может прослушивать многие принимающие сокеты (см. «Многопроцессорная обработка Python с ZeroMQ», ссылка на которую приведена выше. Я связал его только с work_receiver. В бесконечном цикле клиент опрашивает с интервалом в 1000 мс. Объект socks возвращает пустое значение, если за это время не было получено ни одного сообщения.

import time
import zmq
context = zmq.Context()

work_receiver = context.socket(zmq.PULL)
work_receiver.connect("tcp://127.0.0.1:5557")

poller = zmq.Poller()
poller.register(work_receiver, zmq.POLLIN)

# Loop and accept messages from both channels, acting accordingly
while True:
    socks = dict(poller.poll(1000))
    if socks:
        if socks.get(work_receiver) == zmq.POLLIN:
            print "got message ",work_receiver.recv(zmq.NOBLOCK)
    else:
        print "error: message timeout"
person Jesvin Jose    schedule 26.09.2011
comment
Есть ли в Python select? В библиотеке Ruby есть такой, как IO.select. Вы можете: if ZMQ.select([read_socket], nil, nil, 20); puts read_socket.recv; else; puts 'timeout 20 secs'; end - person mixonic; 06.03.2012

Отправка не будет блокироваться, если вы используете ZMQ_NOBLOCK, но если вы попытаетесь закрыть сокет и контекст, этот шаг заблокирует выход программы.

Причина в том, что сокет ожидает любого однорангового узла, чтобы исходящие сообщения попадали в очередь. Чтобы немедленно закрыть сокет и сбросить исходящие сообщения из буфера, используйте ZMQ_LINGER и установите его в 0..

person Adobri    schedule 01.06.2012
comment
zmq.RCVTIMEO не поможет вам, если вы не используете zmq.LINGER, потому что по истечении времени ожидания сокет все равно не закроется. Это следует добавить к выбранному ответу. - person mknaf; 28.03.2014

Если вы ждете только один сокет, а не создаете Poller, вы можете сделать это:

if work_receiver.poll(1000, zmq.POLLIN):
    print "got message ",work_receiver.recv(zmq.NOBLOCK)
else:
    print "error: message timeout"

Вы можете использовать это, если ваш тайм-аут меняется в зависимости от ситуации, вместо установки work_receiver.RCVTIMEO.

person Mathieu Longtin    schedule 02.05.2019
comment
хороший ответ : ) - person Google; 14.07.2021