Тесты JUnit с Spring Boot 2.1.12 и Spring для Apache Kafka 2.2.12

Просто пытаюсь найти простой пример с Spring Boot 2.1.12 и Spring для Apache Kafka 2.2.12, который работает с KafkaListener, чтобы повторить последнее неудачное сообщение. Если сообщение не удается, оно должно быть перенаправлено в другую тему, где будут предприняты попытки повторения. У нас будет 4 темы. topic, retryTopic, sucessTopic и errorTopic Если тема не удалась, следует перенаправить на retryTopic, где будут сделаны 3 попытки повтора. Если эти попытки не удались, необходимо перенаправить на errorTopic. В случае успеха как по теме, так и по retryTopic, следует перенаправить на sucessTopic. И мне нужно покрыть 90% случаев тестом JUnit.


person Lucas Lopes    schedule 12.02.2020    source источник
comment
Я уже ответил на ваш вопрос здесь . Зачем открывать новый вопрос с той же информацией? Что тебе еще надо? Почему бы тебе не прокомментировать этот ответ?   -  person Gary Russell    schedule 13.02.2020
comment
Я новичок на форуме (извините), потому что мне нужен образец с тестами JUnit. Еще не знаю всех правил.   -  person Lucas Lopes    schedule 13.02.2020
comment
Обычно лучше задать новый вопрос, если у вас есть дополнительные вопросы, но вы должны хотя бы сослаться на исходный вопрос. Что-то вроде ответа на [этот вопрос], как мне писать тесты JUnit ?. См. Также stackoverflow.com/help/someone-answers - когда вопрос помечен как принятый, это помогает другим людям находить ответы. Я отвечу здесь тестами JUnit завтра, если только кто-то меня не опередит (здесь уже поздно).   -  person Gary Russell    schedule 13.02.2020
comment
Хорошо, мистер Рассел. Спасибо.   -  person Lucas Lopes    schedule 13.02.2020
comment
Кстати, spring-kafka и spring-kafka-test есть в версии 2.2.8.RELEASE.   -  person Lucas Lopes    schedule 13.02.2020


Ответы (2)


public Jaxb2Marshaller marshaller() {
    Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
    // this package must match the package in the <generatePackage>
    // specified in
    // pom.xml
    marshaller.setContextPath("br.com.company.ws");
    return marshaller;
}

@Bean
public CountryClient countryClient(Jaxb2Marshaller marshaller) {
    CountryClient client = new CountryClient();
    client.setDefaultUri(link);
    WebServiceTemplate template = client.getWebServiceTemplate();
    template.setMessageSender(new WebServiceMessageSenderWithAuth(username, password));
    client.setMarshaller(marshaller);
    client.setUnmarshaller(marshaller);
    return client;
}

Открытый класс @Service CountryClient расширяет WebServiceGatewaySupport {

@Value("${spring.link.consumer.link}")
private String link;


public ZfifNfMaoResponse getCountry(ZfifNfMao zfifNfMao) {

    zfifNfMao = new ZfifNfMao();

    ZfifNfMaoResponse response = (ZfifNfMaoResponse)getWebServiceTemplate().marshalSendAndReceive(link, zfifNfMao);

    return response;
}

}

person Lucas Lopes    schedule 13.02.2020

2.2.8.РЕЛИЗ

Это 6 месяцев.

Вы всегда должны использовать последнюю версию в второстепенном выпуске, чтобы убедиться, что у вас есть все исправления ошибок; текущая версия 2.2.x - 2.2.12.

Я переработал свой ответ 2.2.x из здесь с примером того, как можно протестировать такое приложение. Он использует встроенный тестовый брокер Kafka.

@SpringBootApplication
public class So601723041Application {

    public static void main(String[] args) {
        SpringApplication.run(So601723041Application.class, args);
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<Object, Object> template) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<Object, Object>() {

                    @Override
                    protected void initializeContainer(ConcurrentMessageListenerContainer<Object, Object> instance,
                            KafkaListenerEndpoint endpoint) {

                        super.initializeContainer(instance, endpoint);
                        customize(instance, template);
                    }

        };
        configurer.configure(factory, kafkaConsumerFactory);
        return factory;
    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<?, ?> retryKafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<Object, Object> template) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<Object, Object>() {

                    @Override
                    protected void initializeContainer(ConcurrentMessageListenerContainer<Object, Object> instance,
                            KafkaListenerEndpoint endpoint) {

                        super.initializeContainer(instance, endpoint);
                        customize(instance, template);
                    }

        };
        configurer.configure(factory, kafkaConsumerFactory);
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(5000L);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        factory.setRetryTemplate(retryTemplate);
        return factory;
    }

    private void customize(ConcurrentMessageListenerContainer<Object, Object> container,
            KafkaTemplate<Object, Object> template) {

        if (container.getContainerProperties().getTopics()[0].equals("topic")) {
            container.setErrorHandler(new SeekToCurrentErrorHandler(
                    new DeadLetterPublishingRecoverer(template,
                            (cr, ex) -> new TopicPartition("retryTopic", cr.partition())),
                    0));
        }
        else if (container.getContainerProperties().getTopics()[0].equals("retryTopic")) {
            container.setErrorHandler(new SeekToCurrentErrorHandler(
                    new DeadLetterPublishingRecoverer(template,
                            (cr, ex) -> new TopicPartition("errorTopic", cr.partition())),
                    0)); // no retries here - retry template instead.
        }
    }

}

@Component
public class Listener {

    private final KafkaTemplate<String, String> template;

    private SomeService service;

    public Listener(KafkaTemplate<String, String> template, SomeService service) {
        this.template = template;
        this.service = service;
    }

