kafka KStream - топология для подсчета за n секунд

У меня есть поток объектов JSON, которые я ввожу в хэш из нескольких значений. Я надеюсь считать по ключу с n-секундными (10? 60?) интервалами и использовать эти значения для анализа шаблонов.

Моя топология: K->aggregateByKey(n seconds)->process()

На шаге process - init() я позвонил ProcessorContent.schedule(60 * 1000L) в надежде, что будет вызван .punctuate(). Отсюда я бы прокручивал значения во внутреннем хеше и действовал соответственно.

Я вижу, что значения проходят этап агрегации и попадают в функцию process(), но .punctuate() никогда не вызывается.


Код:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> opxLines = kStreamBuilder.stream(TOPIC);

KStream<String, String> mapped = opxLines.map(new ReMapper());

KTable<Windowed<String>, String> ktRtDetail = mapped.aggregateByKey(
            new AggregateInit(),
            new OpxAggregate(),
            TimeWindows.of("opx_aggregate", 60000));

ktRtDetail.toStream().process(new ProcessorSupplier<Windowed<String>, String>() {
                            @Override
                            public Processor<Windowed<String>, String> get() {
                                 return new AggProcessor();
                            }
                       });
    
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

AggregateInit() возвращает значение null.

Думаю, я могу сделать эквивалент .punctuate() с помощью простого таймера, но я хотел бы знать, почему этот код не работает так, как я надеюсь.


person ethrbunny    schedule 14.09.2016    source источник
comment
Какова ваша семантика времени (время обработки события?) См. Также stackoverflow.com/questions/39251997/ о внутренних элементах препинания.   -  person Matthias J. Sax    schedule 16.09.2016
comment
Это изучено немного дальше здесь   -  person ethrbunny    schedule 16.09.2016


Ответы (1)


Я думаю, что это связано с неправильной настройкой кластера kafka. После изменения количества файловых дескрипторов на гораздо более высокое значение, чем значение по умолчанию (1024 -> 65535), похоже, это работает в соответствии со спецификацией.

person ethrbunny    schedule 20.09.2016