Правильная настройка django redis celery и celery beats

Я пытаюсь настроить django + celery + redis + celery_beats, но это вызывает у меня проблемы. Документация довольно проста, но когда я запускаю сервер django, redis, celery и celery beats, ничего не печатается и не регистрируется (все мои тестовые задачи что-то делают в журнале).

Это моя структура папок:

- aenima 
 - aenima
   - __init__.py
   - celery.py

 - criptoball
   - tasks.py

celery.py выглядит так:

from __future__ import absolute_import, unicode_literals
import os
from django.conf import settings
from celery import Celery


# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'aenima.settings')

app = Celery("criptoball")
app.conf.broker_url = 'redis://localhost:6379/0'

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.conf.timezone = 'UTC'

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

app.conf.beat_schedule = {
    'test-every-30-seconds': {
        'task': 'tasks.test_celery',
        'schedule': 30.0,
        'args': (16, 16)
    }, }

и tasks.py выглядит так:

from __future__ import absolute_import, unicode_literals
from datetime import datetime, timedelta
from celery import shared_task
import logging

from django_celery_beat.models import PeriodicTask, IntervalSchedule

cada_10_seg = IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)

test_celery_periodic = PeriodicTask.objects.create(interval=cada_10_seg, name='test_celery', task='criptoball.tasks.test_celery',
expires=datetime.utcnow()+timedelta(seconds=30))

@shared_task
def test_celery(x, y):
    logger = logging.getLogger("AENIMA")
    print("EUREKA")
    logger.debug("EUREKA")

Это документы django_celery_beat

Не уверен, что мне не хватает. Когда я бегу

сельдерей -Aenima beat -l debug --scheduler django_celery_beat.schedulers: DatabaseScheduler

сельдерей -Aenima worker -l debug

повторный клик PONG

django runserver и redis server, я ничего не печатаю.

settings.py

CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = TIME_ZONE
CELERY_IMPORTS = ('criptoball.tasks',)

Пока не нашел никакого авторитетного ответа на эту тему в SO.

Я бы хотел все исправить, эта ошибка может быть лишь одной из многих. Спасибо большое за вашу помощь!

Изменить:

Добавлены настройки для redis, по-другому объявлена ​​задача и повышен уровень отладки. Теперь ошибка:

Получено незарегистрированное задание типа u'tasks.test_celery '. Сообщение было проигнорировано и отклонено.

Вы не забыли импортировать модуль, содержащий эту задачу? Или, может быть, вы используете относительный импорт? KeyError: u'aenima.criptoball.tasks.test_celery '

Я считаю, что документация Celery оставляет желать лучшего.

РЕДАКТИРОВАТЬ 2. Попробовав все, это сработало, когда я поместил все задачи в один файл celery.py. @shared_task не работает, пришлось использовать @ app.task.


person Alejandro Veintimilla    schedule 03.02.2018    source источник
comment
У вас работает сельдерей? Например. из командной строки celery worker -A <your_module_name>. Материал должен печататься в терминале, с которого вы это начали.   -  person Chris    schedule 04.02.2018
comment
@Chris обновил ответ, добавил награду.   -  person Alejandro Veintimilla    schedule 24.03.2018
comment
Похоже, celery -A aenima beat celery -A aenima worker -l info может быть ошибкой транскрипции / форматирования. Вы хотели поместить их на две отдельные строчки?   -  person sytech    schedule 28.03.2018
comment
@sytech да. Спасибо за комментарий, поправил.   -  person Alejandro Veintimilla    schedule 28.03.2018
comment
Подумайте о повышении уровня журнала для отладки как для beat, так и для рабочих, и посмотрите, дает ли это какой-либо соответствующий результат. В противном случае я считаю, что стандартным поведением является подавление stdout, поэтому ваши print и .debug могут не отображаться, даже если задача действительно выполняется. По моему опыту, мне также пришлось указать планировщик как DatabaseScheduler, чтобы решить аналогичную проблему с celery_results, но не уверен, применимо ли это здесь.   -  person sytech    schedule 28.03.2018
comment
@sytech только что сделал это. Получено новое сообщение об ошибке. Любые идеи?   -  person Alejandro Veintimilla    schedule 28.03.2018
comment
При запуске сельдерея в режиме отладки он должен сообщить вам, какие задачи зарегистрированы. Если задача не зарегистрирована у воркера, воркер не заберет ее из очереди. Если ваши задачи не регистрируются, попробуйте изменить Celery('criptoball') на Celery('aenima'), как говорится в документации, чтобы использовать имя project. Я догадываюсь, что от этого зависит, как autodiscover_tasks находит ваши задачи. Также я не уверен, почему у вас это lambda в задачах автообнаружения, может быть подозрительным.   -  person sytech    schedule 28.03.2018
comment
Если вы действительно видите задачи, зарегистрированные у исполнителя ... Тогда проблема в имени, которое вы используете при настройке расписания. Попробуйте вместо этого использовать интерфейс администратора django или убедитесь, что используемые вами имена соответствуют именам, которые показывает рабочий. См. автоматическое именование для подробнее.   -  person sytech    schedule 28.03.2018
comment
Есть ли criptoball в вашем списке установленных приложений в django?   -  person 2ps    schedule 29.03.2018
comment
Я предлагаю вам использовать django-q. Это не ответ на твою проблему, но я чувствую твою боль. Некоторое время я употребляла сельдерей, потом бросила. Слишком много проблем, когда вы решаете одну, вам нужно отлаживать другую. С django-q все намного проще. Удачи   -  person Karim N Gorjux    schedule 29.03.2018


