Тестирование маршрута akka-http, который завершается бесконечным потоком

У меня есть маршрут akka-http, который возвращает Source, содержащий бесконечный поток объектов. Как я могу проверить это с помощью набора тестов маршрута? Я хотел бы проверить только первые n элементов потока, но я взглянул на код тестового набора, и похоже, что нет прямого доступа к Source в ответе. Он всегда преобразуется в последовательность ByteString, что в моем случае просто вызывает TimeoutException, поскольку поток никогда не завершается.

Для справки, проблему можно воспроизвести с маршрутом, выглядящим примерно так:

case class Bar(wibble: String, wobble: String)

path("stream") {
  get {
    complete {
      import JsonSupport._
      implicit val streamingSupport = EntityStreamingSupport.json()
      Source.unfold(1) { i =>
        Thread.sleep(10)
        Some((i + 1, Bar(i.toString, (i + 1).toString)))
      }
    }
  }
}

person Andrew Brett    schedule 20.01.2017    source источник
comment
Что такое JsonSupport ??   -  person sarveshseri    schedule 20.01.2017
comment
JsonSupport определяется как: объект JsonSupport расширяет SprayJsonSupport с помощью DefaultJsonProtocol { неявный val barFormat = jsonFormat2 (Bar) }   -  person Andrew Brett    schedule 21.01.2017
comment
comment
Но этот поток не предназначен для чего-то вроде бесконечного потока. HTTP в своем стандарте должен определять тайм-аут для длительных запросов. И таким образом Akka-http делает это. Если вам нужен бесконечный поток, вам следует использовать веб-сокеты.   -  person sarveshseri    schedule 21.01.2017


Ответы (1)


Похоже, что всякий раз, когда вы получаете доступ к ответу и объекту в пределах границ набора тестов, набор тестов будет пытаться извлечь его полностью.

Вы можете проверить частный метод awaitAllElements в RouteTestResultComponent для дальнейшего понимания.

Вместо этого вы можете попробовать просто использовать комбинаторы akka-streams. Я думаю, что это не слишком сильно раздуло бы ваш тестовый код. Пример ниже:

def firstNElements(n: Int) = Source.single(yourRequest)
  .via(RouteResult.route2HandlerFlow(route))
  .flatMapConcat(_.entity.dataBytes)
  .take(n)
  .runWith(Sink.seq)
  .futureValue

// assertions
person Stefano Bonetti    schedule 20.01.2017
comment
К сожалению, это не работает — вызов response блокируется до тех пор, пока не будут получены все элементы, что приводит к исключению TimeoutException для бесконечного потока. - person Andrew Brett; 21.01.2017
comment
вы правы, я не проверил код тестового набора должным образом. Я изменил ответ, предложив что-то другое. - person Stefano Bonetti; 21.01.2017
comment
Это делает трюк - спасибо! Я изменил `RouteResult.route2HandlerFlow' на 'Route.handlerFlow' для краткости, и из-за форматирования ответа (с разделителями в отдельной строке) мне пришлось взять первые 6 элементов, чтобы получить 3 сериализованных объекта (и преобразовать использовал ByteString.utf8String для преобразования во что-то более легко анализируемое), но это именно то, что я искал. - person Andrew Brett; 23.01.2017