Прослушивание очереди сообщений SQS с Spring Boot не работает со стандартной конфигурацией

Я не могу создать прослушиватель очереди работ с помощью Spring Boot и SQS (сообщение отправляется и отображается в пользовательском интерфейсе SQS)

@MessageMapping или @SqsListener не работают

Java: 11
Spring Boot: 2.1.7
Зависимость: spring-cloud-aws-messaging

Это мой конфиг

@Configuration
@EnableSqs
public class SqsConfig {

    @Value("#{'${env.name:DEV}'}")
    private String envName;

    @Value("${cloud.aws.region.static}")
    private String region;

    @Value("${cloud.aws.credentials.access-key}")
    private String awsAccessKey;

    @Value("${cloud.aws.credentials.secret-key}")
    private String awsSecretKey;

    @Bean
    public Headers headers() {
        return new Headers();
    }

    @Bean
    public MessageQueue queueMessagingSqs(Headers headers,
                                          QueueMessagingTemplate queueMessagingTemplate) {
        Sqs queue = new Sqs();
        queue.setQueueMessagingTemplate(queueMessagingTemplate);
        queue.setHeaders(headers);
        return queue;
    }

    private ResourceIdResolver getResourceIdResolver() {
        return queueName -> envName + "-" + queueName;
    }

    @Bean
    public DestinationResolver destinationResolver(AmazonSQSAsync amazonSQSAsync) {
        DynamicQueueUrlDestinationResolver destinationResolver = new DynamicQueueUrlDestinationResolver(
                amazonSQSAsync,
                getResourceIdResolver());
        destinationResolver.setAutoCreate(true);
        return destinationResolver;
    }

    @Bean
    public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSQSAsync,
                                                         DestinationResolver destinationResolver) {
        return new QueueMessagingTemplate(amazonSQSAsync, destinationResolver, null);
    }

    @Bean
    public QueueMessageHandlerFactory queueMessageHandlerFactory() {
        QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
        MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
        messageConverter.setStrictContentTypeMatch(false);
        factory.setArgumentResolvers(Collections.singletonList(new PayloadArgumentResolver(messageConverter)));
        return factory;
    }

    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSqs);
        factory.setMaxNumberOfMessages(10);
        factory.setWaitTimeOut(2);
        return factory;
    }

}

Я также заметил, что org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory и org.springframework.cloud.aws.messaging.config.annotation.SqsConfiguration запускаются при запуске

И мой тест

@RunWith(SpringJUnit4ClassRunner.class)
public class ListenTest {

    @Autowired
    private MessageQueue queue;

    private final String queueName = "test-queue-receive";

    private String result = null;

    @Test
    public void test_listen() {
        // given
        String data = "abc";

        // when
        queue.send(queueName, data).join();

        // then
        Awaitility.await()
                .atMost(10, TimeUnit.SECONDS)
                .until(() -> Objects.nonNull(result));

        Assertions.assertThat(result).equals(data);
    }

    @MessageMapping(value = queueName)
    public void receive(String data) {
        this.result = data;
    }
}

Вы думаете, что что-то не так?

Я создаю репо, например: (https://github.com/mmaryo/java-sqs-test)
В папке test измените учетные данные aws в application.yml.
Затем запустите тесты.


person Maryo    schedule 23.08.2019    source источник
comment
Пожалуйста, уточните, чем не работает. Что конкретно происходит? Есть ли где-нибудь сообщение об ошибке или сообщение в очереди ошибок SQS?   -  person chrylis -cautiouslyoptimistic-    schedule 23.08.2019
comment
Сообщение в очереди SQS остается в очереди, и метод receive () никогда не запускается. Вроде @MessageMapping(value = queueName) очередь не слушают?   -  person Maryo    schedule 25.08.2019
comment
Я не уверен насчет этого инструмента. Я использовал только @SqsListener.   -  person chrylis -cautiouslyoptimistic-    schedule 25.08.2019
comment
@SqsListener тоже не работает: /   -  person Maryo    schedule 26.08.2019
comment
В QueueMessageHandler SqsListener sqsListenerAnnotation = AnnotationUtils.findAnnotation(method, SqsListener.class); всегда пусто. Таким образом, Spring dot не сканирует @Sqs в @SpringBootTest и не сканирует также в @Compent   -  person Maryo    schedule 27.08.2019


Ответы (2)


У меня была такая же проблема при использовании пакета spring-cloud-aws-messaging, но затем я использовал URL-адрес очереди в аннотации @SqsListener вместо имени очереди, и это сработало.

@SqsListener(value = { "https://full-queue-URL" }, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receive(String message) {
     // do something
}

Кажется, вы можете использовать имя очереди при использовании пакета spring-cloud-starter-aws-messaging. Я считаю, что есть некоторая конфигурация, которая позволяет использовать имя очереди вместо URL-адреса, если вы не хотите использовать стартовый пакет.

РЕДАКТИРОВАТЬ: Я заметил, что для региона по умолчанию используется us-west-2, несмотря на то, что я указал us-east-1 в моем файле свойств. Затем я создал bean-компонент RegionProvider и установил в нем регион us-east-1, и теперь, когда я использую имя очереди в @SqsMessaging, оно обнаруживается и правильно разрешается в URL-адрес в коде фреймворка.

person Niemanator    schedule 19.09.2019
comment
вы можете опубликовать фрагмент для метода RegionProvider @Bean? - person heug; 17.12.2019
comment
@heug @Bean public RegionProvider regionProvider() { return () -> Region.getRegion(Regions.fromName(queueRegion)); } - person Julian; 12.05.2020

вам нужно будет использовать аннотацию @Primary, это то, что у меня сработало:

@Autowired(required = false)
private AWSCredentialsProvider awsCredentialsProvider;

@Autowired
private AppConfig appConfig;

@Bean
public QueueMessagingTemplate getQueueMessagingTemplate() {
    return new QueueMessagingTemplate(sqsClient());
}

@Primary
@Bean
public AmazonSQSAsync sqsClient() {
    AmazonSQSAsyncClientBuilder builder = AmazonSQSAsyncClientBuilder.standard();

    if (this.awsCredentialsProvider != null) {
        builder.withCredentials(this.awsCredentialsProvider);
    }

    if (appConfig.getSqsRegion() != null) {
        builder.withRegion(appConfig.getSqsRegion());
    } else {
        builder.withRegion(Regions.DEFAULT_REGION);
    }

    return builder.build();
}

build.gradle нужны эти зависимости:

implementation("org.springframework.cloud:spring-cloud-starter-aws:2.2.0.RELEASE")
implementation("org.springframework.cloud:spring-cloud-aws-messaging:2.2.0.RELEASE")
person heug    schedule 17.12.2019
comment
что такое AppConfig? - person Evan Gertis; 20.08.2020
comment
@EvanGert - это ссылка на bean-компонент, содержащий переменные env. - person heug; 09.01.2021