Spring Integration Flow DSL с SQS и Reactive

Как настроить реактивный поток с помощью DSL для следующих шагов:

  1. Получите сообщение SQS, используя SqsMessageDrivenChannelAdapter
  2. Проверьте сообщение Json [JsonSchemaValidator класс с validate методом]
  3. Преобразуйте json в объекты
  4. Передайте объекты активатору службы (BusinessService: бизнес-логика, конечный автомат)
  5. Сохраните исходящий адаптер объектов R2DBC

Я смотрел на это: https://github.com/spring-projects/spring-integration/blob/master/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java

В приведенном выше примере созданы выделенные потоки, которые возвращают Publisher, а в тестах издателями являются subscribed. Однако мой поток будет запущен, когда SqsMessageDrivenChannelAdapter внесет сообщение в канал.

Как добиться конфигурации реактивного потока для сценария, описанного выше в шагах с 1 по 5?

Обновление: добавлен образец кода

   @Bean
    public IntegrationFlow importFlow()  {
        IntegrationFlows.from(sqsInboundChannel())
                .handle((payload, messageHeaders) -> jsonSchemaValidator.validate(payload.toString()))
                .transform(Transformers.fromJson(Entity.class))
                .handle((payload, messageHeaders) ->businessService.process((Entity) payload))
                .handle(
                        Jpa.outboundAdapter(this.entityManagerFactory)
                                .entityClass(Entity)
                                .persistMode(PersistMode.PERSIST),
                        ConsumerEndpointSpec::transactional)
                .get();
    }

    @Bean
    public MessageProducer sqsMessageDrivenChannelAdapter() {
        SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter =
                new SqsMessageDrivenChannelAdapter(asyncSqsClient, queueName);
        sqsMessageDrivenChannelAdapter.setAutoStartup(true);
        sqsMessageDrivenChannelAdapter.setOutputChannel(sqsInboundChannel());
        return sqsMessageDrivenChannelAdapter;
    }

    @Bean
    public MessageChannel sqsInboundChannel() {
        return MessageChannels.flux().get();
    }

Обновление 2: JPA перемещен в ветку различий с использованием канала исполнителя

   @Bean
    public IntegrationFlow importFlow()  {
        IntegrationFlows.from(sqsInboundChannel())
                .handle((payload, messageHeaders) -> jsonSchemaValidator.validate(payload.toString()))
                .transform(Transformers.fromJson(Entity.class))
                .handle((payload, messageHeaders) ->businessService.process((Entity) payload))
                .channel(persistChannel())
                .handle(
                        Jpa.outboundAdapter(this.entityManagerFactory)
                                .entityClass(Entity)
                                .persistMode(PersistMode.PERSIST),
                        ConsumerEndpointSpec::transactional)
                .get();
    }

    @Bean
    public MessageProducer sqsMessageDrivenChannelAdapter() {
        SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter =
                new SqsMessageDrivenChannelAdapter(asyncSqsClient, queueName);
        sqsMessageDrivenChannelAdapter.setAutoStartup(true);
        sqsMessageDrivenChannelAdapter.setOutputChannel(sqsInboundChannel());
        return sqsMessageDrivenChannelAdapter;
    }

    @Bean
    public MessageChannel sqsInboundChannel() {
        return MessageChannels.flux().get();
    }

    @Bean
    public MessageChannel persistChannel() {
        return MessageChannels.executor(Executors.newCachedThreadPool()).get();
    }

comment
а какой код вы написали?   -  person Toerktumlare    schedule 09.03.2021
comment
Я обновил исходный пост кодом   -  person Nikhil    schedule 09.03.2021


Ответы (1)


Вероятно, вам нужно поближе познакомиться с тем, что у нас есть для реактивных потоков в Spring Integration: https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#reactive-streams

Пример, который вы показываете с этим тестовым классом, полностью не имеет отношения к вашему варианту использования. В этом тесте мы пытаемся охватить некоторые API, которые мы предоставляем в Spring Integration, вроде модульных тестов. Это не имеет ничего общего со всем потоком.

