Почему я не вижу результатов метода сокращения Kafka Streams?

Учитывая следующий код:

KStream<String, Custom> stream =  
    builder.stream(Serdes.String(), customSerde, "test_in");

stream
    .groupByKey(Serdes.String(), customSerde)
    .reduce(new CustomReducer(), "reduction_state")
    .print(Serdes.String(), customSerde);

У меня есть оператор println внутри метода apply редьюсера, который успешно распечатывается, когда я ожидаю, что произойдет сокращение. Однако окончательный оператор печати, показанный выше, ничего не отображает. аналогичным образом, если я использую метод to, а не print, я не вижу сообщений в теме назначения.

Что мне нужно после оператора сокращения, чтобы увидеть результат сокращения? Если на вход подается одно значение, я ничего не ожидаю увидеть. Если вводится второе значение с тем же ключом, я ожидаю, что редуктор применится (что он и делает), и я также ожидаю, что результат редукции продолжится до следующего шага в конвейере обработки. Как описано, я ничего не вижу на последующих этапах конвейера и не понимаю, почему.


person LaserJesus    schedule 10.11.2016    source источник
comment
Попробуйте установить StreamConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG в значение 0.   -  person Matthias J. Sax    schedule 11.11.2016
comment
@ MatthiasJ.Sax Спасибо! Это решило проблему для меня, пожалуйста, не стесняйтесь опубликовать это как ответ, чтобы я мог наградить вас баллами. Если бы вы могли включить ссылку на дополнительную информацию об этой детали конфигурации и тому подобное, я также был бы очень признателен.   -  person LaserJesus    schedule 11.11.2016


Ответы (1)


Начиная с Kafka 0.10.1.0, все операторы агрегации используют внутренний кеш дедупликации, чтобы уменьшить нагрузку на результирующий поток журнала изменений KTable. Например, если вы считаете и обрабатываете две записи с одним и тем же ключом непосредственно друг за другом, полный поток журнала изменений будет <key:1>, <key:2>.

С новой функцией кэширования кеш будет получать <key:1> и сохранять его, но не сразу отправлять вниз по течению. Когда вычисляется <key:2>, он заменяет первую запись кэша. В зависимости от размера кэша, количества отдельных ключей, пропускной способности и интервала фиксации кэш отправляет записи вниз по течению. Это происходит либо при вытеснении кеша для одной ключевой записи, либо при полной очистке кеша (отправке всех записей вниз по течению). Таким образом, журнал изменений KTable может отображать только <key:2> (поскольку <key:1> не дублируется).

Вы можете контролировать размер кеша с помощью параметра конфигурации Streams StreamConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG. Если вы установите значение равным нулю, вы полностью отключите кэширование, и журнал изменений KTable будет содержать все обновления (фактически обеспечивая поведение до 0.10.1.0).

Документация Confluent содержит раздел, в котором более подробно описывается кеш:

person Matthias J. Sax    schedule 11.11.2016