Почему celery.control.inspect сообщает о меньшем количестве задач в очереди, чем rabbitmqctl?

rabbitmqctl правильно сообщает о тысячах задач в очереди:

$ sudo rabbitmqctl -q list_queues name messages messages_ready messages_unacknowledged
default 13142   13126   16

Тем не менее, сельдерей сообщает:

>>> len(app.control.inspect().active()['celery@default'])
4
>>> len(app.control.inspect().scheduled()['celery@default'])
1
>>> len(app.control.inspect().reserved()['celery@default'])
16
>>> len(app.control.inspect().revoked()['celery@default'])
0

Правильное количество (тысячи) задач, кажется, отображается в app.control.inspect().stats()['celery@default']['total'], но я действительно хочу знать правильное количество ожидающих задач в очереди из python, а active() и другие, похоже, когда-либо сообщали только до 16 или около того - возможно, есть предел?

Если не считать использования привилегированных вызовов подпроцесса для rabbitmqctl, как я могу получить полное количество задач в очереди из Python, предпочтительно через celery (кстати, этот сервер в настоящее время использует Celery 3.1.8)


person DrMeers    schedule 29.11.2016    source источник


Ответы (1)


app.control.inspect Celery проверит задачи, которые обрабатывается только запущенными рабочими процессами.

Несмотря на то, что у вас есть тысячи задач в очереди, ваш рабочий процесс будет выполнять только несколько указанных задач в любой момент времени. Это active задачи.

В дополнение к этому рабочий может предварительно выбрать некоторые задачи, которые будут зарезервированы для этого рабочего. Они будут показаны в reserved задачах.

Если вы установили ETA для своих задач или если есть периодические задачи, они будут подпадать под scheduled задач.

Похоже, вы запустили воркер с параллелизмом 4 (или воркер с настройками по умолчанию на 4-ядерной машине). Таким образом, активных задач 4. Каждый рабочий процесс предварительно загрузил 4 задачи, что привело к 16 зарезервированным задачам.

Насколько я знаю, с сельдереем невозможно получить общее количество задач в очереди.

Однако есть несколько решений Python для получения общего количества сообщений в очереди. Вы можете проверить другой мой ответ здесь, чтобы узнать о других способах сделать это.

Обновлять:

pika — это клиент Python для взаимодействия с rabbitmq. Вы можете использовать его для потребления сообщений. Вот простой пример использования каждого сообщения. Вы можете просмотреть дополнительные примеры использования в документации pika.

person ChillarAnand    schedule 06.02.2017
comment
Спасибо! Могу ли я также получить подробную информацию о том, что каждое сообщение в очереди использует pika и т. д., или просто общее количество сообщений в очереди? - person DrMeers; 07.02.2017
comment
Спасибо @ChillarAnand; Потребление сообщений через pika оставляет их безопасно в очереди для обработки сельдереем? Если да, то это хорошее решение - person DrMeers; 08.02.2017
comment
@DrMeers Я не думаю, что для этого есть способ. Однако вы можете получать сообщения и помещать их в очередь rabbitmq.1065348.n5.nabble.com/ - person ChillarAnand; 08.02.2017