Ответы (4)


У меня были эти проблемы раньше. Это не твой код. Обычно это проблема с окружающей средой. Вы должны запустить все в virtualenv, добавив requirements.txt файл с конкретными версиями пакетов.

Существует известная проблема, касающаяся celery 4.x и django 1.x, поэтому вам следует подумать о пакетах, которые вы используете.

В этом руководстве подробно объясняется, как построить virtualenv из сельдерея.

Если вы можете сказать мне версии своих пакетов, я могу попробовать помочь другим способом.

Изменить:

Думаю, дело в том, как вы управляете сельдереем. Если мы исправили первую проблему, попробуйте поиграть с этим:

celery -A aenima.celery:app beat -l debug --scheduler django_celery_beat.schedulers:DatabaseScheduler

or

celery -A aenima.aenima.celery:app beat -l debug --scheduler django_celery_beat.schedulers:DatabaseScheduler

Последняя ошибка, которую вы получаете, связана с обнаружением вашего модуля. Сначала попробуйте.

person Gal Silberman    schedule 25.03.2018
comment
Я использую virtualenv. У меня django 1.10 и celery 4.1. Стоит ли обновлять django? - person Alejandro Veintimilla; 26.03.2018
comment
Вам следует понизить рейтинг сельдерея до 3.x. Я сделал это в своей установке, и она отлично сработала. - person Gal Silberman; 26.03.2018
comment
Могу я добавить примечание, что в документации это написано как Celery 4.0 supports Django 1.8 and newer versions. Please use Celery 3.1 for versions older than Django 1.8.? Итак, 4.x должен иметь возможность работать с django 1.10, во всяком случае, я бы посоветовал обновить django до более высокой версии :) - person King Reload; 26.03.2018
comment
Вы правы, но дело в том, что это все еще проблематично. Думаю, мы оба до сих пор согласны с тем, что проблема в версиях. Если бы это был я, я бы предпочел понизить версию сельдерея до апгрейда django. Если ваш проект большой, обновление django может стать головной болью ... - person Gal Silberman; 26.03.2018
comment
Да, Django доставит много проблем, когда вы попытаетесь его обновить, но в последнее время версии Django довольно быстро растут, поэтому в конечном итоге обновление будет необходимо в любом случае, но да. Я согласен с тем, что это также может быть разница между версиями. :) - person King Reload; 26.03.2018
comment
Спасибо за ответ, отредактировал свой код, стараясь сделать его максимально простым. Теперь у меня нет модуля с именем celery (возможно, это было решено путем изменения местоположения celery.py). Однако я до сих пор ничего не печатаю. - person Alejandro Veintimilla; 27.03.2018
comment
А в вашем файле log.txt демона не видно? - person Gal Silberman; 27.03.2018
comment
Увеличил уровень журнала, и я получил четкое сообщение об ошибке. Добавил его как Edit. - person Alejandro Veintimilla; 28.03.2018

Было бы удобно использовать virtualenv.

