Как настроить CELERYBEAT_SCHEDULE в настройках Django?

Я могу запустить это как отдельное приложение, но у меня проблемы с его работой в Django.

Вот отдельный код:

from celery import Celery
from celery.schedules import crontab


app = Celery('tasks')
app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_TIMEZONE='US/Central',
    CELERY_ENABLE_UTC=True,
    CELERYBEAT_SCHEDULE = {
    'test': {
        'task': 'tasks.test',
        'schedule': crontab(),
        },
    }
)

@app.task
def test():
    with open('test.txt', 'a') as f:
        f.write('Hello, World!\n')`

Он питает сервер Rabbitmq и записывает в файл каждую минуту. Это работает как шарм, но когда я пытаюсь заставить его работать в Django, я получаю эту ошибку:

Вы не забыли импортировать модуль, содержащий эту задачу? Или, может быть, вы используете относительный импорт? Пожалуйста, смотрите ____ для получения дополнительной информации.

Полное содержание тела сообщения: {'повторные попытки': 0, 'эта': нет, 'kwargs': {}, 'набор задач': нет, 'лимит времени': [нет, нет], 'обратные вызовы': нет , 'task': 'proj.test', 'args': [], 'expires': нет, 'id': '501ca998-b5eb-4ba4-98a8-afabda9e88dd', 'utc': True, 'errbacks': Нет, «аккорд»: Нет} (246b) Трассировка (последний последний вызов): Файл «/home/user/CeleryDjango/venv/lib/python3.5/site-packages/celery/worker/consumer.py», строка 456, в стратегиях on_task_received [имя] (сообщение, тело, KeyError: 'proj.test' [2016-06-16 01:16:00,051: INFO/Beat] Планировщик: отправка теста выполнения задачи (proj.test) [2016- 06-16 01:16:00,055: ERROR/MainProcess] Получено незарегистрированное задание типа «proj.test».

И это мой код в Django:

# CELERY STUFF
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'US/Central'
CELERYBEAT_SCHEDULE = {
    'test': {
        'task': 'proj.test',
        'schedule': crontab(),
    }
}

celery.py

from __future__ import absolute_import

import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
from django.conf import settings  # noqa

app = Celery('proj')

app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

task.py

from __future__ import absolute_import
from celery import shared_task


@shared_task
def test():
    with open('test.txt', 'w') as f:
        print('Hello, World', file=f)

init.py

from __future__ import absolute_import

from .celery import app as celery_app 

Любые мысли по этому поводу очень ценятся. Спасибо.


person a_Fraley    schedule 16.06.2016    source источник


Ответы (1)


Почему бы вам не попробовать следующее и сообщить мне, сработало ли это для вас или нет. Это работает для меня.

В настройках.py

CELERYBEAT_SCHEDULE = {
    'my_scheduled_job': {
        'task': 'run_scheduled_jobs', # the same goes in the task name
        'schedule': crontab(),
    },
}

А в tasks.py..

from celery.task import task # notice the import of task and not shared task. 

@task(name='run_scheduled_jobs') # task name found! celery will do its job
def run_scheduled_jobs():
    # do whatever stuff you do
    return True

Но если вы ищете shared_task, то...

@shared_task(name='my_shared_task') # name helps celery identify the functions it has to run
def my_shared_task():
    # do what you want here..
    return True

Я использую общую задачу для асинхронных заданий. Поэтому мне нужно вызвать ее из функции, подобной следующей.

в views.py/или в любом месте.py в приложении вашего проекта

def some_function():
    my_shared_task.apply_async(countdown= in_seconds)
    return True

и на всякий случай, если вы забыли, не забудьте включить свое приложение, в котором вы пытаетесь запустить задачи.

INSTALLED_APPS = [...
 'my_app'...
] # include app

Я уверен, что этот подход работает нормально. Спасибо.

person d-coder    schedule 16.06.2016
comment
Большое спасибо. Похоже, это работает и подтвердило мое первоначальное подозрение, что ошибка связана с @shared_tasks и тем, что Celery не может видеть задачу из-за. Спасибо еще раз; это сработало. - person a_Fraley; 16.06.2016
comment
Это заслуживает плюса? : P Также отметьте это как ответ, чтобы он помог кому-то еще в будущем :) - person d-coder; 16.06.2016
comment
К сожалению, на данный момент from celery.task import task устарел, а shared_task предназначен для использования в библиотеках. - person webtweakers; 14.03.2018
comment
Добрый день, уже много дней пытаюсь справиться с подобной задачей. Не могли бы вы помочь мне с этим вопросом, я был бы очень признателен stackoverflow.com/questions/62801096/ - person Марсель Абдулли&; 10.07.2020