Интеграционный тест с Spring-контекстом не получает сообщения RabbitMQ

RabbitMQ MessageConsumer не получает опубликованное сообщение

Я пишу интеграционный тест, который должен получить сообщение из очереди и выполнить обработку. Но потребитель просто не получает сообщения.

Когда я вручную вставляю зависимости - без контекста Spring, все работает нормально. Но когда я использую SpringContext, потребитель не получает сообщения.

SpringInfraConfig.class загружает значения из переменных среды. Чтобы «эмулировать» среду, я использую класс EnvironmentVariables из этой библиотеки. Переменные env загружаются нормально - проверено на отладке.

ПРИМЕЧАНИЕ, когда я упоминаю, что без SpringContext работает нормально, я также не использовал библиотеку среды.

Чтобы опубликовать сообщение в очереди RabbitMQ, я делаю это «вручную» в тестовом методе. Сообщение публикуется нормально. Я написал готовый тестовый код перед вызовом моего реального тестового класса. Это простой необработанный потребитель, переопределяющий DefaultConsumer#handleDelivery с sysout для печати входящего сообщения. Работает.

Когда я тестирую, используя свой реальный класс тестирования - MessageConsumerServiceImpl.class, он просто регистрирует начало потребления из очереди, и тест заканчивается.

Что-то довольно странное происходит, когда я отлаживаю и вхожу во все методы - сообщение получено, и в середине обработки оно заканчивается без завершения всех вызовов - тест просто останавливается, никаких ошибок тоже не выдается.

Еще одна странная вещь - при включении плагина управления RabbitMQ нет очередей, обмена, каналов или даже открытого соединения. Я проверил это во время отладки, когда остановился в точке останова.

Класс SpringConfig

