Весной я создаю службу, управляемую сообщениями, которая будет работать в кластере и должна извлекать сообщения из очереди RabbitMQ в циклическом режиме. Реализация в настоящее время извлекает сообщения из очереди в порядке очереди, что приводит к резервному копированию некоторых серверов, в то время как другие простаивают.
Текущий QueueConsumerConfiguration.java выглядит так:
@Configuration
public class QueueConsumerConfiguration extends RabbitMqConfiguration {
private Logger LOG = LoggerFactory.getLogger(QueueConsumerConfiguration.class);
private static final int DEFAULT_CONSUMERS=2;
@Value("${eventservice.inbound}")
protected String inboudEventQueue;
@Value("${eventservice.consumers}")
protected int queueConsumers;
@Autowired
private EventHandler eventtHandler;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.inboudEventQueue);
template.setQueue(this.inboudEventQueue);
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean
public Queue inboudEventQueue() {
return new Queue(this.inboudEventQueue);
}
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueNames(this.inboudEventQueue);
container.setMessageListener(messageListenerAdapter());
if (this.queueConsumers > 0) {
LOG.info("Starting queue consumers:" + this.queueConsumers );
container.setMaxConcurrentConsumers(this.queueConsumers);
container.setConcurrentConsumers(this.queueConsumers);
} else {
LOG.info("Starting default queue consumers:" + DEFAULT_CONSUMERS);
container.setMaxConcurrentConsumers(DEFAULT_CONSUMERS);
container.setConcurrentConsumers(DEFAULT_CONSUMERS);
}
return container;
}
@Bean
public MessageListenerAdapter messageListenerAdapter() {
return new MessageListenerAdapter(this.eventtHandler, jsonMessageConverter());
}
}
Это просто добавление
container.setChannelTransacted(true);
к конфигурации?