Spring Integration JMS создает очередь ActiveMQ вместо темы

Я пытаюсь использовать брокера ActiveMQ для доставки сообщения двум потребителям, слушающим автоматическую тему, с использованием средств интеграции Spring.

Вот мои компоненты конфигурации (общие для издателей и подписчиков):

@Value("${spring.activemq.broker-url}")
String brokerUrl;

@Value("${spring.activemq.user}")
String userName;

@Value("${spring.activemq.password}")
String password;

@Bean
public ConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    connectionFactory.setBrokerURL(brokerUrl);
    connectionFactory.setUserName(userName);
    connectionFactory.setPassword(password);
    return connectionFactory;
}

@Bean
public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setPubSubDomain(true); //!!
    configurer.configure(factory, connectionFactory);
    return factory;
}

@Bean
public JmsTemplate jmsTemplate() {
    JmsTemplate template = new JmsTemplate();
    template.setConnectionFactory(connectionFactory());
    template.setPubSubDomain(true); //!!
    return template;
}

Вот бобы для потребителей:

@Bean(name = "jmsInputChannel")
public MessageChannel jmsInputChannel() {
    return new PublishSubscribeChannel();
}

@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {        
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination("myTopic"))
            .channel("jmsInputChannel").get();
}

//Consumes the message.
@ServiceActivator(inputChannel="jmsInputChannel")
public void receive(String msg){
    System.out.println("Received Message: " + msg);
}

А это фасоль для производителя:

@Bean(name = "jmsOutputChannel")
public MessageChannel jmsOutputChannel() {
    return new PublishSubscribeChannel();
}

@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() {
    return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(connectionFactory())
            .destination("myTopic")
    ).get();
}



private static int counter = 1;

@Scheduled(initialDelay=5000, fixedDelay=2000)
public void send() {
     String s = "Message number " + counter;
     counter++;
     jmsOutputChannel().send(MessageBuilder.withPayload(s).build());
}

Я НЕ использую встроенный брокер ActiveMQ. Я использую одного брокера, одного производителя и двух потребителей, каждый в своем собственном (докер) контейнере.

Моя проблема в том, что, хотя я вызвал setPubSubDomain(true) как для JmsListenerContainerFactory, так и для JmsTemplate, мои «темы» ведут себя как очереди: один потребитель печатает все сообщения с четными номерами, а другой - все сообщения с нечетными номерами.

Фактически, получив доступ к веб-интерфейсу ActiveMQ, я вижу, что мои «темы» (т.е. на странице /topics.jsp) называются ActiveMQ.Advisory.Consumer.Queue.myTopic и ActiveMQ.Advisory.Producer.Queue.myTopic, а «myTopic» действительно появляется на странице очередей (т.е. /queues.jsp) .

Узлы запускаются в следующем порядке:

  • Брокер AMQ
  • Потребитель 1
  • Потребитель 2
  • Режиссер

Первая "тема", которая создается, - это ActiveMQ.Advisory.Consumer.Queue.myTopic, в то время как продюсерская тема, очевидно, появляется только после того, как продюсер начал.

Я не эксперт по ActiveMQ, поэтому, возможно, тот факт, что мои "темы" производителя / потребителя названы ".Queue", просто вводит в заблуждение. Однако я получаю семантику, описанную в официальная документация ActiveMQ для очередей, а не тем.

Я также уже рассмотрел этот вопрос, однако все мои задействованные каналы уже относятся к типу PublishSubscribeChannel.

Мне нужно добиться, чтобы все сообщения были доставлены всем моим (возможно,> 2) потребителям.

ОБНОВЛЕНИЕ: я забыл упомянуть, что мой файл application.properties уже содержит spring.jms.pub-sub-domain=true вместе с другими настройками.

Кроме того, я использую версию Spring Integration 4.3.12.RELEASE.

Проблема в том, что я по-прежнему получаю семантику с балансировкой нагрузки RR, а не семантику публикации-подписки. Что касается того, что я вижу в ссылке, предоставленной @Hassen Bennour, я бы ожидал чтобы получить ActiveMQ.Advisory.Producer.Topic.myTopic и ActiveMQ.Advisory.Consumer.Topic.myTopic строку в списке всех тем. Почему-то мне кажется, что я плохо использую библиотеки Spring Integration, и поэтому я настраиваю очередь, когда хочу создать тему.

