Spring kafka Batch Listener - фиксировать смещения вручную в пакетном режиме

Я реализую пакетный прослушиватель spring kafka, который читает список сообщений из темы Kafka и отправляет данные в службу REST. Я хотел бы понять управление смещением в случае отказа службы REST, смещения для пакета не должны фиксироваться, а сообщения должны обрабатываться для следующего опроса. Я прочитал документацию spring kafka, но есть путаница в понимании разницы между обработчиком ошибок слушателя и поиском обработчиков ошибок текущего контейнера в пакетном режиме. Я использую версию spring-boot-2.0.0.M7, и ниже мой код.

Listener Config:

@Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        factory.setConcurrency(Integer.parseInt(env.getProperty("spring.kafka.listener.concurrency")));
        // factory.getContainerProperties().setPollTimeout(3000);
        factory.getContainerProperties().setBatchErrorHandler(kafkaErrorHandler());

        factory.getContainerProperties().setAckMode(AckMode.BATCH);
        factory.setBatchListener(true);
        return factory;
    }
@Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("spring.kafka.bootstrap-servers"));
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
                env.getProperty("spring.kafka.consumer.enable-auto-commit"));
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
                env.getProperty("spring.kafka.consumer.auto-commit-interval"));
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, env.getProperty("spring.kafka.session.timeout"));
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("spring.kafka.consumer.group-id"));
        return propsMap;
    }

Listener Class:

@KafkaListener(topics = "${spring.kafka.consumer.topic}", containerFactory = "kafkaListenerContainerFactory")
    public void listen(List<String> payloadList) throws Exception {
        if (payloadList.size() > 0)
            //Post to the service
    }

Kafka Error Handler:

public class KafkaErrorHandler implements BatchErrorHandler {

    private static Logger LOGGER = LoggerFactory.getLogger(KafkaErrorHandler.class);

    @Override
    public void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
        LOGGER.info("Exception occured while processing::" + thrownException.getMessage());

            }

}

Как обрабатывать слушателя Kafka, чтобы, если что-то произойдет во время обработки пакета записей, я не потеряю данные.


person user8363477    schedule 13.12.2017    source источник
comment
Привет. Вы смогли найти способ сделать это?   -  person jbakirov    schedule 24.07.2019


Ответы (1)


С Apache Kafka мы никогда не потеряем данные. В журналах разделов действительно есть смещение для поиска в любую произвольную позицию.

С другой стороны, когда мы потребляем записи из раздела, нет необходимости фиксировать их смещения - текущий потребитель хранит состояние в памяти. Нам нужно совершать фиксацию только для других, новых потребителей в той же группе, когда текущий мертвый. Независимо от ошибки текущий потребитель всегда переходит к опросу новых данных за своим текущим смещением в памяти.

Итак, чтобы повторно обработать одни и те же данные у одного и того же потребителя, мы определенно должны использовать операцию seek, чтобы переместить потребителя обратно в желаемое положение. Вот почему Spring Kafka представляет SeekToCurrentErrorHandler:

Это позволяет реализациям искать все необработанные темы / разделы, чтобы текущая запись (и остальные оставшиеся) были извлечены при следующем опросе. SeekToCurrentErrorHandler делает именно это.

https://docs.spring.io/spring-kafka/reference/htmlsingle/#_seek_to_current_container_error_handlers

person Artem Bilan    schedule 13.12.2017
comment
Обрабатывает ли SeekToCurrentErrorHandler также пакетные операции? поскольку я не вижу класса SeekToCurrentBatchErrorHandler, доступного в пакете org.springframework.kafka.listener. Кроме того, сообщите мне, как происходит запись в журнал, если я установил обработчик ошибок seektocurrenterrorhandler. - person user8363477; 14.12.2017
comment
Есть один SeekToCurrentBatchErrorHandler, но он уже с 2.1: docs.spring.io/spring-kafka/docs/2.1.0.RELEASE/reference/html/. Вы можете обновить или скопировать / вставить его код в свой собственный класс - person Artem Bilan; 14.12.2017