На самом деле ваш вариант использования - это просто поток полного черного ящика, начинающийся с SQS listener и заканчивающийся R2DBC. Следовательно, в вашем потоке нет смысла пытаться преобразовать его часть в Publisher, а затем вернуть его в другую часть потока: вы не собираетесь каким-то образом отслеживать и подписываться на это Publisher самостоятельно.

Вы можете подумать о том, чтобы поместить FluxMessageChannel между конечными точками в вашем потоке, но это все равно не имеет смысла для вашего варианта использования. Он не будет полностью реактивным, как вы ожидаете, только потому, что org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer не блокирует поток потребителя, чтобы быть готовым к противодавлению со стороны нижестоящего.

Единственная действительно реактивная часть вашего потока - это адаптер исходящего канала R2DBC, но, вероятно, он не принесет вам слишком большой пользы, потому что источник данных не является реактивным.

Как я уже сказал: вы можете попробовать поместить channel(channels -> channels.flux()) сразу после определения SqsMessageDrivenChannelAdapter, чтобы начать реактивный поток с этой точки. В то же время вы должны попытаться установить maxNumberOfMessages на 1, чтобы попытаться заставить его ждать свободного места, прежде чем вытащить следующее сообщение из SQS.

person Artem Bilan    schedule 09.03.2021
comment
Я добавил в сообщение образец кода. В приведенном выше потоке, поскольку sqsInboundChannel является магнитным каналом, он должен запустить реактивный поток, а затем каждый .handle привязан к sqsInboundChannel. Это правильно ?. В этом сценарии, то есть в приведенном выше коде, должен ли каждый обработчик принимать и возвращать Mono? Например, jsonSchemaValidator.validate? Также в данный момент в приведенном выше коде есть 2 нереактивных компонента - SimpleMessageListenerContainer и исходящий адаптер JPA. Правильно ли приведенное выше определение потока до тех пор, пока они не будут заменены их реактивными аналогами? - person Nikhil; 09.03.2021
comment
Нет, нормально иметь обработчики ответов в памяти как обычные. Они вызываются из своего рода map() операции Flux. Часть JPA - это что-то новое, и это определенно другая история. Вы можете не пройти проверку Blockhound, если не отсортируете этот адаптер канала по другому потоку. К счастью, это легко сделать с ExecutorChannel в Spring Integrtion. С другой стороны, если он не является реактивным там, вероятно, нет полного смысла преследовать реактивность, поскольку у вас есть он только посередине, где вы все равно выполняете логику в памяти, так что это может произойти в потоке SQS. - person Artem Bilan; 09.03.2021
comment
Спасибо ! Я добавил ExecutorChannel в Обновлении 2 этого сообщения. Кроме того, как будет maxNumberOfMessages на 1? Будет ли он sqs adapter опрашивать по одному сообщению из очереди, пытаясь понять это. - person Nikhil; 09.03.2021
comment
Верно. Одно сообщение за раз. Чтобы не тянуть слишком много к памяти, прежде чем вы начнете их обрабатывать. Просто потому, что в SimpleMessageListenerContainer для SQS слишком много смещений потоков, я был бы как можно осторожнее, чтобы не потерять данные между ними. - person Artem Bilan; 09.03.2021
comment
Не влияет ли опрос одного сообщения за раз? Это на уровне потока? Например, если у меня в очереди 100 сообщений, будет ли 100 опросов с использованием одного потока? А как насчет длительного опроса, в этом случае я могу опросить только 1 сообщение до waitTimeOut? Извините за основные вопросы, и если это не по теме - person Nikhil; 09.03.2021
comment
Я думаю, что идет. Глядя на исходный код SimpleMessageListenerContainer для SQS, я бы полностью держался подальше от него. Он планирует каждое отдельное сообщение исполнителю задачи и возвращается для получения дополнительных сообщений. Так что риск потерять сообщения очень высок. - person Artem Bilan; 09.03.2021