Во-первых, как сказал @Gal, вам нужно убедиться, что у вас есть celery 4.x.

Вы можете установить это, выполнив pip:

pip install сельдерей

Конечно, вы также можете установить 4.x версию, добавив ее в свой requirements.txt следующим образом:

сельдерей == 4.1.0

Или более поздние версии, если они будут доступны в будущем.

Затем вы можете переустановить все свои пакеты, используя:

  • pip install -r requirements.txt

Это гарантирует, что у вас установлен этот определенный пакет сельдерея.

Теперь о Celery, хотя ваш код может и не ошибочен, но я напишу так, как я заставил мое приложение Celery работать.

__init __.py:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery_conf import app as celery_app

__all__ = ['celery_app']

celery_conf.py:

from __future__ import absolute_import, unicode_literals

import os

from celery import Celery
from datetime import timedelta

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '<PATH.TO.YOUR.SETTINGS>')

app = Celery('tasks')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# Set a beat schedule to update every hour.
app.conf.beat_schedule = {
    'update-every-hour': {
        'task': 'tasks.update',
        'schedule': timedelta(minutes=60),
        'args': (16, 16),
    },
}

# The default task that Celery runs.
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

tasks.py:

# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import requests

from django.conf import settings
from django.http import HttpResponse

from celery.task import Task
from celery.five import python_2_unicode_compatible
from celery import Celery
app = Celery()


@python_2_unicode_compatible
class Update(Task):
    name = 'tasks.update'

    def run(self, *args, **kwargs):
        # Run the task you want to do.

""" For me the regular TaskRegistry didn't work to register classes, 
so I found this handy TaskRegistry demo and made use of it to 
register tasks as classes."""
class TaskRegistry(Task):

    def NotRegistered_str(self):
        self.assertTrue(repr(TaskRegistry.NotRegistered('tasks.add')))

    def assertRegisterUnregisterCls(self, r, task):
        with self.assertRaises(r.NotRegistered):
            r.unregister(task)
        r.register(task)
        self.assertIn(task.name, r)

    def assertRegisterUnregisterFunc(self, r, task, task_name):
        with self.assertRaises(r.NotRegistered):
            r.unregister(task_name)
        r.register(task, task_name)
        self.assertIn(task_name, r)

    def task_registry(self):
        r = TaskRegistry()
        self.assertIsInstance(r, dict, 'TaskRegistry is mapping')

        self.assertRegisterUnregisterCls(r, Update)

        r.register(Update)
        r.unregister(Update.name)
        self.assertNotIn(Update, r)
        r.register(Update)

        tasks = dict(r)
        self.assertIsInstance(
            tasks.get(Update.name), Update)

        self.assertIsInstance(
            r[Update.name], Update)

        r.unregister(Update)
        self.assertNotIn(Update.name, r)

        self.assertTrue(Update().run())

    def compat(self):
        r = TaskRegistry()
        r.regular()
        r.periodic()

Как я объяснил в коде, обычный taskregistry не работал, встроенный в Celery 4.x, поэтому я использовал демонстрационный реестр задач. Конечно, вы также можете не использовать классы для выполнения задач, но я предпочел использовать классы.

settings.py:

# Broker settings for redis
CELERY_BROKER_HOST = '<YOUR_HOST>'
CELERY_BROKER_PORT = 6379
CELERY_BROKER_URL = 'redis://'
CELERY_DEFAULT_QUEUE = 'default'

# Celery routes
CELERY_IMPORTS = (
    'PATH.TO.tasks' # The path to your tasks.py
)

CELERY_DATABASE_URL = {
    'default': '<CELERY_DATABASE>', # You can also use your already being used database here
}

INSTALLED_APPS = [
    ...
    'PATH.TO.TASKS' # But exclude the tasks.py from this path
]

LOGGING = {
    ...
    'loggers': {
        'celery': {
            'level': 'DEBUG',
            'handlers': ['console'],
            'propagate': True,
        },
    }
}

Я запускаю свой рабочий с помощью следующих команд:

redis-server --daemonize да

celery multi start worker -A PATH.TO.TASKS -l info --beat # Но исключить tasks.py из пути

Я надеюсь, что эта информация может помочь вам или всем, кто борется с сельдереем.

РЕДАКТИРОВАТЬ:

