У меня есть кластер kafka, из которого я использую две темы, и присоединяюсь к нему. В результате объединения я делаю некоторые манипуляции с базой данных. Все операции с БД асинхронны, поэтому они возвращают мне Future (scala.concurrent.Future, но в любом случае это то же самое, что и java.util.concurrent.CompletableFuture). В итоге получил такой код:
val firstSource: KTable[String, Obj]
val secondSource: KTable[String, Obj2]
def enrich(data: ObjAndObj2): Future[EnrichedObj]
def saveResultToStorage(enrichedData: Future[EnrichedObj]): Future[Unit]
firstSource.leftJoin(secondSource, joinFunc)
.mapValues(enrich)
.foreach(saveResultToStorage)
Можно ли манипулировать будущими значениями в потоке или есть лучшие способы обработки асинхронных задач (например, .mapAsync в потоках Akka)?