Как использовать фьючерсы с Kafka Streams

У меня есть кластер kafka, из которого я использую две темы, и присоединяюсь к нему. В результате объединения я делаю некоторые манипуляции с базой данных. Все операции с БД асинхронны, поэтому они возвращают мне Future (scala.concurrent.Future, но в любом случае это то же самое, что и java.util.concurrent.CompletableFuture). В итоге получил такой код:

val firstSource: KTable[String, Obj]
val secondSource: KTable[String, Obj2]

def enrich(data: ObjAndObj2): Future[EnrichedObj]
def saveResultToStorage(enrichedData: Future[EnrichedObj]): Future[Unit]

firstSource.leftJoin(secondSource, joinFunc)
           .mapValues(enrich)
           .foreach(saveResultToStorage)

Можно ли манипулировать будущими значениями в потоке или есть лучшие способы обработки асинхронных задач (например, .mapAsync в потоках Akka)?


person Arthur Kushka    schedule 15.02.2017    source источник
comment
Возможный дубликат запросов внешней системы во время обработки Kafka Stream   -  person Matthias J. Sax    schedule 15.02.2017


Ответы (1)


У меня такая же проблема. Из того, что я могу сказать, Kafka Streams не предназначен для обработки многоскоростной потоковой передачи так же, как Akka Streams. Kafka Streams не имеет эквивалента многоскоростных примитивов, которые есть в Akka, таких как mapAsync, дроссель, объединение, буфер, пакет и т. д. Kafka Streams хорошо справляется с соединениями между темами и агрегацией данных с отслеживанием состояния. Akka Streams хорошо справляется с многоскоростной и асинхронной обработкой.

У вас есть несколько вариантов, как справиться с этим:

  • Сделайте блокирующий вызов в приложении Kafka Streams. Это самый простой способ, и он подходит, если пропускная способность ваших вызовов Future ненамного превышает их задержку. Kafka Streams использует отдельные потоки для каждого раздела, поэтому вы можете использовать разделение обрабатываемых тем Kafka для обеспечения параллелизма.
  • Обработайте обогащение в Akka Streams с помощью библиотеки Reactive Kafka, опубликуйте обогащенный результат в другой теме Kafka, которая затем вы вносите в свое приложение Kafka Streams. Это то, что мы делаем для случаев, когда асинхронный вызов имеет гораздо более высокую параллельную пропускную способность, чем сквозная задержка, такая как вызов веб-службы или запрос к базе данных NoSQL.
  • Публикуйте все свои дополнительные данные в собственной KTable и присоединяйтесь к ним в приложении Kafka Streams. На самом деле, объединение потоковых данных с данными обогащения через KTables — это то, в чем хороша Kafka Streams. Мы используем это, если данные обогащения могут быть представлены в виде таблицы. Это не работает, если данные обогащения должны быть вычислены на лету.
person Charles Crain    schedule 17.10.2017