Создать исходный код из подписчика тем Lagom / Akka Kafka для Websocket

Я хочу, чтобы моя служба только для подписчиков Lagom подписывалась на тему Kafka и передавала сообщения в веб-сокет. У меня есть следующая служба, определенная в этой документации (https://www.lagomframework.com/documentation/1.4.x/scala/MessageBrokerApi.html#Subscribe-to-a-topic) в качестве руководства:

    // service call
    def stream(): ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]

    // service implementation
    override def stream() = ServiceCall { req =>
      req.runForeach(str => log.info(s"client: %str"))
      kafkaTopic().subscribe.atLeastOnce(Flow.fromFunction(
        // add message to a Source and return Done
      ))
      Future.successful(//some Source[String, NotUsed])

Однако я не совсем понимаю, как обрабатывать мое сообщение кафки. Flow.fromFunction возвращает [String, Done, _] и подразумевает, что мне нужно добавить эти сообщения (строки) в Источник, который был создан вне подписчика.

Итак, у меня двоякий вопрос: 1) Как мне создать источник потока akka для получения сообщений от подписчика темы kafka во время выполнения? 2) Как мне добавить сообщения kafka к указанному источнику, находясь в потоке?


person bgao3    schedule 12.09.2018    source источник
comment
req.runForeach(str => log.info(s"client: %str")) исчерпает весь ваш входной поток.   -  person erip    schedule 14.09.2018


Ответы (1)


Похоже, вы неправильно понимаете сервисный API Lagom. Если вы пытаетесь материализовать поток из тела вызова службы, вход в ваш вызов отсутствует; т.е.

def stream(): ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]

означает, что когда клиент предоставляет Source[String, NotUsed], служба ответит тем же. Ваш клиент не предоставляет это напрямую; поэтому ваша подпись, скорее всего, должна быть

def stream(): ServiceCall[NotUsed, Source[String, NotUsed]]

Теперь к вашему вопросу ...

На самом деле этого нет в шаблоне scala giter8, но версия java содержит то, что они называют автономный поток, который делает примерно то, что Вы хотите сделать.

В Scala этот код выглядел бы примерно так ...

override def autonomousStream(): ServiceCall[
  Source[String, NotUsed], 
  Source[String, NotUsed]
] = ServiceCall { hellos => Future {
    hellos.mapAsync(8, ...)
  }
}

Поскольку ваш вызов сопоставляется не с потоком input, а с темой kafka, вам нужно сделать что-то вроде этого:

override def stream(): ServiceCall[NotUsed, Source[String, NotUsed]] = ServiceCall { 
  _ => 
    Future {
      kafkaTopic()
        .subscribe
        .atMostOnce
        .mapAsync(...)
    }
}
person erip    schedule 14.09.2018