ОБНОВЛЕНИЕ 2: извините за недоразумение. jmsOutputChannel2 на самом деле jmsOutputChannel здесь, я отредактировал основную часть. Я использую второстепенную «тему» ​​в своем коде в качестве проверки, что-то, что производитель может отправлять и получать ответы. Название "темы" также отличается, так что ... это целиком в отдельном потоке.

Я добился небольшого прогресса, изменив потоки приемника следующим образом:

@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {        
    //return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination("myTopic"))
            //.channel("jmsInputChannel").get();
return IntegrationFlows.from(Jms.publishSubscribeChannel(connectionFactory()).destination("myTopic")) //Jms.publishSubscribeChannel() rather than Jms.messageDrivenChannelAdapter()
            .channel("jmsInputChannel").get();
}

Это создает рекомендательную тему типа Consumer.Topic.myTopic, а не Consumer.Queue.myTopic для брокера, И действительно, тема называется просто myTopic (как я могу видеть на вкладке тем). Однако, как только производитель запускается, создается Producer.Queue рекомендательная тема, и сообщения отправляются туда, но не доставляются.

Выбор адаптера во входном потоке, по-видимому, определяет, какой тип консультативной темы для потребителя создается (Тема против очереди при переключении на Jms.publishSubscribeChannel() с Jms.messageDrivenChannelAdapter()). Однако мне не удалось найти что-то похожее на выходной поток.

ОБНОВЛЕНИЕ 3: проблема решена благодаря @Hassen Bennour. Резюме:

Подключил jmsTemplate() в Jms.outboundAdapter() производителя

@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() {
    return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(jsaTemplate())
            .destination("myTopic")
    ).get();
}

И более сложная конфигурация для потребителя Jms.messageDrivenChannelAdapter():

@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {        
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
        Jms.container(connectionFactory(),"myTopic")
        .pubSubDomain(true).get()) )
        .channel("jmsInputChannel").get();
}

Хотя это, вероятно, самый простой и гибкий метод, имея такой компонент ...

@Bean
public Topic topic() {
    return new ActiveMQTopic("myTopic");
}

для подключения в качестве места назначения для адаптеров, а не только для String.

Спасибо еще раз.


person Kurobara    schedule 23.02.2018    source источник
comment
jmsOutputChannel2 (). send () отправляет сообщения в jmsOutputChannel () ??   -  person Hassen Bennour    schedule 23.02.2018
comment
Это 2 было ошибкой сообщить здесь с моей стороны. Как бы то ни было, я просто отправляю строки непосредственно в канал сообщений jmsOutputChannel(), который связан с Jms.outboundAdapter() через IntegrationFlow.   -  person Kurobara    schedule 23.02.2018


Ответы (1)


добавить spring.jms.pub-sub-domain = true в application.properties

or

@Bean
public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    // the configurer will use PubSubDomain from application.properties if defined or false if not
    //so setting it on the factory level need to be set after this
    configurer.configure(factory, connectionFactory);
    factory.setPubSubDomain(true);
    return factory;
}

ActiveMQ.Advisory.Consumer.Queue.myTopic - это консультативная тема для очереди с именем myTopic. Посмотрите здесь, чтобы прочитать о консультативном http://activemq.apache.org/advisory-message.html

ОБНОВЛЕНИЕ:

обновите свои определения, как показано ниже

@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() {
    return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(jmsTemplate())
            .destination("myTopic")
    ).get();
}

@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {        
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
            Jms.container(connectionFactory(),"myTopic")
            .pubSubDomain(true).get()) )
            .channel("jmsInputChannel").get();
}

или определите пункт назначения как тему и замените пункт назначения ("myTopic") пунктом назначения (тема ())

@Bean
public Topic topic() {
    return new ActiveMQTopic("myTopic");
}
person Hassen Bennour    schedule 23.02.2018
comment
Свойство уже было установлено, я забыл упомянуть. Я также пробовал переместить строку factory.setPubSubDomain(true) после вызова конфигуратора. У меня точно такое же поведение. - person Kurobara; 23.02.2018