Я реализую пакетный прослушиватель spring kafka, который читает список сообщений из темы Kafka и отправляет данные в службу REST. Я хотел бы понять управление смещением в случае отказа службы REST, смещения для пакета не должны фиксироваться, а сообщения должны обрабатываться для следующего опроса. Я прочитал документацию spring kafka, но есть путаница в понимании разницы между обработчиком ошибок слушателя и поиском обработчиков ошибок текущего контейнера в пакетном режиме. Я использую версию spring-boot-2.0.0.M7, и ниже мой код.
Listener Config:
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(Integer.parseInt(env.getProperty("spring.kafka.listener.concurrency")));
// factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setBatchErrorHandler(kafkaErrorHandler());
factory.getContainerProperties().setAckMode(AckMode.BATCH);
factory.setBatchListener(true);
return factory;
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap-servers"));
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
env.getProperty("spring.kafka.consumer.enable-auto-commit"));
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
env.getProperty("spring.kafka.consumer.auto-commit-interval"));
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("spring.kafka.session.timeout"));
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("spring.kafka.consumer.group-id"));
return propsMap;
}
Listener Class:
@KafkaListener(topics = "${spring.kafka.consumer.topic}", containerFactory = "kafkaListenerContainerFactory")
public void listen(List<String> payloadList) throws Exception {
if (payloadList.size() > 0)
//Post to the service
}
Kafka Error Handler:
public class KafkaErrorHandler implements BatchErrorHandler {
private static Logger LOGGER = LoggerFactory.getLogger(KafkaErrorHandler.class);
@Override
public void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
LOGGER.info("Exception occured while processing::" + thrownException.getMessage());
}
}
Как обрабатывать слушателя Kafka, чтобы, если что-то произойдет во время обработки пакета записей, я не потеряю данные.