Я хочу, чтобы моя служба только для подписчиков Lagom подписывалась на тему Kafka и передавала сообщения в веб-сокет. У меня есть следующая служба, определенная в этой документации (https://www.lagomframework.com/documentation/1.4.x/scala/MessageBrokerApi.html#Subscribe-to-a-topic) в качестве руководства:
// service call
def stream(): ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]
// service implementation
override def stream() = ServiceCall { req =>
req.runForeach(str => log.info(s"client: %str"))
kafkaTopic().subscribe.atLeastOnce(Flow.fromFunction(
// add message to a Source and return Done
))
Future.successful(//some Source[String, NotUsed])
Однако я не совсем понимаю, как обрабатывать мое сообщение кафки. Flow.fromFunction
возвращает [String, Done, _]
и подразумевает, что мне нужно добавить эти сообщения (строки) в Источник, который был создан вне подписчика.
Итак, у меня двоякий вопрос: 1) Как мне создать источник потока akka для получения сообщений от подписчика темы kafka во время выполнения? 2) Как мне добавить сообщения kafka к указанному источнику, находясь в потоке?
req.runForeach(str => log.info(s"client: %str"))
исчерпает весь ваш входной поток. - person erip   schedule 14.09.2018