Akka Stream + Akka Http - получить запрос при ошибке

У меня очень хорошо работает следующий поток:

source
  .map(x => HttpRequest(uri = x.rawRequest))
  .via(Http().outgoingConnection(host, port))
  .to(Sink.actorRef(myActor, IsDone))
  .run()

и простой субъект для обработки статуса ответа и финального сообщения по завершении потока:

/**
  * A simple actor to count how many rows have been processed
  * in the complete process given a http status
  *
  * It also finish the main thread upon a message of type [[IsDone]] is received
  */
class MyActor extends Actor with ActorLogging {

  var totalProcessed = 0

  def receive = LoggingReceive {

    case response: HttpResponse =>

      if(response.status.isSuccess()) {
        totalProcessed = totalProcessed + 1
      } else if(response.status.isFailure()) {
        log.error(s"Http response error: ${response.status.intValue()} - ${response.status.reason()}")
      } else {
        log.error(s"Error: ${response.status.intValue()} - ${response.status.reason()}")
      }

    case IsDone =>
      println(s"total processed: $totalProcessed")
      sys.exit()
  }
}

case object IsDone

Я не знаю, лучший ли это подход для подсчета вещей, а также для обработки статуса ответа, но пока он работает.

Вопрос в том, как передать исходный запрос актеру, чтобы я мог знать, какой запрос вызвал конкретную ошибку.

Вместо этого мой актер мог ожидать следующего:

case (request: String, response: HttpResponse) =>

Но как передать ту информацию, которая есть у меня в начале моего конвейера?

Я думал о map вот так:

source
  .map(x => (HttpRequest(uri = x.rawRequest), x.rawRequest))

Но я понятия не имею, как запустить поток Http.

Любое предложение?


person Thiago Pereira    schedule 21.04.2016    source источник
comment
Попробуйте использовать пул соединений хоста вместо того, чтобы явно открывать исходящее соединение для каждого запроса. Эта модель требует произвольного идентификатора для каждого запроса, который затем возвращается в ответ, чтобы вы могли правильно соотнести запрос и ответ.   -  person cmbaxter    schedule 21.04.2016
comment
Привет, @cmbaxter, ты хочешь использовать этот пример? http://doc.akka.io/docs/akka/2.4.4/scala/http/client-side/host-level.html#Example, но вместо этого со строкой?   -  person Thiago Pereira    schedule 22.04.2016


Ответы (1)


С помощью @cmbaxter я мог решить свою проблему, используя следующий фрагмент кода:

val poolClientFlow = Http().cachedHostConnectionPool[String](host, port)

source
  .map(url => HttpRequest(uri = url) -> url)
  .via(poolClientFlow)
  .to(Sink.actorRef(myActor, IsDone))
  .run()

Теперь мой актер может получить это:

case (respTry: Try[HttpResponse], request: String) =>
person Thiago Pereira    schedule 22.04.2016