Создавайте динамические задачи в зависимости от результата sql-запроса в воздушном потоке

Я пытаюсь создавать динамические задачи с помощью TaskGroup, сохраняя результат в переменной. Переменная изменяется каждые N минут в зависимости от запроса к базе данных, но когда переменная изменяется во второй раз, планировщик выходит из строя.

В основном мне нужно создавать задачи на основе количества уникальных строк, полученных в запросе.

с TaskGroup (ftask) в качестве задачи:

    data_variable = Variable.get("df")
    data = data_variable

    try :
        if data != False and data !='none':
            df = pd.read_json(data)

            for field_id in df.field.unique():
             

                task1 = PythonOperator(
                   
                )
                task2 = PythonOperator(
                   
                )

               
                task1 >> task2

    except:
        pass

Есть ли способ сделать это с помощью группы задач?


person Francisco    schedule 15.03.2021    source источник


Ответы (1)


Это анти-шаблон для Airflow.

Хотя вы можете использовать Variable.get("df") в верхнем коде, этого делать не следует. Переменные / Соединения / любой другой код, который создает запрос к любой базе данных, должен выполняться только внутри области действия операторов или с использованием шаблонов Jinja. Причина этого в том, что Airflow периодически анализирует файл DAG (каждые 30 секунд, если вы не меняли значение по умолчанию min_file_process_interval), поэтому наличие кода, который взаимодействует с базой данных каждые 30 секунд, вызовет большую нагрузку на эту базу данных. Для некоторых из этих случаев в будущих версиях воздушного потока будет предупреждение (см. PR).

Задачи воздушного потока должны быть как можно более статичными (или медленно меняющимися).

person Elad    schedule 16.03.2021
comment
Да, я понимаю, это было всего лишь возможное решение (которое не сработало) для динамической генерации задач. Кажется идеальным, чтобы группа задач предоставляла xcom или default_args для этого, но кажется, что это пока невозможно: github.com/apache/airflow/issues/13911 Любое другое решение? - person Francisco; 16.03.2021
comment
у вас есть предложения по этому поводу? - person Francisco; 18.03.2021
comment
Это все еще антипаттерн. Airflow не предназначен для создания задачи на основе данных, известных только во время выполнения. - person Elad; 18.03.2021