Запросы Python - временное исчерпание порта

Могу ли я что-нибудь сделать с приведенным ниже кодом (я думал, что сеансы решат эту проблему?), Чтобы предотвратить создание новых TCP-соединений с каждым запросом GET? Я выполняю около 1000 запросов в секунду и после того, как около 10000 запросов заканчиваются сокетами:

def ReqOsrm(url_input):
    ul, qid = url_input
    conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=1)
    try:
        response = conn_pool.request('GET', ul)
        json_geocode = json.loads(response.data.decode('utf-8'))
        status = int(json_geocode['status'])
        if status == 200:
            tot_time_s = json_geocode['route_summary']['total_time']
            tot_dist_m = json_geocode['route_summary']['total_distance']
            used_from, used_to = json_geocode['via_points']
            out = [qid, status, tot_time_s, tot_dist_m, used_from[0], used_from[1], used_to[0], used_to[1]]
            return out
        else:
            print("Done but no route: %d %s" % (qid, req_url))
            return [qid, 999, 0, 0, 0, 0, 0, 0]
    except Exception as err:
        print("%s: %d %s" % (err, qid, req_url))
        return [qid, 999, 0, 0, 0, 0, 0, 0]

# run:
pool = Pool(int(cpu_count()))
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()

HTTPConnectionPool (host = '127.0.0.1', port = 5005): превышено максимальное количество повторных попыток с URL: /viaroute?loc=44.779708,4.2609877&loc=44.648439,4.2811959&alt=false&geometry=false (вызвано NewConnectionError (': не удалось установить новое соединение: [WinError 10048] Обычно разрешено только одно использование каждого адреса сокета (протокол / сетевой адрес / порт) ',))


Эрик - большое спасибо за ответ. Думаю, это именно то, что мне нужно. Однако я не могу его правильно изменить. Код правильно возвращает 10000 ответов для первых нескольких циклов, однако затем он, кажется, ломается и возвращает менее 10000, что наводит меня на мысль, что я неправильно реализовал очередь?

введите описание изображения здесь

ghost = 'localhost'
gport = 8989

def CreateUrls(routes, ghost, gport):
    return [
        ["http://{0}:{1}/route?point={2}%2C{3}&point={4}%2C{5}&vehicle=car&calc_points=false&instructions=false".format(
            ghost, gport, alat, alon, blat, blon),
            qid] for qid, alat, alon, blat, blon in routes]


def LoadRouteCSV(csv_loc):
    if not os.path.isfile(csv_loc):
        raise Exception("Could not find CSV with addresses at: %s" % csv_loc)
    else:
        return pd.read_csv(csv_loc, sep=',', header=None, iterator=True, chunksize=1000 * 10)

class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self._qin = qin
        self._qout = qout

    def run(self):
        # Create threadsafe connection pool
        conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=10)

        class Consumer(threading.Thread):
            def __init__(self, qin, qout):
                threading.Thread.__init__(self)
                self.__qin = qin
                self.__qout = qout

            def run(self):
                while True:
                    msg = self.__qin.get()
                    ul, qid = msg
                    try:
                        response = conn_pool.request('GET', ul)
                        s = float(response.status)
                        if s == 200:
                            json_geocode = json.loads(response.data.decode('utf-8'))
                            tot_time_s = json_geocode['paths'][0]['time']
                            tot_dist_m = json_geocode['paths'][0]['distance']
                            out = [qid, s, tot_time_s, tot_dist_m]
                        elif s == 400:
                            print("Done but no route for row: ", qid)
                            out = [qid, 999, 0, 0]
                        else:
                            print("Done but unknown error for: ", s)
                            out = [qid, 999, 0, 0]
                    except Exception as err:
                        print(err)
                        out = [qid, 999, 0, 0]
                    self.__qout.put(out)
                    self.__qin.task_done()

        num_threads = 10
        [Consumer(self._qin, self._qout).start() for _ in range(num_threads)]

if __name__ == '__main__':
    try:
        with open(os.path.join(directory_loc, 'gh_output.csv'), 'w') as outfile:
            wr = csv.writer(outfile, delimiter=',', lineterminator='\n')
            for x in LoadRouteCSV(csv_loc=os.path.join(directory_loc, 'gh_input.csv')):
                routes = x.values.tolist()
                url_routes = CreateUrls(routes, ghost, gport)
                del routes

                stime = time.time()

                qout = Queue()
                qin = JoinableQueue()
                [qin.put(url_q) for url_q in url_routes]
                [Worker(qin, qout).start() for _ in range(cpu_count())]
                # Block until all urls in qin are processed
                qin.join()
                calc_routes = []
                while not qout.empty():
                    calc_routes.append(qout.get())

                # Time diagnostics
                dur = time.time() - stime
                print("Calculated %d distances in %.2f seconds: %.0f per second" % (len(calc_routes),
                                                                                    dur,
                                                                                    len(calc_routes) / dur))
                del url_routes
                wr.writerows(calc_routes)
                done_count += len(calc_routes)
                # Continually update progress in terms of millions
                print("Saved %d calculations" % done_count)

person mptevsion    schedule 04.03.2016    source источник
comment
Привет, ваша проблема описана здесь: Вы перегружаете стек TCP / IP ...;)   -  person kamy22    schedule 04.03.2016
comment
Мне кажется, проблема в том, что вы создаете новый пул соединений для каждого запрашиваемого URL. Почему бы не создать пул соединений для каждого процесса и не использовать соединения повторно?   -  person Eric Conner    schedule 04.03.2016
comment
@EricConner - это так же просто, как создать глобальный conn_pool = HTTPConnectionPool(host='localhost', port=5000, maxsize=int(cpu_count())) и просто сослаться на него в функции response = conn_pool.request('GET', req_url), которая вызывается calc_routes = pool.map(ReqOsrm, url_routes)? Причина, по которой я спрашиваю, заключается в том, что для одного сервера (Graphhopper) это работает очень хорошо (и скорость увеличивается в 3 раза), однако, если я изменяю порт сервера на сервер OSRM, он становится очень медленным по сравнению с предыдущим (в 100 раз медленнее). Может быть, некоторые серверы не разрешают keep-alive / sessions?   -  person mptevsion    schedule 04.03.2016


