Это дополнительный вопрос к чтению одного и того же сообщения несколько раз от Kafka< /а>. Если есть лучший способ задать этот вопрос, не публикуя новый вопрос, дайте мне знать. В этом посте Гэри упоминает
«Но вы все равно сначала увидите более поздние сообщения, если они уже были получены, поэтому вам придется отбросить и их».
Есть ли чистый способ отбросить сообщения, уже прочитанные poll() после вызова seek()? Я начал реализовывать логику для этого, сохраняя начальное смещение (в recordOffset), увеличивая его в случае успеха. В случае неудачи я вызываю seek() и устанавливаю значение recordOffset в record.offset(). Затем для каждого нового сообщения я проверяю, больше ли record.offset(), чем recordOffset. Если это так, я просто вызываю accept(), тем самым «отбрасывая» все ранее прочитанные сообщения. Вот код -
// in onMessage()...
if (record.offset() > recordOffset){
acknowledgment.acknowledge();
return;
}
try {
processRecord(record);
recordOffset = record.offset()+1;
acknowledgment.acknowledge();
} catch (Exception e) {
recordOffset = record.offset();
consumerSeekCallback.seek(record.topic(), record.partition(), record.offset());
}
Проблема с этим подходом заключается в том, что он усложняется при наличии нескольких разделов. Есть ли более простой/чистый способ?
ИЗМЕНИТЬ 1 Основываясь на предложении Гэри ниже, я попытался добавить такой обработчик ошибок:
@KafkaListener(topicPartitions =
{@org.springframework.kafka.annotation.TopicPartition(topic = "${kafka.consumer.topic}", partitions = { "1" })},
errorHandler = "SeekToCurrentErrorHandler")
Что-то не так с этим синтаксисом, когда я получаю «Не удается разрешить метод« errorHandler »»?
РЕДАКТИРОВАТЬ 2 После того, как Гэри объяснил 2 обработчика ошибок, я удалил указанный выше обработчик ошибок и добавил в файл конфигурации следующее:
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProps()));
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
return factory;
}
Когда я запускаю приложение, я получаю эту ошибку сейчас...
java.lang.NoSuchMethodError: org.springframework.util.Assert.state(ZLjava/util/function/Supplier;)V at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.determineInferredType(MessagingMessageListenerAdapter.java:396)
Вот строка 396 -
Assert.state(!this.isConsumerRecordList || validParametersForBatch,
() -> String.format(stateMessage, "ConsumerRecord"));
Assert.state(!this.isMessageList || validParametersForBatch,
() -> String.format(stateMessage, "Message<?>"));