В настоящее время я пишу модуль прокси-сервера nginx с очередью запросов впереди, поэтому запросы не отбрасываются, когда серверы за nginx не могут обрабатывать запросы (nginx настроен как балансировщик нагрузки).
я использую
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
Идея состоит в том, чтобы поставить запрос в очередь перед их обработкой. Я знаю, что multiprocessing.Queue поддерживает только простой объект и не может поддерживать необработанные сокеты, поэтому я попытался использовать multiprocess.Manager для создания общего словаря. Диспетчер также использует сокеты для подключения, так что этот способ тоже не удался. Есть ли способ разделить сетевые сокеты между процессами? Вот проблемная часть кода:
class ProxyServer(Threader, HTTPServer):
def __init__(self, server_address, bind_and_activate=True):
HTTPServer.__init__(self, server_address, ProxyHandler,
bind_and_activate)
self.manager = multiprocessing.Manager()
self.conn_dict = self.manager.dict()
self.ticket_queue = multiprocessing.Queue(maxsize= 10)
self._processes = []
self.add_worker(5)
def process_request(self, request, client):
stamp = time.time()
print "We are processing"
self.conn_dict[stamp] = (request, client) # the program crashes here
#Exception happened during processing of request from ('172.28.192.34', 49294)
#Traceback (most recent call last):
# File "/usr/lib64/python2.6/SocketServer.py", line 281, in _handle_request_noblock
# self.process_request(request, client_address)
# File "./nxproxy.py", line 157, in process_request
# self.conn_dict[stamp] = (request, client)
# File "<string>", line 2, in __setitem__
# File "/usr/lib64/python2.6/multiprocessing/managers.py", line 725, in _callmethod
# conn.send((self._id, methodname, args, kwds))
#TypeError: expected string or Unicode object, NoneType found
self.ticket_queue.put(stamp)
def add_worker(self, number_of_workers):
for worker in range(number_of_workers):
print "Starting worker %d" % worker
proc = multiprocessing.Process(target=self._worker, args = (self.conn_dict,))
self._processes.append(proc)
proc.start()
def _worker(self, conn_dict):
while 1:
ticket = self.ticket_queue.get()
print conn_dict
a=0
while a==0:
try:
request, client = conn_dict[ticket]
a=1
except Exception:
pass
print "We are threading!"
self.threader(request, client)