Тестирование @KafkaListener с использованием Spring Embedded Kafka

Я пытаюсь написать модульный тест для слушателя Kafka, который я разрабатываю с помощью Spring Boot 2.x. Будучи модульным тестом, я не хочу запускать полный сервер Kafka как экземпляр Zookeeper. Итак, я решил использовать Spring Embedded Kafka.

Определение моего слушателя очень простое.

@Component
public class Listener {
    private final CountDownLatch latch;

    @Autowired
    public Listener(CountDownLatch latch) {
        this.latch = latch;
    }

    @KafkaListener(topics = "sample-topic")
    public void listen(String message) {
        latch.countDown();
    }
}

Также очень проста проверка, которая проверяет, что счетчик latch равен нулю после получения сообщения.

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {

    @Autowired
    private KafkaEmbedded embeddedKafka;

    @Autowired
    private CountDownLatch latch;

    private KafkaTemplate<Integer, String> producer;

    @Before
    public void setUp() {
        this.producer = buildKafkaTemplate();
        this.producer.setDefaultTopic("sample-topic");
    }

    private KafkaTemplate<Integer, String> buildKafkaTemplate() {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        return new KafkaTemplate<>(pf);
    }

    @Test
    public void listenerShouldConsumeMessages() throws InterruptedException {
        // Given
        producer.sendDefault(1, "Hello world");
        // Then
        assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
    }
}

К сожалению, тест не проходит, и я не могу понять почему. Можно ли использовать экземпляр KafkaEmbedded для тестирования метода, отмеченного аннотацией @KafkaListener?

Весь код доступен в моем репозитории GitHub kafka-listener.

Спасибо всем.


person riccardo.cardin    schedule 01.05.2018    source источник
comment
См. Изменение моего ответа; Я не заметил, что вы не использовали свойства конфигурации загрузки для потребителя.   -  person Gary Russell    schedule 02.05.2018


Ответы (2)


Вы, вероятно, отправляете сообщение до того, как потребителю будет назначена тема / раздел. Установить свойство ...

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest

... по умолчанию latest.

Это похоже на использование --from-beginning с консольным потребителем.

ИЗМЕНИТЬ

Ой; вы не используете свойства загрузки.

Добавлять

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

РЕДАКТИРОВАТЬ2

Кстати, вам, вероятно, также следует сделать get(10L, TimeUnit.SECONDS) результат template.send() (a Future<>), чтобы подтвердить, что отправка была успешной.

РЕДАКТИРОВАТЬ3

Чтобы отменить сброс смещения только для теста, вы можете сделать то же самое, что и для адресов брокера:

@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;

...

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);

а также

@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.auto-offset-reset=earliest"})

Однако имейте в виду, что это свойство применяется только при первом потреблении группой. Чтобы всегда начинать с конца при каждом запуске приложения, вам нужно искать до конца во время запуска.

Кроме того, я бы рекомендовал установить enable.auto.commit на false, чтобы контейнер заботился о фиксации смещений, а не просто полагался на клиента-потребителя, выполняющего это по расписанию.

person Gary Russell    schedule 01.05.2018
comment
Спасибо. Задайте для свойства auto.offset.reset значение earliest, и это чудо :) - person riccardo.cardin; 02.05.2018
comment
Однако, если мне нужно использовать latest как значение auto.offset.reset? Как я могу заставить тест работать? Большое спасибо. - person riccardo.cardin; 02.05.2018
comment
Как это вообще тестирует класс Listener, содержащий метод @KafkaListener? Я вижу утверждение CountDownLatch, но никогда не вижу утверждения о том, что метод был поражен ... - person Rob; 21.05.2019
comment
? код в методе @KafkaListener ведет обратный отсчет защелки, поэтому он был вызван. Обычно, однако, вы бы так не поступили. Скорее всего, слушатель вызывает службу, и вместо этого вы вставляете фиктивную или заглушенную службу. - person Gary Russell; 21.05.2019
comment
@GaryRussell - большое спасибо за совет! - person Krzysztof Tomaszewski; 26.04.2021

Может, кому-то это пригодится. У меня была похожая проблема. Локальные тесты выполнялись (некоторые проверки выполнялись в Awaitility.waitAtMost), но в конвейере Jenkins тесты не выполнялись.

Решение было, как уже упоминалось в ответе с наибольшим количеством голосов, установкой auto-offset-reset=earliest. Когда тесты запущены, вы можете проверить, правильно ли вы настроили конфигурацию, просмотрев журналы тестов. Конфигурация пружинных выходов как для производителя, так и для потребителя

person Pawel Ryznar    schedule 28.12.2020
comment
Очень полезно, спасибо! Когда я использовал Intellij, у меня была такая же проблема, пытаясь собрать покрытие. - person rios0rios0; 01.06.2021