Как настроить реактивный поток с помощью DSL для следующих шагов:
- Получите сообщение SQS, используя
SqsMessageDrivenChannelAdapter
- Проверьте сообщение Json [
JsonSchemaValidator
класс сvalidate
методом] - Преобразуйте json в объекты
- Передайте объекты активатору службы (
BusinessService
: бизнес-логика, конечный автомат) - Сохраните исходящий адаптер объектов R2DBC
В приведенном выше примере созданы выделенные потоки, которые возвращают Publisher
, а в тестах издателями являются subscribed
. Однако мой поток будет запущен, когда SqsMessageDrivenChannelAdapter
внесет сообщение в канал.
Как добиться конфигурации реактивного потока для сценария, описанного выше в шагах с 1 по 5?
Обновление: добавлен образец кода
@Bean
public IntegrationFlow importFlow() {
IntegrationFlows.from(sqsInboundChannel())
.handle((payload, messageHeaders) -> jsonSchemaValidator.validate(payload.toString()))
.transform(Transformers.fromJson(Entity.class))
.handle((payload, messageHeaders) ->businessService.process((Entity) payload))
.handle(
Jpa.outboundAdapter(this.entityManagerFactory)
.entityClass(Entity)
.persistMode(PersistMode.PERSIST),
ConsumerEndpointSpec::transactional)
.get();
}
@Bean
public MessageProducer sqsMessageDrivenChannelAdapter() {
SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter =
new SqsMessageDrivenChannelAdapter(asyncSqsClient, queueName);
sqsMessageDrivenChannelAdapter.setAutoStartup(true);
sqsMessageDrivenChannelAdapter.setOutputChannel(sqsInboundChannel());
return sqsMessageDrivenChannelAdapter;
}
@Bean
public MessageChannel sqsInboundChannel() {
return MessageChannels.flux().get();
}
Обновление 2: JPA перемещен в ветку различий с использованием канала исполнителя
@Bean
public IntegrationFlow importFlow() {
IntegrationFlows.from(sqsInboundChannel())
.handle((payload, messageHeaders) -> jsonSchemaValidator.validate(payload.toString()))
.transform(Transformers.fromJson(Entity.class))
.handle((payload, messageHeaders) ->businessService.process((Entity) payload))
.channel(persistChannel())
.handle(
Jpa.outboundAdapter(this.entityManagerFactory)
.entityClass(Entity)
.persistMode(PersistMode.PERSIST),
ConsumerEndpointSpec::transactional)
.get();
}
@Bean
public MessageProducer sqsMessageDrivenChannelAdapter() {
SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter =
new SqsMessageDrivenChannelAdapter(asyncSqsClient, queueName);
sqsMessageDrivenChannelAdapter.setAutoStartup(true);
sqsMessageDrivenChannelAdapter.setOutputChannel(sqsInboundChannel());
return sqsMessageDrivenChannelAdapter;
}
@Bean
public MessageChannel sqsInboundChannel() {
return MessageChannels.flux().get();
}
@Bean
public MessageChannel persistChannel() {
return MessageChannels.executor(Executors.newCachedThreadPool()).get();
}