Spring Kafka: как отбросить сообщения, уже полученные опросом () после выполнения поиска ()?

Это дополнительный вопрос к чтению одного и того же сообщения несколько раз от Kafka< /а>. Если есть лучший способ задать этот вопрос, не публикуя новый вопрос, дайте мне знать. В этом посте Гэри упоминает

«Но вы все равно сначала увидите более поздние сообщения, если они уже были получены, поэтому вам придется отбросить и их».

Есть ли чистый способ отбросить сообщения, уже прочитанные poll() после вызова seek()? Я начал реализовывать логику для этого, сохраняя начальное смещение (в recordOffset), увеличивая его в случае успеха. В случае неудачи я вызываю seek() и устанавливаю значение recordOffset в record.offset(). Затем для каждого нового сообщения я проверяю, больше ли record.offset(), чем recordOffset. Если это так, я просто вызываю accept(), тем самым «отбрасывая» все ранее прочитанные сообщения. Вот код -

    // in onMessage()...
    if (record.offset() > recordOffset){
        acknowledgment.acknowledge();
        return;
    }

    try {
        processRecord(record);
        recordOffset = record.offset()+1;
        acknowledgment.acknowledge();
    } catch (Exception e) {
        recordOffset = record.offset();
        consumerSeekCallback.seek(record.topic(), record.partition(), record.offset());
    }

Проблема с этим подходом заключается в том, что он усложняется при наличии нескольких разделов. Есть ли более простой/чистый способ?

ИЗМЕНИТЬ 1 Основываясь на предложении Гэри ниже, я попытался добавить такой обработчик ошибок:

@KafkaListener(topicPartitions =
        {@org.springframework.kafka.annotation.TopicPartition(topic = "${kafka.consumer.topic}", partitions = { "1" })},
        errorHandler = "SeekToCurrentErrorHandler")

Что-то не так с этим синтаксисом, когда я получаю «Не удается разрешить метод« errorHandler »»?

РЕДАКТИРОВАТЬ 2 После того, как Гэри объяснил 2 обработчика ошибок, я удалил указанный выше обработчик ошибок и добавил в файл конфигурации следующее:

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProps()));
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
    return factory;
}

Когда я запускаю приложение, я получаю эту ошибку сейчас...

java.lang.NoSuchMethodError: org.springframework.util.Assert.state(ZLjava/util/function/Supplier;)V at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.determineInferredType(MessagingMessageListenerAdapter.java:396)

Вот строка 396 -

Assert.state(!this.isConsumerRecordList || validParametersForBatch,
            () -> String.format(stateMessage, "ConsumerRecord"));
Assert.state(!this.isMessageList || validParametersForBatch,
            () -> String.format(stateMessage, "Message<?>"));

person rmulay    schedule 26.11.2017    source источник


Ответы (2)


Начиная с версии 2.0.1, если ErrorHandler контейнера является RemainingRecordsErrorHandler, например SeekToCurrentErrorHandler, оставшиеся записи (включая неудачную) отправляются обработчику ошибок вместо прослушивателя.

Это позволяет SeekToCurrentErrorHandler перемещать каждый раздел, чтобы следующий опрос вернул необработанные записи.

/**
 * An error handler that seeks to the current offset for each topic in the remaining
 * records. Used to rewind partitions after a message failure so that it can be
 * replayed.
 *
 * @author Gary Russell
 * @since 2.0.1
 *
 */
public class SeekToCurrentErrorHandler implements RemainingRecordsErrorHandler 

ИЗМЕНИТЬ

Существует два типа обработчиков ошибок. KafkaListenerErrorHandler (указанный в аннотации) работает на уровне слушателя; он подключен к адаптеру прослушивателя, который вызывает аннотацию @KafkaListener и поэтому имеет доступ только к текущей записи.

Второй обработчик ошибок (настроенный в контейнере прослушивателя) работает на уровне контейнера и, таким образом, имеет доступ к остальным записям. SeekToCurrentErrorHandler — это обработчик ошибок на уровне контейнера.

Он настраивается в свойствах контейнера на заводе контейнеров...

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(this.consumerFactory);
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    return factory;
}
person Gary Russell    schedule 26.11.2017
comment
Гэри - спасибо за ваше предложение. Я получаю сообщение об ошибке (опубликовано в отредактированном вопросе), когда добавляю errorHandler. Кроме того, я новичок в gradle и не уверен, достаточно ли просто обновить версию в build.gradle, чтобы перейти на 2.0.1. Я добавил - скомпилируйте "org.springframework.kafka:spring-kafka:2.0.1.RELEASE". Я буду продолжать пытаться ... просто хотел, чтобы вы знали, где я застрял с этим. - person rmulay; 28.11.2017
comment
Гэри. Я обновил свой код, чтобы установить обработчик ошибок на уровне контейнера, как вы показали. Теперь я получаю ошибку утверждения при запуске. Я добавил некоторые детали в вопрос. Есть идеи, почему? - person rmulay; 28.11.2017
comment
Для Spring Kafka 2.x требуется Spring Framework 5 (в настоящее время 5.0.2). - person Gary Russell; 28.11.2017
comment
На обновление до 2.0.1 ушел целый день... но, наконец, я смог протестировать это, и оно работает! Я проведу более тщательное тестирование с большим количеством сообщений, чтобы быть абсолютно уверенным. Спасибо Гэри! - person rmulay; 29.11.2017
comment
Возможно, я делаю что-то не так, но приведенный выше код не работает в spring-kafka-2.2.9.RELEASE. Ошибка The method setErrorHandler(SeekToCurrentErrorHandler) is undefined for the type ContainerProperties - person jumping_monkey; 16.10.2019
comment
Обработчик ошибок был перенесен на фабрику, начиная с версии 2.2. Всегда проверяйте Что нового? в документации. - person Gary Russell; 16.10.2019