RabbitMQ приостанавливает потребление очереди

Каков наилучший способ сохранить устойчивую очередь и ее привязки, но приостановить ее потребителей?

Вариант использования: я хотел бы «дать ему сбой» и прекратить обработку сообщений, если мы продолжаем получать кучу сообщений, с которыми мы не можем справиться (например, база данных не работает или проблема со схемой), но хотел бы продолжать агрегировать в очередь. То есть разрешить публикацию, но приостановить потребление.

Я могу придумать три решения:

  1. Я мог бы заставить всех потребителей, привязанных к очереди, постоянно отклонять сообщения и повторно ставить их в очередь, но это своего рода пустая трата ресурсов, не говоря уже о том, что я программно выполняю вышеуказанную логику.
  2. Я мог бы вызвать basic.cancelConsumer для всех потребителей (см. ниже)
  3. Или с точки зрения spring-amqp Думаю, я мог бы вызвать shutdown для всех SimpleMessageListenerContainer, привязанные к очереди.

#1 мы уже делаем, так как сообщения отклоняются. Проблема в том, что в конечном итоге это похоже на бесконечный цикл сбоев, и если вы регистрируете этот сбой, еще больше ресурсов тратится впустую.

#3 кажется идеальным, но мне нужно как-то узнать обо всех прослушивателях сообщений, а затем уведомить их о завершении работы. Я полагаю, что мог бы использовать разветвленный обмен, чтобы уведомить о том, что очередь должна быть приостановлена. Я чувствую, что RabbitMQ должен иметь что-то встроенное для этой логики. Другая проблема заключается в том, что вы можете привязать несколько очередей к контейнеру сообщений (не все очереди могут нуждаться в приостановке).

Для #2 я знаю, что могу отменить потребителя с его consumerTag, но вопрос (при условии, что это правильный способ сделать вышеизложенное) заключается в том, где мне получить список consumerTags в очередь?


person Adam Gent    schedule 08.03.2013    source источник


Ответы (2)


Если вам нужно остановить потребителя, просто вызовите базовую отмену, это способ сделать это.

Наличие устойчивых очередей — это вопрос, который вы решаете, когда объявляете очередь: durable=true auto_delete=false.

Наличие постоянных сообщений определяется при их публикации: delivery_mode=2.

У каждого потребителя есть свой потребительский тег. Всякий раз, когда вы получаете сообщение из очереди, конверт должен иметь тег потребителя, и вы должны вызывать базовую отмену с этим тегом.

Насколько я знаю, вы не можете иметь потребителя A в соединении 1 и вызывать базовую отмену для этого потребителя в другом соединении. Я могу ошибаться в этом.

person old_sound    schedule 08.03.2013
comment
Спасибо. Я знаю, как объявлять очереди в кролике, как указано выше. Похоже, мне придется управлять своими потребителями самостоятельно, и это путь, по которому я уже иду. Было бы неплохо, если бы у кролика была возможность администратора приостановить использование очереди. - person Adam Gent; 08.03.2013
comment
Единственное, что вы можете сделать из соединения, это принудительно закрыть его от администратора, что на самом деле не то, что вы ищете. - person old_sound; 09.03.2013

Это решение относится к spring-amqp . Это в основном #3 из моего ответа.

Я поддерживаю службу, которая имеет Map<String,CustomSimpleMessageListenerContainer> имени очереди для пользовательского расширенного SimpleMessageListenerContainer.

После определенного количества исключений я отправляю сообщение «panic», которое отправляется в специальную очередь, полученную службой, для отключения потребителя.

person Adam Gent    schedule 16.03.2013
comment
panic с дополнительной очередью — отличная идея! - person watery; 16.01.2020