Я надеюсь сгруппировать оконные пакеты вывода из KStream и записать их во вторичное хранилище.
Я ожидал, что .punctuate()
будет вызываться примерно каждые 30 секунд. То, что я получил вместо этого, сохранено здесь.
(Исходный файл состоял из нескольких тысяч строк)
Резюме - .punctuate()
вызывается, по-видимому, случайным образом, а затем неоднократно. Похоже, что он не соответствует значению, установленному через ProcessorContext.schedule().
Редактировать:
Другой запуск того же кода производил вызовы .punctuate()
примерно каждые четыре минуты. На этот раз я не видел сумасшедших повторяющихся значений. Никаких изменений в источнике - просто другой результат.
Используя следующий код:
Главный
StreamsConfig streamsConfig = new StreamsConfig(config);
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
lines.process(new BPS2());
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
Процессор
public class BP2 extends AbstractProcessor<String, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);
private ProcessorContext context;
private final long delay;
private final ArrayList<String> values;
public BP2(long delay) {
LOGGER.debug("BatchProcessor() constructor");
this.delay = delay;
values = new ArrayList<>();
}
@Override
public void process(String s, String s2) {
LOGGER.debug("batched processor s:{} s2:{}", s, s2);
values.add(s2);
}
@Override
public void init(ProcessorContext context) {
LOGGER.info("init");
super.init(context);
values.clear();
this.context = context;
context.schedule(delay);
}
@Override
public void punctuate(long timestamp) {
super.punctuate(timestamp);
LOGGER.info("punctuate ts: {} count: {}", timestamp, values.size());
context().commit();
}
}
ПроцессорПоставщик
public class BPS2 implements ProcessorSupplier<String, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(BPS2.class);
@Override
public Processor<String, String> get() {
try {
return new BP2(30000);
} catch(Exception exception) {
LOGGER.error("Unable to instantiate BatchProcessor()", exception);
throw new RuntimeException();
}
}
}
Редактировать:
Чтобы убедиться, что мой отладчик не замедляет это, я построил его и запустил на том же компьютере, что и мой процесс kafka. На этот раз он даже не пытался отставать на 4 минуты и более - в течение нескольких секунд он выдавал ложные вызовы на .punctuate()
. Многие (большинство) из них без промежуточных вызовов .process()
.