Одновременное использование Celery в процессах и gevent в задачах

Я хотел бы использовать Celery в качестве очереди для моих задач, чтобы мое веб-приложение могло поставить задачу в очередь, вернуть ответ, и задача будет обработана тем временем / когда-нибудь / ... Я создаю своего рода API, поэтому я не знать, какие задачи будут там, заранее - в будущем могут быть задачи, связанные с HTTP-запросами, другим вводом-выводом, а также задачами, потребляющими ресурсы ЦП. В связи с этим я бы хотел запустить рабочих Celery на процессах, поскольку это универсальный вид параллелизма в Python.

Однако я бы тоже хотел использовать gevent в своих задачах, чтобы у меня была одна задача, порождающая множество HTTP-запросов и т. Д. Проблема в том, когда я делаю это:

from gevent import monkey
monkey.patch_all()

Сельдерей перестает работать. Он запускается, но никакие задачи не могут быть эффективно поставлены в очередь - они, кажется, переходят к брокеру, но работник Celery не собирает и не обрабатывает их. Только запускается и ждет. Если я удалю эти строки и выполню задачу без gevent и распараллеливания, все заработает.

Я думаю, это может быть связано с тем, что патчи gevent также работают с потоками. Так что я попробовал

from gevent import monkey
monkey.patch_all(thread=False)

... но тогда Celery даже не запускается, он вылетает без объяснения причин (включен уровень отладки логирования).

Можно ли использовать Celery для постановки задач в очередь, а gevent - для выполнения каких-то действий внутри одной задачи? Как? Что я делаю не так?


person Honza Javorek    schedule 02.11.2012    source источник
comment
+1 Я тоже последнее время об этом думаю.   -  person Katriel    schedule 02.11.2012
comment
Могут быть некоторые проблемы с gevent.monkey.patch_all () github.com/kennethreitz/grequests/issues / 8   -  person sputnikus    schedule 02.11.2012
comment
Конечно. К сожалению, чтобы использовать gevent с сокетами и т. Д., Необходимо запустить такой оператор.   -  person Honza Javorek    schedule 02.11.2012
comment
Вы запускаете цикл событий после запуска зеленых потоков? например: gevent.joinall() Думаю, самая большая проблема с этим - это очистить потом. AFAIK вы не можете временно исправить, процесс должен оставаться исправленным навсегда.   -  person asksol    schedule 12.11.2012
comment
Да, я сначала создал гринлеты, а затем использовал gevent.joinall().   -  person Honza Javorek    schedule 12.11.2012


Ответы (4)


Я считаю, что рекомендуемый способ запуска задачи следующий.

python manage.py celery worker -P gevent --loglevel=INFO

Gevent необходимо исправить как можно раньше.

person myusuf3    schedule 28.05.2013
comment
Это относится только к проектам Django, и в вопросе не упоминалось ни о каком веб-фреймворке. - person Alex K; 17.07.2014
comment
@ myusuf3 где должен быть патч, если он будет как можно раньше? какой хук использовать? - person eligro; 03.05.2015
comment
@eligro celery обрабатывает исправление обезьяны внутренне, просто указывая пул в командной строке. - person bmoran; 24.09.2019

Вы можете запустить сельдерей с несколькими потоками, содержащими несколько зеленок, например:

$ celery multi start 4 -P gevent -l info -c:1-4 1000
person remdezx    schedule 03.01.2013
comment
Не удалось найти подходящую документацию для сельдерея мульти. Как это работает? Я пробовал, но он каким-то образом порождает фоновые процессы (возможно, неуправляемые Supervisord?). - person Honza Javorek; 06.05.2013
comment
вы также случайно запускаете несколько узлов. - person myusuf3; 29.05.2013
comment
Я тоже не нашел никакой документации. Я нашел приведенную выше информацию здесь: groups.google.com/ forum /? fromgroups = #! topic / celery-users / - person remdezx; 07.06.2013

По моему странному опыту, Celery Beat не может правильно работать с воркерами с пулом gevent (запланированные задачи заблокированы и ждут вечно), если вы не активируете исправление gevent monkey для процесса Beat.

Однако celery beat не поддерживает параметр --pool=gevent или -P gevent. Правильный способ ввести исправление gevent monkey - использовать ограниченный двоичный файл celery, например:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from gevent import monkey
monkey.patch_all()

import re
import sys

from celery.__main__ import main

if __name__ == '__main__':
    sys.argv[0] = re.sub(r'(-script\.pyw|\.exe)?$', '', sys.argv[0])
    sys.exit(main())

Сохраните его как celery-gevent и запустите службу Beat следующим образом:

celery-gevent beat --app=proj.celery:app --loader=djcelery.loaders.DjangoLoader -f /var/log/celery/beat.log -l INFO --workdir=/my/proj --pidfile=/var/run/celery/beat.pid

В proj.celery вам также следует исправить соединение Django, чтобы избежать DatabaseError:

from __future__ import absolute_import

import os
# Set the Django settings module for the 'celery' program
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

import django
# Load Django model definitions, etc
django.setup()

from django.db import connection
# Allow thread sharing to ensure that Django database connection
# works properly with gevent.
connection.allow_thread_sharing = True

from django.conf import settings
from celery import Celery

app = Celery('proj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

(Приведенный выше пример работает для Python 2.7.10, Celery 3.1.18, Django 1.8.2 и gevent 1.0.2)

person Rockallite    schedule 08.06.2015
comment
У Celery Beat нет опции -P, потому что она ему не нужна: нет пула задач. Это легкий процесс, который периодически добавляет задачи в пул. Я прекрасно использую его с работниками gevent без каких-либо подобных хаков, но тогда я не использую --app или DjangoLoader, а просто файл конфигурации сельдерея, в котором перечислены задачи и их расписания. - person Václav Slavík; 20.02.2016
comment
Сельдерей сам исправляет обезьяну, вы делаете уже завершенную работу: docs.celeryproject.org/en/2.2/_modules/celery/concurrency/ - person jooks; 12.01.2021

Насколько мне удалось узнать, это невозможно. Если кто-то найдет ответ получше, я приму его вместо своего.

Единственный вариант - использовать gevent в качестве бэкэнда для рабочих Celery. Что нужно сделать для этого, так это добавить в файл конфигурации следующее:

CELERYD_POOL = 'gevent'

Более подробную информацию об этих параметрах можно найти здесь. Дополнительная информация о пуле gevent находится на этой странице. Обратите внимание на тот факт, что бассейн gevent все еще помечен как экспериментальный. Я не нашел доступных тестов для сравнения процессов и пула async gevent для разных задач (задачи, ориентированные на ввод-вывод, задачи, ориентированные на ЦП), но, наконец, я понял, что даже мои задачи, связанные с привязкой к ЦП, на самом деле будут больше чем CPU, потому что я использую базу данных для сохранения результатов, и соединение с базой данных будет узким местом, а не вычислительной частью. У меня не будет научных задач, которые сильно ударили бы по процессору.

person Honza Javorek    schedule 07.11.2012
comment
Попытка понять пулы gevent немного лучше. Разве это не помогает с задачами, связанными с вводом-выводом? Как именно они работают? - person Andres; 01.05.2013
comment
В момент, когда был задан вопрос и дан ответ, -P там не было. Я принял ответ myusuf3. - person Honza Javorek; 14.06.2014