spring-cloud-stream-kafka Потреблять только последние сообщения после запуска приложения

В нашем проекте мы используем spring-cloud-stream-binder-kafka версии 1.1.2 для интеграции с kafka. Недавно у нас была ситуация, когда одна из наших служб потребляла старые сообщения (уже использованные) из темы после запуска. В этой теме есть 2 раздела и 2 потребителя, сгруппированных в группу потребителей. Мы не уверены, правильно ли привязано смещение к zookeeper или нет. Нижеприведенное сообщение об ошибке выдается для каждого сообщения во время запуска.

[-kafka-listener-2] ERROR o.s.k.listener.LoggingErrorHandler.handle - Error while processing: ConsumerRecord(topic = statemachine_deal_notification, partition = 1, offset = 926, key = null, value = [B@6fab0a32)

Чтобы этого больше не повторилось, мы всегда хотели читать только последнее сообщение в теме. Я обнаружил, что установка resetOffsets на true и startOffset на latest выполнит свою работу. Но на потребителя эти свойства не влияют. Позже выяснилось, что эта функция была удалена.

Есть ли другой способ убедиться, что потребители в определенной группе потребляют только самые свежие сообщения ???.


person Hariharan    schedule 28.06.2017    source источник


Ответы (1)


Это была ошибка, которая впоследствии была исправлена ​​здесь: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/commit/7355ada4613ad50fe95430f1859d4ea65f004be1. Вы можете попробовать версии SNAPSHOT (1.2.2.BUILD-SNAPSHOT или 1.3.0.BUILD-SNAPSHOT), чтобы проверить это исправление.

person Ilayaperumal Gopinathan    schedule 28.06.2017