Крюк друида воздушного потока не работает

Я пытаюсь использовать druid hook для загрузки данных из hdfs в druid, ниже мой скрипт dag:

from datetime import datetime, timedelta
import json
from airflow.hooks import HttpHook, DruidHook
from airflow.operators import PythonOperator
from airflow.models import DAG

def check_druid_con():
 dr_hook = DruidHook(druid_ingest_conn_id='DRUID_INDEX',druid_query_conn_id='DRUID_QUERY')
 dr_hook.load_from_hdfs("druid_airflow","hdfs://xx.xx.xx.xx/demanddata/demand2.tsv","stay_date",["channel","rate"],"2016-12-11/2017-12-13",1,-1,metric_spec=[{ "name" : "count", "type" : "count" }],hadoop_dependency_coordinates="org.apache.hadoop:hadoop-client:2.7.3")

default_args = {
    'owner': 'TC',
    'start_date': datetime(2017, 8, 7),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
dag = DAG('druid_data_load', default_args=default_args)
druid_task1=PythonOperator(task_id='check_druid',
                   python_callable=check_druid_con,
                   dag=dag)

Я продолжаю получать ошибку, TypeError: load_from_hdfs() принимает не менее 10 аргументов (10 задано). Однако я дал 10 аргументов для load_from_hdfs, но все равно выдает ошибку. Пожалуйста помоги.

С уважением Рахул


person Rahul    schedule 09.08.2017    source источник


Ответы (1)


Проблема заключалась в определении функции в документах воздушного потока и фактическом определении в коде:

In documents it's defined as :
load_from_hdfs(datasource, static_path, ts_dim, columns, intervals, num_shards, target_partition_size, metric_spec=None, hadoop_dependency_coordinates=None)

But in reality the function definition is :
def load_from_hdfs(
            self, datasource, static_path,  ts_dim, columns,
            intervals, num_shards, target_partition_size, **query_granularity, segment_granularity**,
            metric_spec=None, hadoop_dependency_coordinates=None)

Передача правильных аргументов в соответствии с определением избавила меня от этой ошибки.

person Rahul    schedule 10.08.2017