Использование функций Hive в Spark Job через hiveContext

Я использую Hive 1.2 и Spark 1.4.1. Следующий запрос отлично работает через Hive CLI:

hive> select row_number() over (partition by one.id order by two.id) as sk,
two.id, two.name, one.name, current_date() 
from avant_source.one one 
inner join avant_source.two two 
on one.id = two.one_id;

но когда я пытаюсь использовать его через HiveContext в задании pyspark, это дает мне ошибку:

py4j.protocol.Py4JJavaError: An error occurred while calling o26.sql.
: java.lang.RuntimeException: Couldn't find function current_date

Фрагмент кода:

from pyspark import HiveContext

conf = SparkConf().setAppName('DFtest')
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

df = sqlContext.sql("select row_number() over (partition by one.id order by two.id) as sk, two.id, two.name, one.name, current_date() from avant_source.one one inner join avant_source.two two on one.id = two.one_id")

df.show()

sc.stop()

Есть ли способ получить текущую дату или метку времени в pyspark? Я пытался импортировать дату, дату и время, но он всегда выдает ошибку о том, что функция не найдена.

Я пытался использовать current_date во фреймах данных в песочнице pyspark 1.5, но также получаю другую ошибку.

df = sqlContext.createDataFrame([(current_date,)],[‘d’])
df.select(date_sub(df.d,1).alias('d')).collect()

Ошибка:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/mapr/spark/spark-1.5.2/python/pyspark/sql/dataframe.py", line 769, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/opt/mapr/spark/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/opt/mapr/spark/spark-1.5.2/python/pyspark/sql/utils.py", line 40, in deco
    raise AnalysisException(s.split(': ', 1)[1])
pyspark.sql.utils.AnalysisException: cannot resolve 'datesub(d,1)' due to data type mismatch: argument 1 requires date type, however, 'd' is of struct<> type.;

Пожалуйста, порекомендуйте.


person learning    schedule 06.04.2016    source источник
comment
почему вы используете F.current_date()?   -  person eliasah    schedule 06.04.2016
comment
Я пытался использовать функции импорта из pyspark.sql как F, потому что простой current_date() не работал. А то тоже выдавало ошибку, а Ф. забыл вынуть.   -  person learning    schedule 06.04.2016


Ответы (1)


Для моего сценария я использовал следующее

import datetime 
now =  datetime.datetime.now()
df = df.withColumn('eff_start', lit(now.strftime("%Y-%m-%d")))

Из-за ошибки, связанной с невозможностью корректного использования HiveContext для HiveQL для функций Hive, это была проблема с кластером, когда на одном из узлов, на котором работал HiveServer2, было слишком много предупреждений из-за проблем с памятью. Это было причиной проблемы. Он был успешно протестирован в песочнице MapR под управлением Spark 1.5 и Hive 1.2.

person learning    schedule 06.04.2016