Асинхронный запрос / ответ Spring Integration 4

Я пытаюсь написать простой поток сообщений с использованием DSL API Spring Integration v4, который будет выглядеть следующим образом:

       -> in.ch -> Processing -> JmsGatewayOut -> JMS_OUT_QUEUE
Gateway
       <- out.ch <- Processing <- JmsGatewayIn <- JMS_IN_QUEUE

Поскольку запрос / ответ является асинхронным, когда я вставляю сообщение через начальный шлюз, сообщение проходит весь путь до JMS_OUT_QUEUE. Помимо этого потока сообщений ответное сообщение помещается обратно в JMS_IN_QUEUE, которое затем принимается JmsGatewayIn. На этом этапе сообщение обрабатывается и помещается в out.ch (я знаю, что ответ попадает в out.ch, потому что у меня есть перехватчик регистратора, который регистрирует помещаемое туда сообщение), но шлюз никогда не получает ответ.

Вместо ответа система за пределами этого потока сообщений, которая приняла сообщение из JMS_OUT_QUEUE и поместила ответ в JMS_IN_QUEUE, получает javax.jms.MessageFormatException: MQJMS1061: Unable to deserialize object на собственном JmsOutboundgateway (я думаю, что она не может десериализовать объект ответа jms, просматривая журналы ).

У меня явно что-то неправильно настроено, но я точно не знаю, что. Кто-нибудь знает, что мне не хватает?

Работа с spring-integration-core-4.0.3.RELEASE, spring-integration-jms-4.0.3.RELEASE, spring-integration-java-dsl-1.0.0.M2, spring-jms-4.0.6.RELEASE.

Мой шлюз настроен следующим образом:

@MessagingGateway
public interface WsGateway {

    @Gateway(requestChannel = "in.ch", replyChannel = "out.ch", 
        replyTimeout = 45000)
    AResponse process(ARequest request);
}

Мой поток интеграции настроен следующим образом:

@Configuration
@EnableIntegration
@IntegrationComponentScan
@ComponentScan
public class IntegrationConfig {

    @Bean(name = "in.ch")
    public DirectChannel inCh() {
        return new DirectChannel();
    }

    @Bean(name = "out.ch")
    public DirectChannel outCh() {
        return new DirectChannel();
    }   

    @Autowired
    private MQQueueConnectionFactory mqConnectionFactory;

    @Bean
    public IntegrationFlow requestFlow() {

        return IntegrationFlows.from("in.ch")
                .handle("processor", "processARequest")
                .handle(Jms.outboundGateway(mqConnectionFactory)
                        .requestDestination("JMS_OUT_QUEUE")
                        .correlationKey("JMSCorrelationID")
                .get();
    }

    @Bean
    public IntegrationFlow responseFlow() {

        return IntegrationFlows.from(Jms.inboundGateway(mqConnectionFactory)
                .destination("JMS_IN_QUEUE"))
                .handle("processor", "processAResponse")
                .channel("out.ch")
                .get();
    }
}

Спасибо за любую помощь в этом, личка.


person Going Bananas    schedule 16.07.2014    source источник


Ответы (1)


Прежде всего, ваша конфигурация плохая:

  1. Поскольку вы начинаете поток с WsGateway#process, вам действительно стоит дождаться ответа там. Возможность запроса / ответа шлюза основана на TemporaryReplyChannel, который помещается в headers как несериализуемое значение.

  2. Пока вы ждете, полагайтесь на этот шлюз, на самом деле нет причин предоставлять replyChannel, если вы не собираетесь выполнять какую-то логику публикации-подписки для ответа.

  3. Отправляя сообщение в очередь JMS, вы должны понимать, что потребительская часть может быть отдельным удаленным приложением. И последний может ничего не знать о вашем out.ch.

  4. Возможность JMS-запроса / ответа действительно основана на JMSCorrelationID, но этого недостаточно. Еще одна вещь - это ReplyTo JMS-заголовок. Следовательно, если вы собираетесь отправить ответ от потребителя, вам действительно стоит полагаться только на JmsGatewayIn материал.

Поэтому я бы изменил ваш код на этот:

@MessagingGateway
public interface WsGateway {