Обратите внимание, что я запускаю рабочий как демон, поэтому вы не сможете увидеть журналы в консоли. Для меня это записано в .txt файл.

Также обратите внимание на пути, которые можно использовать, например, для некоторых, вам нужно включить путь к вашему приложению, например:

project.apps.app

А для других случаев вам нужно включить tasks.py без .py, я написал, когда исключать этот файл, а когда нет.

РЕДАКТИРОВАТЬ 2:

Декоратор @shared_task возвращает прокси, который всегда использует экземпляр задачи в current_app. Это делает декоратор @shared_task полезным для библиотек и приложений многократного использования, поскольку у них не будет доступа к приложению пользователя.

Обратите внимание, что @shared_task не имеет доступа к приложению пользователя. Приложение, которое вы сейчас пытаетесь зарегистрировать, не имеет доступа к вашему приложению. Метод, который вы действительно хотите использовать для регистрации задачи:

from celery import Celery
app = Celery()

@app.task
def test_celery(x, y):
    logger = logging.getLogger("AENIMA")
    print("EUREKA")
    logger.debug("EUREKA")
person King Reload    schedule 26.03.2018
comment
Спасибо за ответ ... изменил мой код, как вы думаете, он должен напечатать что-то подобное? Как мне узнать, выполняется ли тест? - person Alejandro Veintimilla; 27.03.2018
comment
@alejoss, если вы хотите что-то увидеть в консоли, то вам сначала нужно запустить redis как демон redis-server --daemonize yes, а затем работник сельдерея не как демон celery -A aenima -l info --beat, вам также необходимо включить регистратор сельдерея в настройках django, вам придется посмотрите, как настроить регистратор сельдерея. - person King Reload; 27.03.2018
comment
Добавил правку, и я думаю, что подошел ближе, чтобы заставить ее работать. Вышла новая ошибка. Пожалуйста, взгляните на это, если сможете. - person Alejandro Veintimilla; 28.03.2018
comment
@alejoss Я дал вам полную настройку того, как зарегистрировать задачу, ваша задача просто не зарегистрирована, что должно быть легко решено в Google? - person King Reload; 28.03.2018
comment
Пожалуйста, не рекомендуйте Celery 4.1 - в нем есть ошибки, которые мешают правильной работе периодических задач. - person 2ps; 29.03.2018
comment
@ 2ps, как я написал, запускает периодическую задачу. - person King Reload; 29.03.2018
comment
Это ошибка в celery 4.1 - он не запускается постоянно в нужное время для OP. - person 2ps; 29.03.2018
comment
Я принимаю этот ответ, потому что он помог мне больше всего. Хотя я бы хотел разделить награду ... Так что давайте это сделаем! - person Alejandro Veintimilla; 30.03.2018
comment
@alejoss Я рад, что мой ответ помог, но ваша проблема еще не решена, не так ли? Я заинтересован в решении вашей проблемы с сельдереем, если это возможно :) - person King Reload; 31.03.2018
comment
@KingReload, нет, через время вернулась к той же ошибке. KeyError: u'aenima.criptoball.tasks.test_celery' - person Alejandro Veintimilla; 04.04.2018
comment
@alejoss, может быть, было бы лучше положить tasks.py в ту же папку, что и celery.py? потому что моя структура сельдерея такая же. Также вместо @shared_task вы можете использовать @app.task. Тогда вам понадобится from celery import Celery & app = Celery() для app. - person King Reload; 04.04.2018

Получено незарегистрированное задание типа u'tasks.test_celery '. Сообщение было проигнорировано и отклонено.

Вы не забыли импортировать модуль, содержащий эту задачу? Или, может быть, вы используете относительный импорт?

Возможно, ваш путь к задаче неверен, должен быть:

app.conf.beat_schedule = {
    'test-every-30-seconds': {
        'task': 'criptoball.tasks.test_celery',
        'schedule': 30.0,
        'args': (16, 16)
    }, 
}

tasks.test_celery должен быть полным путем : criptoball.tasks.test_celery

person gushitong    schedule 29.03.2018

Вам следует исправить одну вещь: используйте:

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

чтобы сообщить Celery, какие задачи приложений вы хотите, чтобы он обнаруживал, если вы используете Celery 3.x.

person unixia    schedule 03.02.2018