@Import({SpringCoreConfig.class})
@ComponentScan({"br.com.fulltime.fullarm.fullcam.integration.infra",     "br.com.fulltime.fullarm.cross.cutting", "br.com.fulltime.fullarm.infrastructure.commons"})
@Configuration
public class SpringInfraConfig {

@Bean
public FInfraSettings getFInfraSettings() {
    Map<String, String> fInfraMap = new HashMap<>();
    fInfraMap.put("F_INFRA_RABBIT_HOST", "f_infra_rabbit_host");
    fInfraMap.put("F_INFRA_EXCHANGE", "f_infra_exchange");
    fInfraMap.put("F_INFRA_QUEUE", "f_infra_queue");
    fInfraMap.put("F_INFRA_PROCESS_ID", "f_infra_process_id");
    fInfraMap.put("F_INFRA_DESCRIPTION", "f_infra_description");
    fInfraMap.put("F_INFRA_TEXT", "f_infra_text");
    fInfraMap.put("F_INFRA_TAG", "f_infra_tag");
    fInfraMap.put("F_INFRA_WARNING_TIME", "f_infra_warning_time");
    fInfraMap.put("F_INFRA_CRITICAL_TIME", "f_infra_critical_time");

    return new FInfraSettings(
            getEnv("f_infra_run", "false").asBoolean(),
            getEnv("f_infra_ka_time", "1").asInt(),
            fInfraMap);
}

@Bean
public ApplicationSettings getApplicationSettings() {
    return new ApplicationSettings(
            getEnv("process_name", "FullArm-FullCam Integration").asString(),
            getEnv("process_version", "DEFAULT-1.0.0").asString());
}

@Bean
public PushoverSettings getPushoverSettings() {
    return new PushoverSettings(
            getEnv("pushover_api", "invalido").asString(),
            getEnv("pushover_user_id", "invalido").asString(),
            getEnv("pushover_run", "false").asBoolean());

}

@Bean
public RabbitMQSettings getRabbitMQSettings() {
    return new RabbitMQSettings(
            new RabbitConnectionInfo(
                    getEnv("rabbitmq_host", "127.0.0.1").asString(),
                    getEnv("rabbitmq_port", "5672").asInt(),
                    getEnv("rabbitmq_virtual_host", "/").asString(),
                    getEnv("rabbitmq_username", "guest").asString(),
                    getEnv("rabbitmq_password", "guest").asString()),
            new RabbitConnectionInfo(
                    getEnv("rabbitmq_fullcam_host", "127.0.0.1").asString(),
                    getEnv("rabbitmq_fullcam_port", "5672").asInt(),
                    getEnv("rabbitmq_fullcam_virtual_host", "/").asString(),
                    getEnv("rabbitmq_fullcam_username", "guest").asString(),
                    getEnv("rabbitmq_fullcam_password", "guest").asString()),
            new RabbitQueueInfo(
                    getEnv("rabbitmq_consumer_fullarm_queue", "fcomQueConsumerFullCam").asString(),
                    getEnv("rabbitmq_consumer_fullarm_exc", "fcomExcConsumer").asString(),
                    getEnv("rabbitmq_consumer_fullarm_rk", "fcomRKConsumerFullCam").asString()),
            new RabbitQueueInfo(
                    getEnv("rabbitmq_consumer_fullcam_queue", "foo").asString(),
                    getEnv("rabbitmq_consumer_fullcam_exc", "foo").asString(),
                    getEnv("rabbitmq_consumer_fullcam_rk", "foo").asString()),
            new RabbitQueueInfo(
                    getEnv("rabbitmq_publish_fullcam_queue", "fullcamRequest").asString(),
                    getEnv("rabbitmq_publish_fullcam_exc", "fullcamRequestExc").asString(),
                    getEnv("rabbitmq_consumer_fullarm_rk", "fullcamRequestRK").asString()));
}

@Bean
public RedisSettings getRedisSettings() {
    return new RedisSettings(
            getEnv("redis_host", "localhost").asString(),
            getEnv("redis_port", "6379").asInt(),
            getEnv("redis_password", "123456").asString());
}

@Bean
public Connection getConnection() {
    try {
        return RabbitConnectionFactory.create(getRabbitMQSettings().getConnectionInfo());
    } catch (IOException | TimeoutException e) {
        throw new ShutdownException(e);
    }
}

@Bean
public Logging getLogging() {
    return new DefaultLogger();
}

Класс MessageConsumerServiceImpl

@Component
public class MessageConsumerServiceImpl implements MessageConsumerService {

private final Connection rabbitMQConnection;
private final MessageConsumerFactory consumerFactory;
private final RabbitMQSettings mqSettings;
private final ShutdownService shutdownService;
private final Logging logger;

@Inject
public MessageConsumerServiceImpl(Connection rabbitMQConnection,
                                  MessageConsumerFactory consumerFactory,
                                  RabbitMQSettings mqSettings,
                                  ShutdownService shutdownService,
                                  Logging logger) {
    this.rabbitMQConnection = rabbitMQConnection;
    this.consumerFactory = consumerFactory;
    this.mqSettings = mqSettings;
    this.shutdownService = shutdownService;
    this.logger = logger;
}

@Override
public void startListening() {
    try {
        RabbitQueueInfo commandQueInfo = mqSettings.getRabbitMQFullArmConsumerQueue();
        final String queue = commandQueInfo.getQueue();

        Channel channel = rabbitMQConnection.createChannel();
        channel.queueDeclare(queue, true, false, false, null);
        MessageConsumer commandConsumer = consumerFactory.create(channel);

        logger.info("[MESSAGE-CONSUMER] - Consumindo da fila: {}", queue);
        channel.basicConsume(queue, commandConsumer);

    } catch (IOException e) {
        logger.error("[MESSAGE-CONSUMER] - ShutdownException", e);
        shutdownService.shutdown(e);
    }
}

Класс интеграционного теста

public class MessageConsumerServiceImplIntegrationTest {

private static final Integer RABBITMQ_PORT = 5672;
private static final String RABBITMQ_EXC = "fcomExcConsumer";
private static final String RABBITMQ_QUEUE = "fcomQueFullcamIntegration";
private static final String RABBITMQ_RK = "fcomRKConsumerFullCam";
private static final String REDIS_PASSWORD = "123456";
private static final int REDIS_PORT = 6379;

public static RabbitMQContainer rabbitMqContainer;
public static GenericContainer redisContainer;

static {
    redisContainer = new GenericContainer<>("redis:5.0.3-alpine")
            .withExposedPorts(REDIS_PORT)
            .withCommand("redis-server --requirepass " + REDIS_PASSWORD)
            .waitingFor(Wait.forListeningPort());
    redisContainer.start();
}

static {
    rabbitMqContainer = new RabbitMQContainer()
            .withExposedPorts(RABBITMQ_PORT)
            .withExposedPorts(15672)
            .withUser("guest", "guest")
            .withVhost("/")
            .waitingFor(Wait.forListeningPort());
    rabbitMqContainer.start();
}

@Rule
public final EnvironmentVariables environmentVariables = new EnvironmentVariables()
        .set("rabbitmq_host", rabbitMqContainer.getContainerIpAddress())
        .set("rabbitmq_port", String.valueOf(rabbitMqContainer.getMappedPort(RABBITMQ_PORT)))
        .set("rabbitmq_virtual_host", "/")
        .set("rabbitmq_username", "guest")
        .set("rabbitmq_password", "guest")