    @Gateway(requestChannel = "in.ch", replyTimeout = 45000)
    AResponse process(ARequest request);
}

@Configuration
@EnableIntegration
@IntegrationComponentScan
@ComponentScan
public class IntegrationConfig {

    @Bean(name = "in.ch")
    public DirectChannel inCh() {
        return new DirectChannel();
    }

    @Autowired
    private MQQueueConnectionFactory mqConnectionFactory;

    @Bean
    public IntegrationFlow requestFlow() {
        return IntegrationFlows.from("in.ch")
                .handle("processor", "processARequest")
                .handle(Jms.outboundGateway(mqConnectionFactory)
                        .requestDestination("JMS_OUT_QUEUE")
                        .replyDestination("JMS_IN_QUEUE"))
                .handle("processor", "processAResponse")
                .get();
    }

}

Дайте мне знать, если вам это подходит, или попытайтесь объяснить, почему вы используете two-way шлюзы в одном one-way случаях. Может быть, Jms.outboundAdapter() и Jms.inboundAdapter() вам больше подходят?

ОБНОВЛЕНИЕ

Как использовать <header-channels-to-string> из Java DSL:

.enrichHeaders(e -> e.headerChannelsToString())
person Artem Bilan    schedule 16.07.2014
comment
Привет, ваше решение правильное, спасибо. Сначала я попытался заменить шлюзы ввода / вывода Jms на адаптеры ввода / вывода Jms, но у меня возникли проблемы с их настройкой. Затем я попробовал только Jms Out Gateway с replyDestination, как в вашем решении, и это сработало. Я не удалил исходный канал replyChannel @ Gateway, который, как вы упомянули, не нужен. Спасибо! - person Going Bananas; 16.07.2014
comment
Еще раз здравствуйте, мне нужно вернуться к этому сообщению ... У меня есть требование, в котором я считаю, что мне нужно использовать отдельные входящие / исходящие адаптеры JMS, чтобы обеспечить асинхронные ответы. Для этого я снова установил replyChannel = out.ch на WsGateway, переключил Jms.outboundGateway на Jms.outboundAdapter и Jms.inboundAdapter и установил заголовок JMSCorrelationID в сообщении перед отправкой, но когда ответ возвращается и помещается в out.ch, WsGateway не принимает ответное сообщение? Моя схема SI на самом деле немного сложнее, чем показано здесь ... - person Going Bananas; 28.08.2014
comment
Попробуйте использовать <header-enricher> с <header-channels-to-string> перед отправкой запроса - person Artem Bilan; 28.08.2014
comment
Как мне использовать header-channels-to-string в новой форме DSL? Это просто случай установки его как имени заголовка с нулевым значением? - person Going Bananas; 28.08.2014
comment
Ой! Извините, я забыл, что вы уже используете DSL. Я добавляю образец к своему ответу - person Artem Bilan; 28.08.2014
comment
Я использую spring-integration-java-dsl: 1.0.0.M2, для которого, похоже, нет e.headerChannelsToString(). Это включено в более новую версию пакета? M2 - это последняя версия, которую я вижу в репозитории maven. ru / artifact / org.springframework.integration /. - person Going Bananas; 28.08.2014
comment
Да ты прав. Итак, вам нужно сделать следующее обходное решение: github.com/spring-projects/spring-integration-extensions/blob/ - person Artem Bilan; 28.08.2014
comment
Превосходно. При использовании обходного пути ответ будет получен исходным WsGateway. Я также вижу, что ответ содержит необходимый заголовок replyChannel в следующей форме: replyChannel=fd972c0f-ec93-4dad-b6f5-ea7ede3b98ed:1. Еще раз спасибо, Артем, и, надеюсь, я буду следить за появлением e.headerChannelsToString() в следующей вехе DSL! - person Going Bananas; 28.08.2014
comment
Большой! Он будет выпущен на следующей неделе, незадолго до SpringOne. Следите за новостями и новостями Spring IO News and Feeds - person Artem Bilan; 28.08.2014