Как получить идентификатор задачи из сельдерея в случае запланированных задач (бит)

Чтобы получить доступ к информации о задаче с сельдереем, мне нужен task_id. Когда задача сельдерея запускается вручную, я могу легко получить идентификатор этой задачи с помощью task.id (и записать его в БД или сделать что-то еще). Если я использую celery-beat, который периодически отправляет задания работнику, это кажется невозможным.

Итак, мой вопрос: как получить идентификатор из задачи в момент, когда beat отправляет задачу работнику сельдерея?

В тот момент, когда работник получает задачу, консоль показывает идентификатор задачи. Поэтому меня беспокоит, что в тот момент, когда задача была отправлена ​​битом работнику, у нее нет идентификатора задачи.

Ручной кейс для получения task_id:

task = tasks.LongRunningTask.delay(username_from_formTargetsLaden, password_from_formTargetsLaden, url_from_formTargetsLaden)

task_id = task.id

Может быть, у кого-то из вас возникла идея?


person chickenshifu    schedule 27.06.2019    source источник
comment
Название вводит в заблуждение. Единственный правильный ответ - нет, вы не можете получить его из-под сельдерея, но вы можете получить его откуда-то еще - либо из отдельного приложения, либо из другого задания, вы даже можете подписаться получать уведомления о событиях Celery (полученная задача для примера).   -  person DejanLekic    schedule 27.06.2019
comment
Спасибо, Деян! Я изменил название, надеюсь, теперь для других оно более понятное. И спасибо за ответ, очень полезно. Я не знал, что есть возможность подписаться на события ... все еще исследую силу сельдерея.   -  person chickenshifu    schedule 27.06.2019
comment
Вы не поверите, насколько хорошо спроектирован Celery - я, конечно, не поверил, когда начал его использовать ... - Замечательная программа.   -  person DejanLekic    schedule 28.06.2019


Ответы (1)


Я нашел ответ на эту маленькую проблему:

Если вам нужен идентификатор задачи, который изначально был отправлен с помощью beat, вы можете просто добавить функцию проверки к своей (запланированной) задаче worker.

Настроить периодическую задачу

Это расписание, которое «напоминает» сельдерей каждый день в 11:08 (всемирное координированное время), чтобы начать задание.

@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    test = sender.add_periodic_task(crontab(minute=8, hour=11), CheckLists.s(app.config['USR'], app.config['PWD']))

Задача должна выполняться периодически

Это запланированная задача, которая будет выполнена сельдереем после того, как рабочий получит «напоминание» от удара.

@celery.task(bind=True)
def CheckLists(self, arg1, arg2):
    #get task_id von scheduled Task 'Check-List'
    i = inspect()
    activetasks = i.active()
    list_of_tasks = {'activetasks': activetasks}
    task_id = list_of_tasks['activetasks']['celery@DESKTOP-XXXXX'][0]['id']   #adapt this section depending on environment (local, webserver, etc...)
    task_type = "CHECK_LISTS"
    task_id_to_db = Tasks(task_id, task_type)
    db.session.add(task_id_to_db)
    db.session.commit()

    long_runnning_task 
    [...more task relevant code here...] 

Итак, я использую app.control.inspect, который позволяет вам проверять запущенные рабочие процессы. Он использует команды дистанционного управления под капотом. С i.active() вы получите словарь, который можно легко разобрать.

Пока я не нахожу документации, как проще получить task_id из периодической задачи, я буду придерживаться этого решения.

После того, как вы сохранили идентификатор задачи, вы можете легко опросить статус задачи и т. Д., Например, через AJAX.

Надеюсь, это поможет вам, ребята :)

person chickenshifu    schedule 27.06.2019