Python Совместное использование сетевого сокета с multiprocessing.Manager

В настоящее время я пишу модуль прокси-сервера nginx с очередью запросов впереди, поэтому запросы не отбрасываются, когда серверы за nginx не могут обрабатывать запросы (nginx настроен как балансировщик нагрузки).

я использую

from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler

Идея состоит в том, чтобы поставить запрос в очередь перед их обработкой. Я знаю, что multiprocessing.Queue поддерживает только простой объект и не может поддерживать необработанные сокеты, поэтому я попытался использовать multiprocess.Manager для создания общего словаря. Диспетчер также использует сокеты для подключения, так что этот способ тоже не удался. Есть ли способ разделить сетевые сокеты между процессами? Вот проблемная часть кода:

class ProxyServer(Threader, HTTPServer):

    def __init__(self, server_address, bind_and_activate=True):
        HTTPServer.__init__(self, server_address, ProxyHandler,
                bind_and_activate)

        self.manager = multiprocessing.Manager()

        self.conn_dict = self.manager.dict()
        self.ticket_queue = multiprocessing.Queue(maxsize= 10)
        self._processes = []
        self.add_worker(5)


    def process_request(self, request, client):
        stamp = time.time()
        print "We are processing"

        self.conn_dict[stamp] = (request, client) # the program crashes here


    #Exception happened during processing of request from ('172.28.192.34', 49294)
    #Traceback (most recent call last):
    #  File "/usr/lib64/python2.6/SocketServer.py", line 281, in _handle_request_noblock
    #    self.process_request(request, client_address)
    #  File "./nxproxy.py", line 157, in process_request
    #    self.conn_dict[stamp] = (request, client)
    #  File "<string>", line 2, in __setitem__
    #  File "/usr/lib64/python2.6/multiprocessing/managers.py", line 725, in _callmethod
    #    conn.send((self._id, methodname, args, kwds))
    #TypeError: expected string or Unicode object, NoneType found

        self.ticket_queue.put(stamp)


    def add_worker(self, number_of_workers):
        for worker in range(number_of_workers):
            print "Starting worker %d" % worker
            proc = multiprocessing.Process(target=self._worker, args = (self.conn_dict,))
            self._processes.append(proc)
            proc.start()

    def _worker(self, conn_dict):
        while 1:
            ticket = self.ticket_queue.get()

            print conn_dict
            a=0
            while a==0:
                try:
                    request, client = conn_dict[ticket]
                    a=1
                except Exception:
                    pass
            print "We are threading!"
            self.threader(request, client)

person iu_long    schedule 17.12.2010    source источник


Ответы (3)


Вы можете использовать multiprocessing.reduction для передачи объектов соединения и сокета между процессами.

Пример кода

# Main process
from multiprocessing.reduction import reduce_handle
h = reduce_handle(client_socket.fileno())
pipe_to_worker.send(h)

# Worker process
from multiprocessing.reduction import rebuild_handle
h = pipe.recv()
fd = rebuild_handle(h)
client_socket = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
client_socket.send("hello from the worker process\r\n") 
person Kies    schedule 31.12.2011
comment
это то, что я искал, это не очень хорошо документировано на сайте python - person WojonsTech; 25.02.2013

Похоже, вам нужно передавать файловые дескрипторы между процессами ( предполагая здесь Unix, понятия не имею о Windows). Я никогда не делал этого в Python, но вот ссылка на проект python-passfd что вы, возможно, захотите проверить.

person Nikolai Fetissov    schedule 17.12.2010

Вы можете посмотреть на этот код — https://gist.github.com/sunilmallya/4662837, который это сервер сокетов multiprocessing.reduction с родительской обработкой, передающей соединения клиенту после принятия соединений

person Sunil    schedule 05.02.2013