Доступ к потребителю из ConsumerRebalanceListener в Spring Kafka

Мне нужно использовать ConsumerRebalanceListener и обнаружить, что его можно зарегистрировать с помощью метода containerProperties.setConsumerRebalanceListener. Мне нужен экземпляр потребителя в прослушивателе балансировщика для получения позиций разделов (consumer.position()) и для переопределения смещений выборки потребителя (consumer.seek()), но я не смог найти способ получить доступ к экземпляру потребителя.

Отредактировано

Подняли вопрос GH, и вот ссылка https://github.com/spring-projects/spring-kafka/issues/304


person Aram Mkrtchyan    schedule 28.04.2017    source источник


Ответы (1)


Для этого вам необходимо использовать реализацию ConsumerSeekAware из ваш целевой слушатель:

Первый вызывается при запуске контейнера; этот обратный вызов следует использовать при поиске в произвольное время после инициализации. Вы должны сохранить ссылку на обратный вызов; если вы используете один и тот же прослушиватель в нескольких контейнерах (или в ConcurrentMessageListenerContainer), вы должны сохранить обратный вызов в ThreadLocal или какой-либо другой структуре, на которую указывает поток прослушивателя.

При использовании группового управления второй метод вызывается при изменении назначений. Вы можете использовать этот метод, например, для установки начальных смещений для разделов, вызвав обратный вызов; вы должны использовать аргумент обратного вызова, а не тот, который передается в registerSeekCallback. Этот метод никогда не будет вызываться, если вы сами явно назначаете разделы; в этом случае используйте TopicPartitionInitialOffset.

person Artem Bilan    schedule 28.04.2017
comment
Спасибо, за ответ Артем. Но мне также нужно обработать обратный вызов ConsumerRebalanceListener.onPartitionsRevoked, где мне нужно получить положение раздела для его сохранения в моей базе данных. - person Aram Mkrtchyan; 28.04.2017
comment
ХОРОШО. Я вижу вашу точку зрения. Начиная с версии 2.0 мы предоставляем новый аргумент Consumer для целевого слушателя: spring.io/blog/2017/04/27/. Но OTOH вы можете просто рассмотреть возможность отслеживания offset для каждого сообщения и действительно сохранять в БД, когда происходит onPartitionsRevoked(). - person Artem Bilan; 28.04.2017
comment
Однако я понимаю вашу точку зрения, и нам действительно может понадобиться добавить еще один метод обратного вызова в ConsumerSeekAware и указать позиции для отозванных разделов. Не стесняйтесь поднимать вопрос GH по этому поводу со ссылкой на эту тему SO: github. com/spring-projects/spring-kafka/issues - person Artem Bilan; 28.04.2017
comment
Вы знаете, когда выйдет Spring Kafka 2.0? - person Aram Mkrtchyan; 28.04.2017
comment
Где-то сразу после выпуска Spring 5.0 GA - person Artem Bilan; 28.04.2017