Задача Django-celery и транзакция django

У меня вопрос относительно транзакций и задач сельдерея. Так что для меня не секрет, что, конечно, если у вас есть транзакция и задача сельдерея, обращающаяся к одной и той же таблице/записям, у нас будет состояние гонки.

Однако рассмотрим следующий фрагмент кода:

def f(self):
   # function of module that inherits from models.Model
   self.field_a = datetime.now()

   self.save()
   transaction.commit_unless_managed()

   # depending on the configuration of this module
   # this might return None or a datetime object.
   eta = self.get_task_eta()

   if eta:
       celery_task_do_something.apply_async(args=(self.pk, self.__class__),
                                            eta=eta)
   else:
       celery_task_do_something.delay(self.pk, self.__class__)

Вот задача сельдерея:

def celery_task_do_something(pk, cls):

    o = cls.objects.get(pk=pk)

    if o.field_a:
        # perform something
        return True
    return False

Как видите, перед созданием задачи мы вызываем transaction.commit_unless_managed и она должна зафиксироваться, так как транзакция django в данный момент не управляется.

Однако при запуске задачи сельдерея поле field_a не устанавливается.

Мой вопрос:

Поскольку мы делаем коммит перед созданием задачи, возможно ли, что есть состояние гонки?

Дополнительная информация

  • Мы используем Postgres версии 9.1.

  • Каждая транзакция выполняется с уровнем изоляции READ COMMITTED.

  • На другой БД с движком dowant.lib.db.backends.postgresql_psycopg2_debugger уже установлен field_a и задача работает как положено. С двигателем dowant.lib.db.backends.postgresql_psycopg2_hstore_ready появляется описанная проблема (не уверен, что это связано с двигателем).

  • Сельдерей версии 2.2

  • Пробовал разные базы. Все то же самое поведение, за исключением случаев, когда двигатели меняются. Вот почему я упомянул об этом.

Большое спасибо.


person Fred    schedule 13.09.2012    source источник
comment
Должен ли быть self.save() перед transaction.commit_unless_managed()?   -  person Steven    schedule 13.09.2012
comment
хорошо, на самом деле есть. Простите за это.   -  person Fred    schedule 13.09.2012


Ответы (2)


Попробуйте добавить self.__class__.objects.select_for_update().get(pk=self.pk) перед save и посмотрите, что получится.

Он должен блокировать все операции чтения в эту строку до тех пор, пока не будет выполнена фиксация.

person Krzysztof Szularz    schedule 28.10.2012

Это поздно, но с django 1.9

transaction.on_commit(lambda: enqueue_atask()))
person Trinh Hoang Nhu    schedule 28.06.2019