auto-offset-reset = latest не работает в spring-kafka

У меня есть вариант использования, когда я хочу, чтобы потребитель всегда начинал с последнего смещения. Мне не нужно фиксировать смещения для этого потребителя. Этого невозможно достичь с помощью spring-kafka, поскольку новая группа потребителей всегда фиксирует вновь назначенные разделы. Затем при последующих запусках программы потребитель читает из этого сохраненного смещения, а не из последнего. Другими словами, только самый первый запуск с новой группой потребителей ведет себя правильно, т. Е. Потребление из последней. Проблема в KafkaMessageListenerContainer$ListenerConsumer.onPartitionsAssigned()

Для справки я установил следующее в весенней загрузке

spring.kafka.listener.ack-mode=manual
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false

person Daniel Nitzan    schedule 19.08.2017    source источник


Ответы (1)


Этот код был добавлен для решения некоторых неприятных условий гонки, когда произошло перераспределение, когда новая группа потребителей начала использовать; это может привести к потере или дублированию записей, в зависимости от конфигурации.

Было сочтено, что лучше всего зафиксировать начальное смещение, чтобы избежать этих условий.

Я согласен, однако, с тем, что если пользователь берет на себя полную ответственность за смещения (с ручным режимом подтверждения), то нам, вероятно, не следует делать эту фиксацию; это зависит от пользовательского кода, чтобы иметь дело с гонкой (в вашем случае вас не волнуют потерянные записи).

Не стесняйтесь открывать выпуск GitHub (вклады приветствуются).

А пока вы можете избежать ситуации, если ваш слушатель реализует ConsumerSeekAware и ищет конец темы / раздела во время назначения.

Другой альтернативой является использование UUID для group.id каждый раз; и вы всегда будете начинать с конца темы.

person Gary Russell    schedule 19.08.2017