Каков порядок очередности в RabbitMQ?

Согласно документации RabbitMQ на Благодарности потребителей :

Когда сообщение повторно поставлено в очередь, оно будет помещено в исходную позицию в очереди, если это возможно. Если нет (из-за одновременных доставок и подтверждений от других потребителей, когда несколько потребителей совместно используют очередь), сообщение будет повторно поставлено в очередь на позицию ближе к заголовку очереди.

Таким образом, с одним клиентом-потребителем, если очередь сервера изначально

хвост [c b a] голова

и клиент-потребитель потребляет заголовочное сообщение (а), очередь сервера должна стать:

хвост [c b] голова

Затем, если клиент-потребитель загружает обработанное сообщение, сообщение должно быть повторно поставлено в очередь сервера в заголовке (его исходное положение согласно документации), а очередь сервера должна стать:

хвост [c b a] голова

Наконец, клиент-потребитель должен снова принять то же самое заголовочное сообщение (а).

Но это не то, что я наблюдал, используя библиотеку Python Pika. Я заметил, что заблокированные сообщения повторно ставятся в хвост очереди сервера, а не в начале (исходное положение). Правильна ли документация RabbitMQ или правильна библиотека Pika?

Образец кода:

import logging

import pika

logging.basicConfig(level=logging.INFO)
logging.getLogger("pika").propagate = False
parameters = pika.ConnectionParameters()

# Produce messages

with pika.BlockingConnection(parameters) as connection:
    queue = "foobar"
    routing_key = queue
    channel = connection.channel()
    channel.queue_declare(queue=queue)
    for body in ["a", "b", "c"]:
        channel.publish(exchange="", routing_key=routing_key, body=body)
        logging.info(
            "Produced message %r with routing key %r", body, routing_key
        )

# Consume messages

def handle(channel, method, properties, body):
    logging.info("Consumed message %r from queue %r", body.decode(), queue)
    channel.basic_nack(method.delivery_tag)

with pika.BlockingConnection(parameters) as connection:
    queue = "foobar"
    channel = connection.channel()
    channel.queue_declare(queue=queue)
    channel.basic_consume(queue=queue, on_message_callback=handle)
    channel.start_consuming()

Выход:

ИНФОРМАЦИЯ: root: Создано сообщение 'a' с ключом маршрутизации 'foobar'
ИНФОРМАЦИЯ: root: Создано сообщение 'b' с ключом маршрутизации 'foobar'
ИНФОРМАЦИЯ: root: Создано сообщение 'c' с ключом маршрутизации 'foobar '
INFO: root: получено сообщение' a 'из очереди' foobar '
INFO: root: получено сообщение' b 'из очереди' foobar '
INFO: root: получено сообщение' c 'из очереди' foobar '
ИНФОРМАЦИЯ: root: Полученное сообщение' a 'из очереди' foobar '
ИНФОРМАЦИЯ: root: Полученное сообщение' b 'из очереди' foobar '
ИНФОРМАЦИЯ: root: Полученное сообщение' c 'из очереди 'foobar'
ИНФОРМАЦИЯ: root: получено сообщение 'a' из очереди 'foobar'
ИНФОРМАЦИЯ: root: получено сообщение 'b' из очереди 'foobar'
ИНФОРМАЦИЯ: root: получено сообщение 'c' из очередь 'foobar'


person Maggyero    schedule 15.01.2019    source источник


Ответы (2)


Поведение, с которым вы столкнулись, скорее всего, связано с поведением предварительной выборки.

Поскольку вы не указали желаемое качество обслуживания, я считаю (был бы признателен за более осведомленный источник, чтобы подтвердить эту точку зрения?), Что предварительная выборка определяется сервером и, вероятно, будет довольно высокой.

Идея состоит в том, что с точки зрения производительности клиент может получать несколько сообщений, что в большинстве случаев было бы выгодно:

  • если на стороне потребителя есть многопоточность, он, вероятно, может параллельно обрабатывать несколько сообщений, поэтому будет несколько сообщений, которые еще не были подтверждены в любой момент времени.
  • чтобы обеспечить более гибкую обработку в «счастливых» случаях, клиент может подтвердить блок сообщений, давая серверу знать, что до данного сообщения все сообщения, полученные потребителем, подтверждены, это снижает накладные расходы, когда у нас есть случаи больших количество сообщений, требующих небольшой обработки

Если вы просмотрите ссылки на документацию ниже, они объяснят, как вы можете контролировать поведение.

Дополнительная информация по этим пунктам доступна по адресу:

person Olivier    schedule 15.01.2019

Спасибо, @Olivier. С channel.basic_qos(prefetch_count=1) я получаю задокументированное поведение:

ИНФОРМАЦИЯ: root: Создано сообщение 'a' с ключом маршрутизации 'foobar'
ИНФОРМАЦИЯ: root: Создано сообщение 'b' с помощью ключа маршрутизации 'foobar'
ИНФОРМАЦИЯ: root: Создано сообщение 'c' с ключом маршрутизации 'foobar '
INFO: root: полученное сообщение' a 'из очереди' foobar '
INFO: root: полученное сообщение' a 'из очереди' foobar '
INFO: root: полученное сообщение' a 'из очереди' foobar '
ИНФОРМАЦИЯ: root: Полученное сообщение' a 'из очереди' foobar '
ИНФОРМАЦИЯ: root: Полученное сообщение' a 'из очереди' foobar '
ИНФОРМАЦИЯ: root: Полученное сообщение' a 'из очереди 'foobar'
ИНФОРМАЦИЯ: root: Полученное сообщение 'a' из очереди 'foobar'
ИНФОРМАЦИЯ: root: Полученное сообщение 'a' из очереди 'foobar'
ИНФОРМАЦИЯ: root: Полученное сообщение 'a' из очередь 'foobar'

person Maggyero    schedule 15.01.2019