Рассчитать дельта-время в Pyspark 2.0.1 с помощью python 2.6.6

У меня есть DF, состоящий из двух столбцов, у которых есть время, и я хочу вычислить дельта-время между ними.

следующий DF является образцом исходного DF:

+-------------------+-------------------+
|               time|              time2|
+-------------------+-------------------+
|2017-01-13 00:17:21|2017-01-13 14:08:03|
|2017-01-13 14:08:08|2017-01-13 14:08:03|
|2017-01-13 14:08:59|2017-01-13 14:08:03|
|2017-01-13 04:21:42|2017-01-13 14:08:03|
+-------------------+-------------------+

схема ДФ выглядит следующим образом:

root
 |-- time: string (nullable = true)
 |-- time2: string (nullable = true)

Я использовал следующий метод:

import pyspark.sql.types as typ
import pyspark.sql.functions as fn
from pyspark.sql.functions import udf
import datetime
from time import  mktime, strptime

def diffdates(t1, t2):
    #Date format: %Y-%m-%d %H:%M:%S
    delta= ((mktime(strptime(t1,"%Y-%m-%d %H:%M:%S")) - mktime(strptime(t2, "%Y-%m-%d %H:%M:%S"))))
    return (delta)



dt = udf(diffdates, typ.IntegerType())
Time_Diff = df.withColumn('Diff',(dt(df.time,df.time2)))

Результирующий новый столбец имеет нулевое значение следующим образом:

+-------------------+-------------------+----+
|               time|              time2|Diff|
+-------------------+-------------------+----+
|2017-01-13 00:17:21|2017-01-13 14:08:03|null|
|2017-01-13 14:08:08|2017-01-13 14:08:03|null|
|2017-01-13 14:08:59|2017-01-13 14:08:03|null|
|2017-01-13 04:21:42|2017-01-13 14:08:03|null|
+-------------------+-------------------+----+

что мне делать?


person Ahmad Senousi    schedule 21.01.2018    source источник
comment
проверьте это, stackoverflow.com/questions/30283415/   -  person Suresh    schedule 21.01.2018
comment
Я уже использовал метод unix_timestamp и также получил нулевое значение. Кроме того, я использовал метод, описанный во втором ответе, он уже дает правильное значение, но когда я отфильтровал результирующий кадр данных, чтобы получить строки, которые имеют минимальное значение, существующее в столбце дельта-времени (Diff) для каждой группы в столбце time2. Я получил эту ошибку TypeError: аргумент strptime() 1 должен быть строкой, а не None   -  person Ahmad Senousi    schedule 21.01.2018
comment
В вашем кадре данных столбцами являются time и time1 , тогда как здесь Time_Diff = df.withColumn('Diff',(dt(T_GPS_On_fi.time, T_GPS_On_fi.uptime))) вы используете время безотказной работы. Мы что-то упускаем?   -  person Suresh    schedule 21.01.2018


Ответы (1)


Это то, что я устал, и это работает для меня. Дайте мне знать, если я что-то пропущу,

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> l = [('2017-01-13 00:17:21','2017-01-13 14:08:03'),('2017-01-13 14:08:08','2017-01-13 14:08:03'),('2017-01-13 14:08:59','2017-01-13 14:08:03'),('2017-01-13 04:21:42','2017-01-13 14:08:03')]
>>> df = spark.createDataFrame(l,['time1','time2'])
>>> df1 = df.select(df.time1.cast('timestamp'),df.time2.cast('timestamp'))
>>> df1.show()
+--------------------+--------------------+
|               time1|               time2|
+--------------------+--------------------+
|2017-01-13 00:17:...|2017-01-13 14:08:...|
|2017-01-13 14:08:...|2017-01-13 14:08:...|
|2017-01-13 14:08:...|2017-01-13 14:08:...|
|2017-01-13 04:21:...|2017-01-13 14:08:...|
+--------------------+--------------------+

>>> from pyspark.sql import functions as F
>>> timeFmt = "yyyy-MM-dd'T'HH:mm:ss.SSS"
>>> timeDiff = (F.unix_timestamp('time1', format=timeFmt) - F.unix_timestamp('time2', format=timeFmt))
>>> df1 = df1.withColumn("delta",timeDiff) ## delta is in unit of seconds
>>> df1.show(truncate=False)
+---------------------+---------------------+------+
|time1                |time2                |delta |
+---------------------+---------------------+------+
|2017-01-13 00:17:21.0|2017-01-13 14:08:03.0|-49842|
|2017-01-13 14:08:08.0|2017-01-13 14:08:03.0|5     |
|2017-01-13 14:08:59.0|2017-01-13 14:08:03.0|56    |
|2017-01-13 04:21:42.0|2017-01-13 14:08:03.0|-35181|
+---------------------+---------------------+------+
>>> df1.groupby('time2').agg(F.min('delta')).show()
+--------------------+----------+
|               time2|min(delta)|
+--------------------+----------+
|2017-01-13 14:08:...|    -49842|
+--------------------+----------+
person Suresh    schedule 21.01.2018
comment
Спасибо. Ваш метод работает хорошо. - person Ahmad Senousi; 21.01.2018