Ответы (2)


Я думал что-то вроде этого. Идея состоит в том, чтобы создать процесс для каждого ядра и пул потоков для каждого процесса. У каждого процесса есть отдельный пул соединений, который совместно используется потоками этого процесса. Я не думаю, что вы можете получить более производительное решение без каких-либо потоков.

from multiprocessing import Pool, cpu_count
import Queue

from urllib3 import HTTPConnectionPool
import threading


def ReqOsrm(url_input):
    # Create threadsafe connection pool
    conn_pool = HTTPConnectionPool(host='127.0.0.1', port=5005, maxsize=1000)

    # Create consumer thread class
    class Consumer(threading.Thread):
        def __init__(self, queue):
            threading.Thread.__init__(self)
            self._queue = queue

        def run(self):
          while True:
              msg = self._queue.get()
              try:
                response = conn_pool.request('GET', url)
                print response
              except Exception as err:
                print err
              self._queue.task_done()

    # Create work queue and a pool of workers
    queue = Queue.Queue()
    num_threads = 20
    workers = []
    for _ in xrange(num_threads):
        worker = Consumer(queue)
        worker.start()
        workers.append(worker)

    for url in url_input:
        queue.put(url)

    queue.join()

url_routes = [
    ["/proc1-0", "/proc1-1"],
    ["/proc2-0", "/proc2-1"],
    ["/proc3-0", "/proc3-1"],
    ["/proc4-0", "/proc4-1"],
    ["/proc5-0", "/proc5-1"],
    ["/proc6-0", "/proc6-1"],
    ["/proc7-0", "/proc7-1"],
    ["/proc8-0", "/proc8-1"],
    ["/proc9-0", "/proc9-1"],
]

pool = Pool(int(cpu_count()))
calc_routes = pool.map(ReqOsrm, url_routes)
pool.close()
pool.join()
person Eric Conner    schedule 05.03.2016
comment
Благодарность! Думаю, это почти то, что мне нужно. У меня небольшая проблема с очередью (для фиксации результата), которую я отредактировал в конце своего сообщения. Был бы признателен, если бы вы помогли с этим - person mptevsion; 06.03.2016
comment
По этому фрагменту сложно сказать, что происходит не так. Вы сказали, что он рассчитывает дважды? Означает ли это, что вы в конечном итоге обращаетесь к одним и тем же URL несколько раз? Как вы разделяете URL-адреса между процессами? Может выложить более полную версию скрипта? - person Eric Conner; 06.03.2016
comment
извиняюсь, но я неправильно описал проблему. Кажется, я пропускаю результаты. Я опубликовал почти полный сценарий в моем отредактированном сообщении. Еще раз спасибо - немного расстраивает, поскольку я надеюсь, что я близок к тому, чтобы разобраться с этим :) - person mptevsion; 06.03.2016
comment
Ах, глупый я! Мне нужно было добавить этот self._qin.join() в мой класс процесса (а также иметь qin.join() в моем основном коде. Я почти уверен? По крайней мере, это, похоже, решило эту проблему. - person mptevsion; 06.03.2016

Цените помощь - мое рабочее решение:

class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self._qin = qin
        self._qout = qout

    def run(self):
        # Create threads to run in process
        class Consumer(threading.Thread):
            def __init__(self, qin, qout):
                threading.Thread.__init__(self)
                self.__qin = qin
                self.__qout = qout

            def run(self):
                # Close once queue empty (otherwise process will linger)
                while not self.__qin.empty():
                    msg = self.__qin.get()
                    ul, qid = msg
                    try:
                        response = conn_pool.request('GET', ul)
                        s = float(response.status)
                        if s == 200:
                            json_geocode = json.loads(response.data.decode('utf-8'))
                            tot_time_s = json_geocode['paths'][0]['time']
                            tot_dist_m = json_geocode['paths'][0]['distance']
                            out = [qid, s, tot_time_s, tot_dist_m]
                        elif s == 400:
                            #print("Done but no route for row: ", qid)
                            out = [qid, 999, 0, 0]
                        else:
                            print("Done but unknown error for: ", s)
                            out = [qid, 999, 0, 0]
                    except Exception as err:
                        print(err)
                        out = [qid, 999, 0, 0]
                    #print(out)
                    self.__qout.put(out)
                    self.__qin.task_done()

        # Create thread-safe connection pool
        concurrent = 10
        conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=concurrent)
        num_threads = concurrent
        # Start threads (concurrent) per process
        [Consumer(self._qin, self._qout).start() for _ in range(num_threads)]
        # Block until all urls in self._qin are processed
        self._qin.join()
        return

if __name__ == '__main__':
   # Fill queue input
   qin = JoinableQueue()
   [qin.put(url_q) for url_q in url_routes]
   # Queue to collect output
   qout = Queue()
   # Start cpu_count number of processes (which will launch threads and sessions)
   workers = []
   for _ in range(cpu_count()):
       workers.append(Worker(qin, qout))
       workers[-1].start()
   # Block until all urls in qin are processed
   qin.join()
   # Fill routes
   calc_routes = []
   while not qout.empty():
       calc_routes.append(qout.get())
   del qin, qout
person mptevsion    schedule 07.03.2016