Как реализовать потребителя очереди с циклическим перебором в Spring boot

Весной я создаю службу, управляемую сообщениями, которая будет работать в кластере и должна извлекать сообщения из очереди RabbitMQ в циклическом режиме. Реализация в настоящее время извлекает сообщения из очереди в порядке очереди, что приводит к резервному копированию некоторых серверов, в то время как другие простаивают.

Текущий QueueConsumerConfiguration.java выглядит так:

@Configuration
public class QueueConsumerConfiguration extends RabbitMqConfiguration {
private Logger LOG = LoggerFactory.getLogger(QueueConsumerConfiguration.class);

private static final int DEFAULT_CONSUMERS=2;

@Value("${eventservice.inbound}")
protected String inboudEventQueue;

@Value("${eventservice.consumers}")
protected int queueConsumers;

@Autowired
private EventHandler eventtHandler;

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    template.setRoutingKey(this.inboudEventQueue);
    template.setQueue(this.inboudEventQueue);
    template.setMessageConverter(jsonMessageConverter());
    return template;
}

@Bean
public Queue inboudEventQueue() {
    return new Queue(this.inboudEventQueue);
}

@Bean
public SimpleMessageListenerContainer listenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueNames(this.inboudEventQueue);
    container.setMessageListener(messageListenerAdapter());
    if (this.queueConsumers > 0) {
        LOG.info("Starting queue consumers:" + this.queueConsumers );
        container.setMaxConcurrentConsumers(this.queueConsumers);
        container.setConcurrentConsumers(this.queueConsumers);
    } else {
        LOG.info("Starting default queue consumers:" + DEFAULT_CONSUMERS);
        container.setMaxConcurrentConsumers(DEFAULT_CONSUMERS);
        container.setConcurrentConsumers(DEFAULT_CONSUMERS);            
    }
    return container;
}

@Bean
public MessageListenerAdapter messageListenerAdapter() {
    return new MessageListenerAdapter(this.eventtHandler, jsonMessageConverter());
}
}

Это просто добавление

container.setChannelTransacted(true);

к конфигурации?


person Andrew Rutter    schedule 21.07.2016    source источник
comment
Непонятно, что вы имеете в виду - предварительная выборка по умолчанию (1) означает, что каждому потребителю будет отправлено одно сообщение за раз (циклический перебор). RabbitMQ не знает разницы между тремя потребителями в одном контейнере и контейнером. 3 контейнера с 1 потребителем в каждом — все они потребители с точки зрения Кролика.   -  person Gary Russell    schedule 21.07.2016
comment
Я думаю, что это моя проблема. Я ищу способ настроить так, чтобы 3 потребителя не обрабатывались последовательно, когда они зарегистрированы в RabbitMQ. Если у меня есть 3 контейнера (1,2,3) и два потребителя на каждом (A,B), когда эти контейнеры запускаются, я вижу их как A1, A2, A3, B1, B2, B3 - где я бы предпочел, чтобы их было больше случайно распределены. Можно ли оставить container.setConcurrentConsumers и запустить эти потребители по мере необходимости? Я думал, что читал о конфликте с CachingConnectionFactory, который, как я думаю, необходим для прохождения аутентификации.   -  person Andrew Rutter    schedule 21.07.2016


Ответы (1)


RabbitMQ одинаково относится ко всем потребителям — он не знает разницы между несколькими потребителями в одном контейнере и контейнером. один потребитель в нескольких контейнерах (например, на разных хостах). Каждый из них является потребителем с точки зрения Кролика.

Если вам нужен больший контроль над привязкой к серверу, вам нужно использовать несколько очередей, при этом каждый контейнер прослушивает свою собственную очередь.

Затем вы контролируете распространение на стороне производителя - например. использование темы или прямого обмена и определенных ключей маршрутизации для направления сообщений в определенную очередь.

Это крепко связывает производителя с потребителями (он должен знать, сколько их).

Или вы можете попросить своего производителя использовать ключи маршрутизации rk.0, rk.1, ..., rk.29 (неоднократно сбрасывая на 0 при достижении 30).

Затем вы можете связать очереди потребителей с несколькими привязками -

потребитель 1 получает от rk.0 до rk.9, 2 получает от rk.10 до rk.19 и т. д. и т. д.

Если затем вы решите увеличить число потребителей, просто соответствующим образом реорганизуйте привязки, чтобы перераспределить работу.

Контейнер масштабируется до maxConcurrentConsumers по запросу, но на практике масштабирование происходит только тогда, когда весь контейнер какое-то время простаивает.

person Gary Russell    schedule 21.07.2016