У меня есть поток объектов JSON, которые я ввожу в хэш из нескольких значений. Я надеюсь считать по ключу с n-секундными (10? 60?) интервалами и использовать эти значения для анализа шаблонов.
Моя топология: K->aggregateByKey(n seconds)->process()
На шаге process - init()
я позвонил ProcessorContent.schedule(60 * 1000L)
в надежде, что будет вызван .punctuate()
. Отсюда я бы прокручивал значения во внутреннем хеше и действовал соответственно.
Я вижу, что значения проходят этап агрегации и попадают в функцию process()
, но .punctuate()
никогда не вызывается.
Код:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> opxLines = kStreamBuilder.stream(TOPIC);
KStream<String, String> mapped = opxLines.map(new ReMapper());
KTable<Windowed<String>, String> ktRtDetail = mapped.aggregateByKey(
new AggregateInit(),
new OpxAggregate(),
TimeWindows.of("opx_aggregate", 60000));
ktRtDetail.toStream().process(new ProcessorSupplier<Windowed<String>, String>() {
@Override
public Processor<Windowed<String>, String> get() {
return new AggProcessor();
}
});
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
AggregateInit() возвращает значение null.
Думаю, я могу сделать эквивалент .punctuate()
с помощью простого таймера, но я хотел бы знать, почему этот код не работает так, как я надеюсь.