В нашем проекте мы используем spring-cloud-stream-binder-kafka версии 1.1.2 для интеграции с kafka. Недавно у нас была ситуация, когда одна из наших служб потребляла старые сообщения (уже использованные) из темы после запуска. В этой теме есть 2 раздела и 2 потребителя, сгруппированных в группу потребителей. Мы не уверены, правильно ли привязано смещение к zookeeper или нет. Нижеприведенное сообщение об ошибке выдается для каждого сообщения во время запуска.
[-kafka-listener-2] ERROR o.s.k.listener.LoggingErrorHandler.handle - Error while processing: ConsumerRecord(topic = statemachine_deal_notification, partition = 1, offset = 926, key = null, value = [B@6fab0a32)
Чтобы этого больше не повторилось, мы всегда хотели читать только последнее сообщение в теме. Я обнаружил, что установка resetOffsets
на true
и startOffset
на latest
выполнит свою работу. Но на потребителя эти свойства не влияют. Позже выяснилось, что эта функция была удалена.
Есть ли другой способ убедиться, что потребители в определенной группе потребляют только самые свежие сообщения ???.