Как надежно выполнять большое количество одновременных HTTPS-запросов в Clojure (/Java)

У меня есть поток входных данных, и я хочу сделать 2 HTTPS сетевых запроса для каждого, прежде чем передавать результат в другую часть программы. Типичная пропускная способность составляет 50 в секунду.

for each input:
    HTTP request A
    HTTP request B
    pass event on with (A.body and B.body)

Я использую клиент http-kit, который по умолчанию является асинхронным. Он возвращает обещание, а также может принимать обратный вызов. Http-kit использует Java NIO (см. >здесь и здесь)

Скорость поступающих запросов в сочетании со временем выполнения запроса достаточно высока, поэтому это необходимо делать асинхронно.

Я пробовал 3 подхода:

  1. Когда приходит событие, поместите его на канал. Ряд go подпрограмм отключает канал. Каждый из них делает запросы, которые «блокируют» goblock, derefing промисы из HTTP-запросов. Это не работает, потому что я не думаю, что обещание хорошо работает с потоками.
  2. Когда приходит событие, немедленно запустите future, который «блокирует» асинхронные промисы. Это приводит к очень высокой загрузке ЦП. Плюс голодание сетевых ресурсов как-то.
  3. Когда приходит событие, немедленно запускайте запрос http-kit для запроса A, передавая обратный вызов, который делает запрос B, передавая обратный вызов, который передает событие. Это приводит к ошибке нехватки памяти через несколько часов.

Все они работают и справляются с нагрузкой некоторое время. Все они в конце концов терпят крах. Самый последний сбой, примерно через 12 часов:

Mar 10, 2016 2:05:59 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run
WARNING: com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@1bc8a7f5 -- APPARENT DEADLOCK!!! Creating emergency threads for unassigned pending
 tasks!
Mar 10, 2016 3:38:38 AM com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector run
WARNING: com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@1bc8a7f5 -- APPARENT DEADLOCK!!! Complete Status:
        Managed Threads: 3
        Active Threads: 1
        Active Tasks:
                com.mchange.v2.resourcepool.BasicResourcePool$1DestroyResourceTask@65d8b232 (com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0)
        Pending Tasks:
                com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask@359acb0d
Pool thread stack traces:
        Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0,5,main]
                com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:560)
        Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#1,5,main]
                java.lang.Object.wait(Native Method)
                com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534)
        Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#2,5,main]
                java.lang.Object.wait(Native Method)
                com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:534)


Thu Mar 10 04:38:34 UTC 2016 [client-loop] ERROR - select exception, should not happen
java.lang.OutOfMemoryError: Java heap space
        at java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:77)
        at sun.security.ssl.OutputRecord.<init>(OutputRecord.java:76)
        at sun.security.ssl.EngineOutputRecord.<init>(EngineOutputRecord.java:65)
        at sun.security.ssl.HandshakeOutStream.<init>(HandshakeOutStream.java:63)
        at sun.security.ssl.Handshaker.activate(Handshaker.java:514)
        at sun.security.ssl.SSLEngineImpl.kickstartHandshake(SSLEngineImpl.java:717)
        at sun.security.ssl.SSLEngineImpl.beginHandshake(SSLEngineImpl.java:743)
        at org.httpkit.client.HttpClient.finishConnect(HttpClient.java:310)
        at org.httpkit.client.HttpClient.run(HttpClient.java:375)
        at java.lang.Thread.run(Thread.java:745)
Mar 10, 2016 4:56:34 AM baleen.events invoke
SEVERE: Thread error: Java heap space
java.lang.OutOfMemoryError: Java heap space
Mar 10, 2016 5:00:43 AM baleen.events invoke
SEVERE: Thread error: Java heap space
java.lang.OutOfMemoryError: Java heap space
Mar 10, 2016 4:58:25 AM baleen.events invoke
SEVERE: Thread error: Java heap space
java.lang.OutOfMemoryError: Java heap space

Я не знаю, в чем причина неудачи. Это может быть связано со слишком большим количеством замыканий, постепенной утечкой ресурсов или голоданием потока.

Вопросы

  1. Выполнение 50 HTTP-запросов в секунду, каждый из которых может занимать 200 мс, а это означает, что в любой момент времени может выполняться 100 запросов, звучит как чрезмерное бремя?

  2. Как мне сделать это таким образом, чтобы обрабатывать пропускную способность и быть надежным?

ИЗМЕНИТЬ

