Запись в тему из процессора в приложении Spring Cloud Streams Kafka Stream

Я использую Processor API для обработки данных на низком уровне в хранилище состояний. Дело в том, что мне тоже нужно писать в тему после сохранения в магазин. Как это можно сделать в приложениях Spring Cloud Streams Kafka?

@Bean
fun processEvent() = Consumer<KStream<EventId, EventValue>> { event ->

    event.map{
        ...
    }.process(ProcessorSupplier {

            object : Processor<EventId, MappedEventValue> {

                private lateinit var store: KeyValueStore<EventId, MappedEventValue>

                override fun init(context: ProcessorContext) {
                    store = context.getStateStore("event-store") as KeyValueStore<EventId, MappedEventValue>
                }

                override fun process(key: EventId, value: MappedEventValue) {
                    ...
                    store.put(key, processedMappedEventValue)

                    //TODO Write into a topic
                }
            }
    }
}  



Ответы (1)


Вы не можете. Метод process() - это терминальная операция, которая не позволяет передавать данные в нисходящий поток. Вместо этого вы можете использовать transform() (это в основном то же самое, но process(), но позволяет передавать данные ниже по течению); или, в зависимости от вашего приложения, transformValues() или flatTransform() и т. д.

Используя transform(), вы получаете обратно KStream, которое вы можете написать в теме.

person Matthias J. Sax    schedule 02.05.2020
comment
Как указано в ответе, использование transform позволяет получить KStream, который вы можете отправить в тему. В этом случае вы можете использовать Function в качестве возвращаемого типа. Если у вас есть вариант использования, который должен использовать метод process(), то я думаю, вы все равно можете отправить в тему, используя KafkaTemplate напрямую из Spring Kafka или используя StreamBrdige API из Spring Cloud Stream. Но использование трансформатора, как предлагается, намного лучше, ИМО. - person sobychacko; 03.05.2020