Я могу запустить это как отдельное приложение, но у меня проблемы с его работой в Django.
Вот отдельный код:
from celery import Celery
from celery.schedules import crontab
app = Celery('tasks')
app.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TIMEZONE='US/Central',
CELERY_ENABLE_UTC=True,
CELERYBEAT_SCHEDULE = {
'test': {
'task': 'tasks.test',
'schedule': crontab(),
},
}
)
@app.task
def test():
with open('test.txt', 'a') as f:
f.write('Hello, World!\n')`
Он питает сервер Rabbitmq и записывает в файл каждую минуту. Это работает как шарм, но когда я пытаюсь заставить его работать в Django, я получаю эту ошибку:
Вы не забыли импортировать модуль, содержащий эту задачу? Или, может быть, вы используете относительный импорт? Пожалуйста, смотрите ____ для получения дополнительной информации.
Полное содержание тела сообщения: {'повторные попытки': 0, 'эта': нет, 'kwargs': {}, 'набор задач': нет, 'лимит времени': [нет, нет], 'обратные вызовы': нет , 'task': 'proj.test', 'args': [], 'expires': нет, 'id': '501ca998-b5eb-4ba4-98a8-afabda9e88dd', 'utc': True, 'errbacks': Нет, «аккорд»: Нет} (246b) Трассировка (последний последний вызов): Файл «/home/user/CeleryDjango/venv/lib/python3.5/site-packages/celery/worker/consumer.py», строка 456, в стратегиях on_task_received [имя] (сообщение, тело, KeyError: 'proj.test' [2016-06-16 01:16:00,051: INFO/Beat] Планировщик: отправка теста выполнения задачи (proj.test) [2016- 06-16 01:16:00,055: ERROR/MainProcess] Получено незарегистрированное задание типа «proj.test».
И это мой код в Django:
# CELERY STUFF
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'US/Central'
CELERYBEAT_SCHEDULE = {
'test': {
'task': 'proj.test',
'schedule': crontab(),
}
}
celery.py
from __future__ import absolute_import
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
from django.conf import settings # noqa
app = Celery('proj')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
task.py
from __future__ import absolute_import
from celery import shared_task
@shared_task
def test():
with open('test.txt', 'w') as f:
print('Hello, World', file=f)
init.py
from __future__ import absolute_import
from .celery import app as celery_app
Любые мысли по этому поводу очень ценятся. Спасибо.