        .set("rabbitmq_fullcam_host", rabbitMqContainer.getContainerIpAddress())
        .set("rabbitmq_fullcam_port", String.valueOf(rabbitMqContainer.getMappedPort(RABBITMQ_PORT)))
        .set("rabbitmq_fullcam_virtual_host", "/")
        .set("rabbitmq_fullcam_username", "guest")
        .set("rabbitmq_fullcam_password", "guest")

        .set("rabbitmq_publish_fullcam_queue", "Fullarm.Request")
        .set("rabbitmq_publish_fullcam_exc", "fcomExcFullcam")
        .set("rabbitmq_publish_fullcam_rk", "fcomRKFullcamRequest")

        .set("rabbitmq_consumer_fullarm_queue", RABBITMQ_QUEUE)
        .set("rabbitmq_consumer_fullarm_exc", RABBITMQ_EXC)
        .set("rabbitmq_consumer_fullarm_rk", RABBITMQ_RK)

        .set("rabbitmq_consumer_fullcam_queue", "Fullarm.Reponse")
        .set("rabbitmq_consumer_fullcam_exc", "fcomExcFullarm")
        .set("rabbitmq_consumer_fullcam_rk", "fcomRKFullarmFullcamIntegration")

        .set("f_infra_rabbit_host", "abobora")
        .set("f_infra_exchange", "abobora")
        .set("f_infra_queue", "abobora")
        .set("f_infra_process_id", "0")
        .set("f_infra_description", "abobora")
        .set("f_infra_text", "abobora")
        .set("f_infra_tag", "0")
        .set("f_infra_warning_time", "0")
        .set("f_infra_critical_time", "0")
        .set("f_infra_run", "false")
        .set("f_infra_ka_time", "1")

