Случайное зависание/зависание в Python ZeroMQ

Я пишу сбалансированный клиент-рабочий сервис без брокера, написанный на python с помощью ZeroMQ.

Клиенты получают адрес работника, устанавливают соединение ( zmq.REQ / zmq.REP ), отправляют один запрос, получают один ответ и затем отключаются.

Я выбрал архитектуру без брокера, потому что объем данных, которые необходимо передать между клиентами и рабочими, относительно велик, несмотря на то, что для каждого соединения используется только одна пара REQ/REP, и использование брокера в качестве «посредника» создать узкое место.

При тестировании системы я заметил, что связь между клиентами и воркерами прерывалась случайным образом, лишь иногда возобновляясь через пару секунд (часто несколько минут).

Я сузил проблему до .connect()/.disconnect() от клиентов до рабочих.

Я написал два небольших скрипта на Python, которые воспроизводят ошибку.

import zmq

class Site:

      def __init__(self):
        ctx = zmq.Context()
        self.pair_socket = ctx.socket(zmq.REQ)
        self.num = 0


      def __del__(self):
        print "closed"


      def run_site(self):
        print "running..."
        while True:
            self.pair_socket.connect('tcp://127.0.0.1:5555')
            print 'connected'
            self.pair_socket.send_pyobj(self.num)
            print 'sent', self.num
            print self.pair_socket.recv_pyobj()
            self.pair_socket.disconnect('tcp://127.0.0.1:5555')
            print 'disconnected'
            self.num += 1

s = Site()
s.run_site()

и

import zmq

class Server:

      def __init__(self):
          ctx = zmq.Context()
          self.pair_socket = ctx.socket(zmq.REP)
          self.pair_socket.bind('tcp://127.0.0.1:5555')


      def __del__(self):
          print " closed"


      def run_server(self):
          print "running..."
          while True:
              x =  self.pair_socket.recv_pyobj()
              print x
              self.pair_socket.send_pyobj(x)


s = Server()  
s.run_server()

Я не думаю, что проблема связана с памятью или gc, так как я пытался отключить gc - без особого эффекта.

Я пытался использовать zmq.LINGER, как описано здесь: Zeromq с python зависает при подключении к недопустимому сокету

Что может привести к зависанию этих случайных чисел?


person Ellingr    schedule 23.03.2015    source источник
comment
Используйте анализатор пакетов, чтобы увидеть, какая сторона пары зависает... зависает ли сообщение на клиенте до его отправки или зависает на сервере после его получения. Как определить зависание, т. е. какое последнее сообщение вы видите перед началом зависания?   -  person Jason    schedule 23.03.2015
comment
При всем уважении, разработка while True: .connect(); ... ; .disconnect() — это довольно жестокий способ с точки зрения базовых ресурсов и связанных с ними системных накладных расходов. Конечно, есть гораздо лучшие и более экологичные / более экологичные способы выразить свои дизайнерские намерения в коде, которые не будут тратить ресурсы ЦП / ресурсов.   -  person user3666197    schedule 27.05.2016


Ответы (1)


Сокет REP является синхронным по определению. Таким образом, ваш сервер может обслуживать только один запрос за раз, остальные просто заполнят буфер и в какой-то момент потеряются.

Чтобы устранить основную причину, вам нужно вместо этого использовать сокет ROUTER.

class Server:
    def __init__(self):
        ctx = zmq.Context()
        self.pair_socket = ctx.socket(zmq.ROUTER)
        self.pair_socket.bind('tcp://127.0.0.1:5555')
        self.poller = zmq.Poller()
        self.poller.register(self.pair_socket, zmq.POLLIN)

    def __del__(self):
        print " closed"

    def run_server(self):
        print "running..."
        while True:
            try:
                items = dict(self.poller.poll())
            except KeyboardInterrupt:
                break
            if self.pair_socket in items:
                x = self.pair_socket.recv_multipart()
                print x
                self.pair_socket.send_multipart(x)
person Dorian B.    schedule 27.05.2016