    public void setService(SomeService service) {
        this.service = service;
    }

    @KafkaListener(id = "so60172304.1", topics = "topic")
    public void listen1(String in) {
        this.service.process(in);
        this.template.send("successTopic", in);
    }

    @KafkaListener(id = "so60172304.2", topics = "retryTopic", containerFactory = "retryKafkaListenerContainerFactory")
    public void listen2(String in) {
        this.service.retry(in);
        this.template.send("successTopic", in);
    }

}

public interface SomeService {

    void process(String in);

    void retry(String in);

}

@Component
public class DefaultService implements SomeService {

    @Override
    public void process(String in) {
        System.out.println("topic: " + in);
    }

    @Override
    public void retry(String in) {
        System.out.println("retryTopic: " + in);
    }

}
spring.kafka.consumer.auto-offset-reset=earliest

Прецедент:

@RunWith(SpringRunner.class)
@SpringBootTest
@TestPropertySource(properties = "spring.kafka.bootstrap-servers = ${spring.embedded.kafka.brokers}")
public class So601723041ApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule embedded = new EmbeddedKafkaRule(1, true, 1,
            "topic", "retryTopic", "successTopic", "errorTopic");

    @Autowired
    private Listener listener;

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    public void test() {
        TestService testService = spy(new TestService());
        this.listener.setService(testService);
        template.send("topic", "failAlways");
        template.send("topic", "onlyFailFirst");
        template.send("topic", "good");
        Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "false", embedded.getEmbeddedKafka());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
        embedded.getEmbeddedKafka().consumeFromEmbeddedTopics(consumer, "successTopic");
        List<ConsumerRecord<Integer, String>> received = new ArrayList<>();
        int n = 0;
        while (received.size() < 2 && n++ < 10) {
            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(10));
            records.forEach(rec -> received.add(rec));
        }
        assertThat(received.size() == 2);
        consumer.close();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
        consumer = new KafkaConsumer<>(props);
        embedded.getEmbeddedKafka().consumeFromEmbeddedTopics(consumer, "errorTopic");
        n = 0;
        while (received.size() < 3 && n++ < 10) {
            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(10));
            records.forEach(rec -> received.add(rec));
        }
        assertThat(received.size() == 3);
        consumer.close();
        verify(testService, times(3)).process(anyString());
        verify(testService, times(4)).retry(anyString());
        assertThat(received).extracting(rec -> rec.value())
                .contains("good", "onlyFailFirst", "failAlways");
    }

    public static class TestService implements SomeService {

        CountDownLatch latch = new CountDownLatch(1);

        @Override
        public void process(String in) {
            System.out.println("topic: " + in);
            if (in.toLowerCase().contains("fail")) {
                throw new RuntimeException(in);
            }
        }

        @Override
        public void retry(String in) {
            System.out.println("retryTopic: " + in);
            if (in.startsWith("fail")) {
                throw new RuntimeException(in);
            }
        }

    }

}

EMMA показывает 97,2% покрытия

ПОМ:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.12.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>so60172304-1</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>so60172304-1</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
person Gary Russell    schedule 13.02.2020
comment
Я только что повторно запустил его против 2.2.8, но тест все еще работает зеленым. - person Gary Russell; 13.02.2020
comment
К сожалению, embeddedTopics не используется. Вы знаете, что это могло быть? записи имеют размер 0. - person Lucas Lopes; 13.02.2020
comment
У вас spring.kafka.consumer.auto-offset-reset=earliest в application.properties? По умолчанию новые потребители потребляют с конца темы, и в тесте происходит гонка между отправкой записей и запуском потребителей. Приведенный выше код взят из полностью рабочего приложения. - person Gary Russell; 13.02.2020
comment
Да. Мне пришлось поместить эту конфигурацию (spring.main.allow-bean-definition-overriding = true) в application.properties, потому что без нее приложение не запускается. - person Lucas Lopes; 14.02.2020
comment
Почему? Какие компоненты вы переопределяете в своем тесте? В моем тесте не было переопределений. - person Gary Russell; 14.02.2020
comment
Мне нужно общаться с SAP с помощью SOAP. Потом я получил это исключение. Компонент 'countryClient', определенный в ресурсе пути к классу [br / com / company / config / ReceiverConfig.class], не может быть зарегистрирован. Компонент с таким именем уже определен в файле [C: \ Users \ kafka_projects \ csm-microservice \ target \ classes \ br \ com \ company \ service \ CountryClient.class], и переопределение отключено. - person Lucas Lopes; 14.02.2020
comment
Это не имеет отношения к этому тесту, поэтому разрешить переопределение bean-компонентов можно; он не должен иметь никакого отношения к тому, получит ли ваш потребитель в тесте какие-либо записи; Я предлагаю вам включить ведение журнала DEBUG, чтобы понять, что происходит. Как я уже сказал, приведенный выше тест работает нормально, поэтому что-то не так с вашим кодом / конфигурацией. Я больше ничем не могу помочь. - person Gary Russell; 14.02.2020
comment
Я добавил пом. - person Gary Russell; 14.02.2020
comment
Вы знаете, что это за ошибка :? org.apache.kafka.test.TestUtils: ошибка при удалении C: \ Users \ username \ AppData \ Local \ Temp \ kafka-2102474369578365822 - person Lucas Lopes; 14.02.2020
comment
Иногда возникали некоторые проблемы с завершением работы встроенного брокера в Windows, но мы не дошли до сути; какая-то проблема с синхронизацией в самом коде Kafka. Но не забудьте закрыть всех потребителей или производителей, которых вы создаете вручную в своих тестах (Spring позаботится о закрытии всех, которыми он управляет). - person Gary Russell; 14.02.2020