Я пытаюсь использовать брокера ActiveMQ для доставки сообщения двум потребителям, слушающим автоматическую тему, с использованием средств интеграции Spring.
Вот мои компоненты конфигурации (общие для издателей и подписчиков):
@Value("${spring.activemq.broker-url}")
String brokerUrl;
@Value("${spring.activemq.user}")
String userName;
@Value("${spring.activemq.password}")
String password;
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(brokerUrl);
connectionFactory.setUserName(userName);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean
public JmsListenerContainerFactory<?> jsaFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(true); //!!
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory());
template.setPubSubDomain(true); //!!
return template;
}
Вот бобы для потребителей:
@Bean(name = "jmsInputChannel")
public MessageChannel jmsInputChannel() {
return new PublishSubscribeChannel();
}
@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination("myTopic"))
.channel("jmsInputChannel").get();
}
//Consumes the message.
@ServiceActivator(inputChannel="jmsInputChannel")
public void receive(String msg){
System.out.println("Received Message: " + msg);
}
А это фасоль для производителя:
@Bean(name = "jmsOutputChannel")
public MessageChannel jmsOutputChannel() {
return new PublishSubscribeChannel();
}
@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() {
return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(connectionFactory())
.destination("myTopic")
).get();
}
private static int counter = 1;
@Scheduled(initialDelay=5000, fixedDelay=2000)
public void send() {
String s = "Message number " + counter;
counter++;
jmsOutputChannel().send(MessageBuilder.withPayload(s).build());
}
Я НЕ использую встроенный брокер ActiveMQ. Я использую одного брокера, одного производителя и двух потребителей, каждый в своем собственном (докер) контейнере.
Моя проблема в том, что, хотя я вызвал setPubSubDomain(true)
как для JmsListenerContainerFactory
, так и для JmsTemplate
, мои «темы» ведут себя как очереди: один потребитель печатает все сообщения с четными номерами, а другой - все сообщения с нечетными номерами.
Фактически, получив доступ к веб-интерфейсу ActiveMQ, я вижу, что мои «темы» (т.е. на странице /topics.jsp) называются ActiveMQ.Advisory.Consumer.Queue.myTopic
и ActiveMQ.Advisory.Producer.Queue.myTopic
, а «myTopic» действительно появляется на странице очередей (т.е. /queues.jsp) .
Узлы запускаются в следующем порядке:
- Брокер AMQ
- Потребитель 1
- Потребитель 2
- Режиссер
Первая "тема", которая создается, - это ActiveMQ.Advisory.Consumer.Queue.myTopic
, в то время как продюсерская тема, очевидно, появляется только после того, как продюсер начал.
Я не эксперт по ActiveMQ, поэтому, возможно, тот факт, что мои "темы" производителя / потребителя названы ".Queue", просто вводит в заблуждение. Однако я получаю семантику, описанную в официальная документация ActiveMQ для очередей, а не тем.
Я также уже рассмотрел этот вопрос, однако все мои задействованные каналы уже относятся к типу PublishSubscribeChannel.
Мне нужно добиться, чтобы все сообщения были доставлены всем моим (возможно,> 2) потребителям.
ОБНОВЛЕНИЕ: я забыл упомянуть, что мой файл application.properties
уже содержит spring.jms.pub-sub-domain=true
вместе с другими настройками.
Кроме того, я использую версию Spring Integration 4.3.12.RELEASE.
Проблема в том, что я по-прежнему получаю семантику с балансировкой нагрузки RR, а не семантику публикации-подписки. Что касается того, что я вижу в ссылке, предоставленной @Hassen Bennour, я бы ожидал чтобы получить ActiveMQ.Advisory.Producer.Topic.myTopic
и ActiveMQ.Advisory.Consumer.Topic.myTopic
строку в списке всех тем. Почему-то мне кажется, что я плохо использую библиотеки Spring Integration, и поэтому я настраиваю очередь, когда хочу создать тему.
ОБНОВЛЕНИЕ 2: извините за недоразумение. jmsOutputChannel2
на самом деле jmsOutputChannel
здесь, я отредактировал основную часть. Я использую второстепенную «тему» в своем коде в качестве проверки, что-то, что производитель может отправлять и получать ответы. Название "темы" также отличается, так что ... это целиком в отдельном потоке.
Я добился небольшого прогресса, изменив потоки приемника следующим образом:
@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {
//return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(connectionFactory()).destination("myTopic"))
//.channel("jmsInputChannel").get();
return IntegrationFlows.from(Jms.publishSubscribeChannel(connectionFactory()).destination("myTopic")) //Jms.publishSubscribeChannel() rather than Jms.messageDrivenChannelAdapter()
.channel("jmsInputChannel").get();
}
Это создает рекомендательную тему типа Consumer.Topic.myTopic
, а не Consumer.Queue.myTopic
для брокера, И действительно, тема называется просто myTopic
(как я могу видеть на вкладке тем). Однако, как только производитель запускается, создается Producer.Queue
рекомендательная тема, и сообщения отправляются туда, но не доставляются.
Выбор адаптера во входном потоке, по-видимому, определяет, какой тип консультативной темы для потребителя создается (Тема против очереди при переключении на Jms.publishSubscribeChannel()
с Jms.messageDrivenChannelAdapter()
). Однако мне не удалось найти что-то похожее на выходной поток.
ОБНОВЛЕНИЕ 3: проблема решена благодаря @Hassen Bennour. Резюме:
Подключил jmsTemplate()
в Jms.outboundAdapter()
производителя
@Bean(name = "jmsOutputFlow")
public IntegrationFlow jmsOutputFlow() {
return IntegrationFlows.from(jmsOutputChannel()).handle(Jms.outboundAdapter(jsaTemplate())
.destination("myTopic")
).get();
}
И более сложная конфигурация для потребителя Jms.messageDrivenChannelAdapter()
:
@Bean(name = "jmsInputFlow")
public IntegrationFlow buildReceiverFlow() {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(
Jms.container(connectionFactory(),"myTopic")
.pubSubDomain(true).get()) )
.channel("jmsInputChannel").get();
}
Хотя это, вероятно, самый простой и гибкий метод, имея такой компонент ...
@Bean
public Topic topic() {
return new ActiveMQTopic("myTopic");
}
для подключения в качестве места назначения для адаптеров, а не только для String.
Спасибо еще раз.
jmsOutputChannel()
, который связан сJms.outboundAdapter()
через IntegrationFlow. - person Kurobara   schedule 23.02.2018