У меня есть такой фрейм данных PySpark,
+----------+--------+---------+
|id_ | p | a |
+----------+--------+---------+
| 1 | 4 | 12 |
| 1 | 3 | 14 |
| 1 | -7 | 16 |
| 1 | 5 | 11 |
| 1 | -20 | 90 |
| 1 | 5 | 120 |
| 2 | 11 | 267 |
| 2 | -98 | 124 |
| 2 | -87 | 120 |
| 2 | -1 | 44 |
| 2 | 5 | 1 |
| 2 | 7 | 23 |
-------------------------------
У меня также есть такая функция Python,
def fun(x):
total = 0
result = np.empty_like(x)
for i, y in enumerate(x):
total += (y)
if total < 0:
total = 0
result[i] = total
return result
Я хочу сгруппировать фрейм данных PySpark в столбце id_
и применить функцию fun
к столбцу p
.
Я хочу что-то вроде
spark_df.groupBy('id_')['p'].apply(fun)
В настоящее время я делаю это с помощью pandas udf с помощью pyarrow
, что неэффективно с точки зрения времени для моего приложения.
Результат, который я ищу:
[4, 7, 0, 5, 0, 5, 11, -98, -87, -1, 5, 7]
Это результирующий фрейм данных, который я ищу,
+----------+--------+---------+
|id_ | p | a |
+----------+--------+---------+
| 1 | 4 | 12 |
| 1 | 7 | 14 |
| 1 | 0 | 16 |
| 1 | 5 | 11 |
| 1 | 0 | 90 |
| 1 | 5 | 120 |
| 2 | 11 | 267 |
| 2 | 0 | 124 |
| 2 | 0 | 120 |
| 2 | 0 | 44 |
| 2 | 5 | 1 |
| 2 | 12 | 23 |
-------------------------------
Есть ли прямой способ сделать это с помощью самих API pyspark?
Я могу агрегировать и объединять p
в список, используя collect_list
при группировке на id_
, использовать udf
поверх этого и использовать explode
, чтобы получить столбец p
, как мне было нужно, во фрейме данных результата.
Но как сохранить другие столбцы, которые есть в моем фрейме данных?