        .set("redis_host", redisContainer.getContainerIpAddress())
        .set("redis_port", String.valueOf(redisContainer.getMappedPort(REDIS_PORT)))
        .set("redis_password", REDIS_PASSWORD);

private MessageConsumerService instance;
private ApplicationContext context;

@Before
public void setUp() {
    context = new AnnotationConfigApplicationContext(SpringInfraConfig.class);
    instance = context.getBean(MessageConsumerService.class);
}

@Test
public void deveProcessarRequisicao() throws IOException, TimeoutException {
    String message = "{ \"tipoPacote\" : 3, \"descricao_painel\" : \"Casa Mauro Naves\", \"setor_disparado\" : \"Porta da Frente\", \"data_disparo\" : 1587151300000, \"cameras\" : [90851, 90853, 90854] }";

    ConnectionFactory factory = new ConnectionFactory();
    RabbitMQSettings settings = context.getBean(RabbitMQSettings.class);
    factory.setHost(settings.getConnectionInfo().getHost());
    factory.setPort(settings.getConnectionInfo().getPort());
    factory.setVirtualHost(settings.getConnectionInfo().getVirtualHost());
    factory.setAutomaticRecoveryEnabled(true);
    factory.setUsername(settings.getConnectionInfo().getUsername());
    factory.setPassword(settings.getConnectionInfo().getPassword());
    factory.setRequestedHeartbeat(50);
    Connection connection = factory.newConnection();

    RabbitQueueInfo commandQueInfo = settings.getRabbitMQFullArmConsumerQueue();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare(commandQueInfo.getExchange(), "direct", true);
    channel.queueDeclare(commandQueInfo.getQueue(), true, false, false, null);
    channel.queueBind(commandQueInfo.getQueue(), commandQueInfo.getExchange(), commandQueInfo.getRoutingKey());
    channel.basicPublish(commandQueInfo.getExchange(), commandQueInfo.getRoutingKey(), MessageProperties.PERSISTENT_BASIC, message.getBytes());
    channel.close();
    connection.close();

    instance.startListening();

}

Зависимости Gradle

core-build.gradle

dependencies {

   compile group: 'javax.inject', name: 'javax.inject', version: '1'
   compile group: 'org.springframework', name: 'spring-context', version: '5.2.5.RELEASE'

   compile 'com.fasterxml.jackson.core:jackson-core:2.7.1'
   compile 'com.fasterxml.jackson.core:jackson-databind:2.7.1-1'

   compile group: 'br.com.fulltime.fullarm', name: 'cross-cutting-commons', version: '1.13.0'
   compile group: 'br.com.fulltime.fullarm', name: 'constants', version: '1.110.0'
}

infra-build.gradle

dependencies {

   testCompile group: 'junit', name: 'junit', version: '4.12'
   testCompile "org.testcontainers:testcontainers:1.14.1"
   testCompile "org.testcontainers:rabbitmq:1.14.1"
   testCompile group: 'com.github.stefanbirkner', name: 'system-rules', version: '1.19.0'


   compile project(':core')

   compile group: 'br.com.fulltime.fullarm', name: 'infrastructure-commons', version: '1.6.0'
   compile group: 'br.com.fulltime.fullarm', name: 'FInfraJavaLibrary', version: '2.3.0'
   compile group: 'br.com.fulltime.fullarm', name: 'pushover-lib', version: '1.0.0'

   compile group: 'redis.clients', name: 'jedis', version: '3.3.0'
}

Тестовый вывод

Testing started at 08:38 ...
Starting Gradle Daemon...
Gradle Daemon started in 815 ms
> Task :core:compileJava UP-TO-DATE
> Task :core:processResources NO-SOURCE
> Task :core:classes UP-TO-DATE
> Task :core:jar UP-TO-DATE
> Task :infra:compileJava UP-TO-DATE
> Task :infra:processResources NO-SOURCE
> Task :infra:classes UP-TO-DATE
> Task :infra:compileTestJava
> Task :infra:processTestResources NO-SOURCE
> Task :infra:testClasses
> Task :infra:test
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.junit.contrib.java.lang.system.EnvironmentVariables (file:/home/*omited*/.gradle/caches/modules-2/files-2.1/com.github.stefanbirkner/system-rules/1.19.0/d541c9a1cff0dda32e2436c74562e2e4aa6c88cd/system-rules-1.19.0.jar) to field java.util.Collections$UnmodifiableMap.m
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2020-05-14 08:38:35 INFO - [MESSAGE-CONSUMER] - Consumindo da fila: fcomQueFullcamIntegration
WARNING: Please consider reporting this to the maintainers of org.junit.contrib.java.lang.system.EnvironmentVariables
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access     operations
WARNING: All illegal access operations will be denied in a future release
Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/6.1/userguide/command_line_interface.html#sec:command_line_warnings
BUILD SUCCESSFUL in 22s
5 actionable tasks: 2 executed, 3 up-to-date
08:38:36: Task execution finished ':infra:test --tests     "br.com.fulltime.fullarm.fullcam.integration.infra.consumer.MessageConsumerServiceImplIntegrationTest.deveProcessarRequisicao"'.

У меня больше нет идей о проблеме. Любая помощь приветствуется. заранее спасибо

ОБНОВИТЬ

Я снова написал свой тест, сделав его проще. Я написал один код с Spring-контекстом и окружением, а другой — без Spring-контекста и окружения. ОБЕ НЕ РАБОТАЛИ.

Итак, для тестирования porpuse я закодировал простой Thread#sleep() и знаете что, ОБА теста сработали!

Я думаю, что причина в том, что RabbitMQ DefaultConsumer создает новый поток для потребления сообщений, который выпускает основной поток тестирования, и он был остановлен. Поскольку основной поток был остановлен, все остальные тоже останавливаются.

Итак, я думаю, что у нас здесь проблема с проверкой синхронизма.

Можно ли получить непройденный тест, если тестовый код проверяет значение базы данных, которое должно быть вставлено при выполнении, но во время проверки оно еще не было обработано?


person Guerino Rodella    schedule 14.05.2020    source источник


Ответы (1)


Во-первых, у вас не включено ведение журнала, поэтому трудно сказать, что происходит на самом деле.

Является ли Spring Boot вариантом для вас? Он имеет встроенную поддержку ведения журнала. Или вы специально используете только библиотеку *context?

person Vitaly Chura    schedule 14.05.2020
comment
Я не привык к SpringBoot, я использовал Guice для внедрения зависимостей, но затем я перешел на Spring-контекст для удобства использования. Что касается ведения журнала, ошибка вывода связана с другой библиотекой свойств, для которой я не предоставляю реализацию журнала. Но мой собственный код использует Log4J, который вы можете увидеть в этой строке: 2020-05-14 08:38:35 INFO - [MESSAGE-CONSUMER] - Consumindo da fila: fcomQueFullcamIntegration - person Guerino Rodella; 14.05.2020