Как получить идентификатор задания или результат воздушного потока DataFlowJavaOperator ()?

Я использую DataFlowJavaOperator () в воздушном потоке (Cloud Composer). Есть ли способ получить идентификатор выполненного задания потока данных в следующей задаче PythonOperator? Я хотел бы использовать job_id для вызова команды gcloud, чтобы получить результат работы.

def check_dataflow(ds, **kwargs)
  # here I want to execute gloud command with the job ID to get job result.
  # gcloud dataflow jobs describe <JOB_ID>

t1 = DataFlowJavaOperator(
    task_id='task1'
    jar='gs://path/to/jar/abc.jar',
    options={
        'stagingLocation': "gs://stgLocation/",
        'tempLocation': "gs://tmpLocation/",
    },
    provide_context=True
    dag=dag,
 )

t2 = PythonOperator(
    task_id='task2',
    python_callable=check_dataflow,
    provide_context=True
    dag=dag,
)


t1 >> t2

person SnoU    schedule 07.02.2019    source источник
comment
Я вижу, что ваш operator другой, но посмотрите, можете ли вы понять намеки из этого   -  person y2k-shubham    schedule 07.02.2019
comment
Один возможный совет: запишите идентификатор в хранилище и позвольте следующему оператору читать из него.   -  person Rui Wang    schedule 07.02.2019


Ответы (1)


Судя по всему, параметр job_name в DataFlowJavaOperator переопределяется task_id. К имени задания будет добавлен префикс задачи и добавлен случайный суффикс идентификатора. Если вы по-прежнему хотите иметь имя задания Dataflow, которое на самом деле отличается от идентификатора задачи, вы можете жестко добавить его в Java-код Dataflow:

options.setJobName("jobNameInCode")

Затем, используя PythonOperator, вы можете получить идентификатор задания из префикса (либо имя задания, указанное в коде, либо иначе идентификатор задачи Composer), как я объяснил здесь. Вкратце перечислите вакансии с:

result = dataflow.projects().locations().jobs().list(
  projectId=project,
  location=location,
).execute()

а затем отфильтруйте по префиксу, где job_prefix - это job_name, определенный при запуске задания:

for job in result['jobs']:
  if re.findall(r'' + re.escape(job_prefix) + '', job['name']):
    job_id = job['id']
    break

Оператор break нужен для того, чтобы гарантировать, что мы получим только последнее задание с этим именем, которое должно быть только что запущенным.

person Guillem Xercavins    schedule 07.02.2019
comment
Спасибо за быстрый ответ. Да, это действительно похожий подход, который я сейчас использую для получения идентификатора вакансии. Я заметил, что task.id- ‹случайное буквенно-цифровое› воздушного потока назначается имени задания потока данных. Поэтому я использую task.id в качестве префикса имени задания. Я не знал, что есть параметр job_name для DataFlowOperator, поэтому я просто попробовал, но оказалось, что job_name игнорируется, и все еще task.id- ‹случайный буквенно-цифровой› используется для имени задания потока данных, я не знаю почему. Кроме того, согласно журналу задач воздушного потока, параметр job_name будет устаревшим в Airflow 2.0. - person SnoU; 08.02.2019
comment
Но как только вы знаете название задания, задача PythonOperator работает нормально, верно? Другой подход - установить имя задания в коде Java, который я только что протестировал и работает (например, options.setJobName("jobNameInCode");). - person Guillem Xercavins; 09.02.2019
comment
Установка имени задания в java работает отлично! Я добавил дополнительный параметр, например customDataflowJobName, в параметр DataflowJavaOperator, поймал и использовал его на конце java. Итак, теперь я могу определять уникальное имя задания для каждого задания потока данных и проверять эти результаты в следующем операторе. Спасибо за помощь! - person SnoU; 13.02.2019
comment
Спасибо за подтверждение. Я отредактировал свой ответ, чтобы он был самодостаточным - person Guillem Xercavins; 14.02.2019