Применение функции к столбцу в группе в фрейме данных PySpark

У меня есть такой фрейм данных PySpark,

+----------+--------+---------+
|id_       | p      |   a     |
+----------+--------+---------+
|  1       | 4      |   12    |
|  1       | 3      |   14    |
|  1       | -7     |   16    |
|  1       | 5      |   11    |
|  1       | -20    |   90    |
|  1       | 5      |   120   |
|  2       |  11    |   267   |
|  2       | -98    |   124   |
|  2       | -87    |   120   |
|  2       | -1     |   44    |
|  2       |  5     |   1     |
|  2       |  7     |   23    |
-------------------------------

У меня также есть такая функция Python,

def fun(x):
    total = 0
    result = np.empty_like(x)
    for i, y in enumerate(x):
        total += (y)
        if total < 0:
            total = 0
        result[i] = total

    return result

Я хочу сгруппировать фрейм данных PySpark в столбце id_ и применить функцию fun к столбцу p.

Я хочу что-то вроде

spark_df.groupBy('id_')['p'].apply(fun)

В настоящее время я делаю это с помощью pandas udf с помощью pyarrow, что неэффективно с точки зрения времени для моего приложения.

Результат, который я ищу:

[4, 7, 0, 5, 0, 5, 11, -98, -87, -1, 5, 7]

Это результирующий фрейм данных, который я ищу,

+----------+--------+---------+
|id_       | p      |   a     |
+----------+--------+---------+
|  1       | 4      |   12    |
|  1       | 7      |   14    |
|  1       | 0      |   16    |
|  1       | 5      |   11    |
|  1       | 0      |   90    |
|  1       | 5      |   120   |
|  2       |  11    |   267   |
|  2       | 0      |   124   |
|  2       | 0      |   120   |
|  2       | 0      |   44    |
|  2       |  5     |   1     |
|  2       |  12    |   23    |
-------------------------------

Есть ли прямой способ сделать это с помощью самих API pyspark?

Я могу агрегировать и объединять p в список, используя collect_list при группировке на id_, использовать udf поверх этого и использовать explode, чтобы получить столбец p, как мне было нужно, во фрейме данных результата.

Но как сохранить другие столбцы, которые есть в моем фрейме данных?


person Sreeram TP    schedule 22.08.2019    source источник


Ответы (2)


Да, вы можете преобразовать указанную выше функцию Python в UDF Pyspark. Поскольку вы возвращаете массив целых чисел, важно указать тип возвращаемого значения как ArrayType(IntegerType()).

Ниже приведен код,

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType, collect_list

@udf(returnType=ArrayType(IntegerType()))
def fun(x):
    total = 0
    result = np.empty_like(x)
    for i, y in enumerate(x):
        total += (y)
        if total < 0:
            total = 0
        result[i] = total
    return result.tolist()    # Convert NumPy Array to Python List

Поскольку вход в ваш udf должен быть списком, давайте сгруппируем данные на основе «id» и преобразуем строки в массивы.

df = df.groupBy('id_').agg(collect_list('p'))
df = df.toDF('id_', 'p_')    # Assign a new alias name 'p_'
df.show(truncate=False)

Входные данные:

+---+------------------------+
|id_|collect_list(p)         |
+---+------------------------+
|1  |[4, 3, -7, 5, -20, 5]   |
|2  |[11, -98, -87, -1, 5, 7]|
+---+------------------------+

Затем мы применяем udf к этим данным,

df.select('id_', fun(df.p_)).show(truncate=False)

Выход:

+---+--------------------+
|id_|fun(p_)             |
+---+--------------------+
|1  |[4, 7, 0, 5, 0, 5]  |
|2  |[11, 0, 0, 0, 5, 12]|
+---+--------------------+
person noufel13    schedule 22.08.2019
comment
Но я хочу, чтобы вывод был в столбце. Думаю, я могу использовать для этого взрыв. См. Обновленный Q для ожидаемого фрейма данных - person Sreeram TP; 22.08.2019

Мне удалось добиться нужного мне результата с помощью следующих шагов:

Мой DataFrame выглядит так,

+---+---+---+
|id_|  p|  a|
+---+---+---+
|  1|  4| 12|
|  1|  3| 14|
|  1| -7| 16|
|  1|  5| 11|
|  1|-20| 90|
|  1|  5|120|
|  2| 11|267|
|  2|-98|124|
|  2|-87|120|
|  2| -1| 44|
|  2|  5|  1|
|  2|  7| 23|
+---+---+---+

Я сгруппирую по фрейму данных на id_ и соберу столбец, в котором я хочу применить функцию к списку, используя collect_list, и применить такую ​​функцию,

agg_df = df.groupBy('id_').agg(F.collect_list('p').alias('collected_p'))
agg_df = agg_df.withColumn('new', fun('collected_p'))

Теперь я хочу каким-то образом объединить agg_df с моим исходным фреймом данных. Для этого я сначала воспользуюсь разнесением, чтобы получить значения в столбце new в строках.

agg_df = agg_df.withColumn('exploded', F.explode('new'))

Для слияния я буду использовать monotonically_increasing_id, чтобы сгенерировать id для исходного фрейма данных и agg_df. Из этого я сделаю idx для каждого фрейма данных, потому что monotonically_increasing_id не будет одинаковым для обоих фреймов данных.

agg_df = agg_df.withColumn('id_mono', F.monotonically_increasing_id())
df = df.withColumn('id_mono', F.monotonically_increasing_id())

w = Window().partitionBy(F.lit(0)).orderBy('id_mono')

df = df.withColumn('idx', F.row_number().over(w))
agg_df = agg_df.withColumn('idx', F.row_number().over(w))

df = df.join(agg_df.select('idx', 'exploded'), ['idx']).drop('id_mono', 'idx')


+---+---+---+--------+
|id_|  p|  a|exploded|
+---+---+---+--------+
|  1|  4| 12|       4|
|  1|  3| 14|       7|
|  1| -7| 16|       0|
|  1|  5| 11|       5|
|  1|-20| 90|       0|
|  1|  5|120|       5|
|  2| 11|267|      11|
|  2|-98|124|       0|
|  2|-87|120|       0|
|  2| -1| 44|       0|
|  2|  5|  1|       5|
|  2|  7| 23|      12|
+---+---+---+--------+

Я не уверен, что это простой способ сделать. Было бы здорово, если бы кто-нибудь мог предложить какие-либо оптимизации для этого.

person Sreeram TP    schedule 25.08.2019