У меня есть поток интеграции Spring, который создает сообщения, которые следует хранить в ожидании, пока соответствующий потребитель придет и потребует их.
@Bean
public IntegrationFlow messagesPerCustomerFlow() {
return IntegrationFlows.
from(WebFlux.inboundChannelAdapter("/messages/{customer}")
.requestMapping(r -> r
.methods(HttpMethod.POST)
)
.requestPayloadType(JsonNode.class)
.headerExpression("customer", "#pathVariables.customer")
)
.channel(messagesPerCustomerQueue())
.get();
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
return Pollers.fixedRate(100);
}
@Bean
public QueueChannel messagesPerCustomerQueue() {
return MessageChannels.queue()
.get();
}
Сообщения в очереди должны доставляться как отправленные сервером события через http, как показано ниже.
PublisherSubscription - это просто держатель для Publisher и IntegrationFlowRegistration, последний используется для уничтожения динамически созданного потока, когда он больше не нужен (обратите внимание, что входящее сообщение для GET не имеет содержимого, которое не обрабатывается банкоматом должным образом с помощью Интеграция с Webflux, поэтому необходимо небольшое обходное решение, чтобы получить доступ к переменной пути, помещенной в заголовок customer
):
@Bean
public IntegrationFlow eventMessagesPerCustomer() {
return IntegrationFlows
.from(WebFlux.inboundGateway("/events/{customer}")
.requestMapping(m -> m.produces(TEXT_EVENT_STREAM_VALUE))
.headerExpression("customer", "#pathVariables.customer")
.payloadExpression("''") // neeeded to make handle((p,h) work
)
.log()
.handle((p, h) -> {
String customer = h.get("customer").toString();
PublisherSubscription<JsonNode> publisherSubscription =
subscribeToMessagesPerCustomer(customer);
return Flux.from(publisherSubscription.getPublisher())
.map(Message::getPayload)
.doFinally(signalType ->
publisherSubscription.unsubscribe());
})
.get();
}
Вышеупомянутый запрос для событий, отправленных сервером, динамически регистрирует поток, который подписывается на канал очереди по запросу с помощью выборочный потребитель, реализованный с помощью фильтра с throwExceptionOnRejection(true)
. Следуя спецификации для цепочки обработчиков сообщений, что должно гарантировать, что сообщение будет предлагаться всем потребителям, пока один из них не примет его.
public PublisherSubscription<JsonNode> subscribeToMessagesPerCustomer(String customer) {
IntegrationFlowBuilder flow = IntegrationFlows.from(messagesPerCustomerQueue())
.filter("headers.customer=='" + customer + "'",
filterEndpointSpec -> filterEndpointSpec.throwExceptionOnRejection(true));
Publisher<Message<JsonNode>> messagePublisher = flow.toReactivePublisher();
IntegrationFlowRegistration registration = integrationFlowContext.registration(flow.get())
.register();
return new PublisherSubscription<>(messagePublisher, registration);
}
Эта конструкция в принципе работает, но со следующими проблемами:
- Сообщения, отправленные в очередь при отсутствии подписчиков, приводят к
MessageDeliveryException: Dispatcher has no subscribers for channel 'application.messagesPerCustomerQueue'
- Сообщения, отправленные в очередь при отсутствии подходящего подписчика, приводят к
AggregateMessageDeliveryException: All attempts to deliver Message to MessageHandlers failed
.
Я хочу, чтобы сообщение оставалось в очереди и неоднократно предлагалось всем подписчикам до тех пор, пока оно не будет использовано или не истечет (надлежащий выборочный потребитель). Как я могу это сделать?