Spring-integration: отправка сообщений из очереди выборочному потребителю

У меня есть поток интеграции Spring, который создает сообщения, которые следует хранить в ожидании, пока соответствующий потребитель придет и потребует их.

@Bean
public IntegrationFlow messagesPerCustomerFlow() {
    return IntegrationFlows.
            from(WebFlux.inboundChannelAdapter("/messages/{customer}")
                    .requestMapping(r -> r
                            .methods(HttpMethod.POST)
                    )
                    .requestPayloadType(JsonNode.class)
                    .headerExpression("customer", "#pathVariables.customer")
            )
            .channel(messagesPerCustomerQueue()) 
            .get();
}

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerSpec poller() {
    return Pollers.fixedRate(100);
}

@Bean
public QueueChannel messagesPerCustomerQueue() {
    return MessageChannels.queue()
            .get();
}

Сообщения в очереди должны доставляться как отправленные сервером события через http, как показано ниже.

PublisherSubscription - это просто держатель для Publisher и IntegrationFlowRegistration, последний используется для уничтожения динамически созданного потока, когда он больше не нужен (обратите внимание, что входящее сообщение для GET не имеет содержимого, которое не обрабатывается банкоматом должным образом с помощью Интеграция с Webflux, поэтому необходимо небольшое обходное решение, чтобы получить доступ к переменной пути, помещенной в заголовок customer):

@Bean
public IntegrationFlow eventMessagesPerCustomer() {
    return IntegrationFlows
       .from(WebFlux.inboundGateway("/events/{customer}")
            .requestMapping(m -> m.produces(TEXT_EVENT_STREAM_VALUE))
            .headerExpression("customer", "#pathVariables.customer")
            .payloadExpression("''") // neeeded to make handle((p,h) work
       )
       .log()
       .handle((p, h) -> {
           String customer = h.get("customer").toString();
           PublisherSubscription<JsonNode> publisherSubscription =
               subscribeToMessagesPerCustomer(customer);
           return Flux.from(publisherSubscription.getPublisher())
                   .map(Message::getPayload)
                   .doFinally(signalType ->
                      publisherSubscription.unsubscribe());
       })
       .get();
}

Вышеупомянутый запрос для событий, отправленных сервером, динамически регистрирует поток, который подписывается на канал очереди по запросу с помощью выборочный потребитель, реализованный с помощью фильтра с throwExceptionOnRejection(true). Следуя спецификации для цепочки обработчиков сообщений, что должно гарантировать, что сообщение будет предлагаться всем потребителям, пока один из них не примет его.

public PublisherSubscription<JsonNode> subscribeToMessagesPerCustomer(String customer) {
    IntegrationFlowBuilder flow = IntegrationFlows.from(messagesPerCustomerQueue())
            .filter("headers.customer=='" + customer + "'",
                    filterEndpointSpec -> filterEndpointSpec.throwExceptionOnRejection(true));
    Publisher<Message<JsonNode>> messagePublisher = flow.toReactivePublisher();

    IntegrationFlowRegistration registration = integrationFlowContext.registration(flow.get())
            .register();

    return new PublisherSubscription<>(messagePublisher, registration);
}

Эта конструкция в принципе работает, но со следующими проблемами:

  • Сообщения, отправленные в очередь при отсутствии подписчиков, приводят к MessageDeliveryException: Dispatcher has no subscribers for channel 'application.messagesPerCustomerQueue'
  • Сообщения, отправленные в очередь при отсутствии подходящего подписчика, приводят к AggregateMessageDeliveryException: All attempts to deliver Message to MessageHandlers failed.

Я хочу, чтобы сообщение оставалось в очереди и неоднократно предлагалось всем подписчикам до тех пор, пока оно не будет использовано или не истечет (надлежащий выборочный потребитель). Как я могу это сделать?


person dschulten    schedule 29.05.2018    source источник


Ответы (1)


обратите внимание, что входящее сообщение для GET не имеет содержимого, которое не обрабатывается ATM должным образом интеграцией Webflux

Я не понимаю этого беспокойства.

WebFluxInboundEndpoint работает по следующему алгоритму:

if (isReadable(request)) {
   ...
else {
    return (Mono<T>) Mono.just(exchange.getRequest().getQueryParams());
}

Где GET метод действительно переходит в ветку else. И payload отправляемого сообщения - это MultiValueMap. А также недавно мы вместе с вами исправили проблему для POST, который уже выпущен в версии 5.0.5: https://jira.spring.io/browse/INT-4462

У диспетчера нет подписчиков

На QueueChannel такого не может быть в принципе. Там вообще нет диспетчера. Это просто очередь, и отправитель предлагает сообщение для сохранения. Вам не хватает чего-то еще, чем вы могли бы поделиться с нами. Но давайте называть вещи своими именами: messagesPerCustomerQueue не QueueChannel в вашем приложении.

ОБНОВЛЕНИЕ

Касательно:

Я хочу, чтобы сообщение оставалось в очереди и неоднократно предлагалось всем подписчикам до тех пор, пока оно не будет использовано или не истечет (надлежащий выборочный потребитель).

Мы видим только PollableJmsChannel на основе встроенного ActiveMQ для соблюдения TTL для сообщений. Как потребитель этой очереди вы должны иметь PublishSubscribeChannel с setMinSubscribers(1), чтобы MessagingTemplate выдавал MessageDeliveryException, когда еще нет подписчиков. Таким образом, транзакция JMS будет отменена, и сообщение вернется в очередь для следующего цикла опроса.

Проблема с in-memory QueueChannel, заключающаяся в том, что не выполняется повторная доставка транзакций и сообщение, однажды полученное из этой очереди, будет потеряно.

Другой вариант, похожий на JMS (транзакционный), - это JdbcChannelMessageStore для QueueChannel. Хотя таким образом у нас нет функции TTL ...

person Artem Bilan    schedule 29.05.2018
comment
Как вы говорите, полезная нагрузка сообщения GET становится пустой многозначной картой. Но это дает забавный эффект: полезная нагрузка, а не заголовок копируется в args[1] для handle() в строке 103/104 из LambdaMessageProcessor, так что и p, и h содержат пустую карту. Я открою вопрос Jira для дальнейшего обсуждения этого вопроса. Что касается очереди: на самом деле это канал очереди с опросчиком, я добавил конфигурацию компонента выше. Я могу зафиксировать образец в github.com/spring-projects/spring- интеграция-образцы / тянуть / 225 - person dschulten; 29.05.2018
comment
Ой! Это интересно ... Да, пожалуйста, поднимите JIRA по этому поводу. - person Artem Bilan; 29.05.2018
comment
Пожалуйста, загляните в мое ОБНОВЛЕНИЕ в ответе. - person Artem Bilan; 29.05.2018
comment
Спасибо за JIRA! Посмотрю позже. Пожалуйста, обратите внимание на ОБНОВЛЕНИЕ в моем ответе на вашу исходную проблему. - person Artem Bilan; 29.05.2018