Профилировщик YourKit сообщает мне, что у меня есть около 2 ГБ char[]s через org.httpkit.client.Handlers через java.util.concurrent.FutureTasks, что говорит о том, что ссылки на старые обработчики (то есть запросы) каким-то образом сохраняются. Вся причина попытки использовать обратные вызовы заключалась в том, чтобы избежать этого (хотя они могли каким-то образом попасть в замыкания)


person Joe    schedule 10.03.2016    source источник
comment
OutOfMemoryError указывает на проблему с сохранением памяти... но мы не можем помочь, не увидев ваш код или не написав полное решение с нуля. Я бы хотел удержать голову бесконечной последовательности или не очищать ресурсы, такие как соединения.   -  person Timothy Pratley    schedule 10.03.2016
comment
Я задавался вопросом, могут ли это быть сохраненные буферы, но, насколько я могу судить, сборка мусора должна обрабатывать освобождение памяти/внешних буферов, которые, например. НИО выделил. То, что происходит ниже по течению, — это просто вставка в базу данных и вставка в канал.   -  person Joe    schedule 10.03.2016
comment
Я думал опубликовать код, но это довольно сложно, и потребуется около дня, чтобы узнать, воспроизвел ли я проблему в упрощенной версии.   -  person Joe    schedule 10.03.2016
comment
Может ли быть так, что события недостаточно быстро сливаются в приемнике? Одна вещь, которую вы можете сделать, это проанализировать дамп кучи, чтобы увидеть, где занимает память.   -  person Timothy Pratley    schedule 10.03.2016
comment
Я ковырялся с YourKit, смотрите правку. Все еще пытаюсь понять, как сохраняются ссылки.   -  person Joe    schedule 10.03.2016
comment
Вы уверены, что это не живые связи? Возможно, вы захотите зарегистрировать количество активных соединений в любой момент времени?   -  person Timothy Pratley    schedule 10.03.2016
comment
В предыдущем тестировании я поставил количество запросов в полете на низкое число (10 или около того), и, похоже, оно остается стабильным. Между тем, куча медленно ползет вверх (и не будет принудительно загружена). Я не профилировал его в течение длительного периода времени.   -  person Joe    schedule 10.03.2016
comment
А ок круто. Если есть объекты подключения, рассмотрите возможность использования with-open вокруг них, чтобы убедиться, что они закрыты после того, как вы закончите с ними.   -  person Timothy Pratley    schedule 10.03.2016
comment
@Joe Вы уверены, что не сохраняете ссылки на свои функции обработчика? Являются ли FutureTask единственными путями к корням GC? Если да, то что хранит ссылки на эти FutureTask? И вообще: пробовали ли вы http-kit.org/client.html#combined? Является ли утечка памяти единственной причиной, по которой такое решение не работает для вас?   -  person Piotrek Bzdyl    schedule 10.03.2016
comment
Спасибо @PiotrekBzdyl. Я использовал метод, на который вы ссылались, в моем подходе № 1 и № 2. Я думаю, что должен каким-то образом держаться за ссылки, но я не понимаю, как это сделать. Я работаю над простой репродукцией.   -  person Joe    schedule 11.03.2016


Ответы (2)


  1. Выполнение 50 HTTP-запросов в секунду, каждый из которых может занимать 200 мс, а это означает, что в любой момент времени может выполняться 100 запросов, звучит как чрезмерное бремя?

Это определенно не избыточно для современного оборудования.

  1. Как мне сделать это таким образом, чтобы обрабатывать пропускную способность и быть надежным?

Для этого вы можете комбинировать конвейеры core.async и обратные вызовы http-kit. На самом деле вам не нужно создавать подпрограмму go для каждого запроса (хотя это не повредит), потому что вы можете использовать асинхронную put! из обратного вызова http-kit.

Используйте ограниченные буферы для каждого шага конвейера, чтобы ограничить количество активных подключений, которые будут (как минимум) ограничены количеством эфемерных портов TCP, доступных в вашей системе.

Вот пример небольшой программы, которая делает что-то похожее на то, что вы описали. Он считывает «события» из канала, в данном случае каждое событие имеет идентификатор «1» и ищет эти идентификаторы в службе HTTP. Он берет ответ от этого первого вызова, ищет ключ JSON "next" и ставит его в очередь в качестве URL-адреса для шага 2. Наконец, когда этот поиск завершен, он добавляет событие в канал out, который отслеживает подпрограмма go для отчета о статистике.

(ns concur-req.core
  (require [clojure.core.async :as async]
           [cheshire.core :refer [decode]]
           [org.httpkit.client :as http]))

(defn url-of
  [id]
  ;; this service responds within 100-200ms
  (str "http://localhost:28080/" id ".json"))

(defn retrieve-json-async
  [url c]
  (http/get url nil
            (fn [{body :body status :status :as resp}]
              (if (= 200 status)
                (async/put! c (decode body true))
                (println "ERROR:" resp))
              (async/close! c))))

(defn run [parallelism stop-chan]
  (let [;; allocate half of the parallelism to each step
        step1-n    (int (max (/ parallelism 2) 1))
        step2-n    step1-n
        ;; buffer to take ids, transform them into urls
        step1-chan (async/chan step1-n (map url-of))
        ;; buffer for result of pulling urls from step1, xform by extracting :next url
        step2-chan (async/chan step2-n (map :next))
        ;; buffer to count completed results
        out-chan   (async/chan 1 (map (constantly 1)))
        ;; for delivering the final result
        final-chan (async/chan)
        start-time (System/currentTimeMillis)]

    ;; process URLs from step1 and put the result in step2
    (async/pipeline-async step1-n step2-chan retrieve-json-async step1-chan)
    ;; process URLs from step2 and put the result in out
    (async/pipeline-async step2-n out-chan retrieve-json-async step2-chan)

    ;; keep the input channel full until stop-chan is closed.
    (async/go-loop []
      (let [[v c] (async/alts! [stop-chan [step1-chan "1"]])]
        (if (= c stop-chan)
          (async/close! step1-chan)
          (recur))))

    ;; count messages on out-chan until the pipeline is closed, printing
    ;; status message every second
    (async/go-loop [status-timer (async/timeout 1000) subt 0 accu 0]
      (let [[v c] (async/alts! [status-timer out-chan])]
        (cond (= c status-timer)
              (do (println subt "records...")
                  (recur (async/timeout 1000) 0 (+ subt accu)))

              (nil? v)
              (async/>! final-chan (+ subt accu))

              :else
              (recur status-timer (+ v subt) accu))))

    ;; block until done, then emit final report.
    (let [final-total (async/<!! final-chan)
          elapsed-ms  (- (System/currentTimeMillis) start-time)
          elapsed-s   (/ elapsed-ms 1000.0)]
      (print (format "Processed %d records with parallelism %d in %.3f seconds (%d/sec)\n"
                     final-total parallelism elapsed-s
                     (int (/ final-total elapsed-s)))))))

(defn run-for
  [seconds parallelism]
  (let [stop-chan (async/chan)]
    (future
      (Thread/sleep (* seconds 1000))
      (async/close! stop-chan))
    (run parallelism stop-chan)))

(do
  ;; Warm up the connection pool, avoid somaxconn problems...
  (doseq [p (map #(* 20 (inc %)) (range 25))]
    (run-for 1 p))
  (run-for (* 60 60 6) 500))

Чтобы проверить это, я настроил службу HTTP, которая отвечает только после того, как засыпает в случайное время между 100-200 мс. Затем я запускал эту программу в течение 6 часов на своем Macbook Pro.

При параллелизме, установленном на 500, я получил в среднем 1155 завершенных транзакций в секунду (2310 завершенных HTTP-запросов в секунду). Я уверен, что это могло бы быть намного выше с некоторой настройкой (и особенно с переносом службы HTTP на другую машину). Память JVM увеличилась до 1,5 ГБ в течение первых 30 минут, а затем сохранила этот размер. Я использую 64-битную JVM Oracle 1.8.

person KingPong    schedule 09.10.2017

Альтернативой вашему методу A (derefing HTTP-kit, возвращенные фьючерсы внутри блока go) может быть возможность, просто делайте это таким образом, чтобы не блокировать потоки обработчика core.async в будущем, что вы можете сделать, объединив обратные вызовы httpkit и core.async:

(defn handle-event
 "Return a core.async channel that will contain the result of making both HTTP call A and B."
  [event-data]
  (let [event-a-chan (clojure.core.async/chan)
        event-b-chan (clojure.core.async/chan)
        return-chan (clojure.core.async/chan)]
    (org.httpkit.client/request "https://event-a-call"
                                {:method :get :params {"param1-k" "param1-v"}}
                                (fn [resp]
                                  (clojure.core.async/put! event-a-chan resp)))
    (org.httpkit.client/request "https://event-b-call"
                                {:method :get :params {"param1-k" "param1-v"}}
                                (fn [resp]
                                  (clojure.core.async/put! event-b-chan resp)))
    (clojure.core.async/go
      (clojure.core.async/>! return-chan {:event-a-response (clojure.core.async/<! event-a-chan)
                                          :event-b-response (clojure.core.async/<! event-b-chan)}))
    return-chan))
person tanzoniteblack    schedule 17.03.2016