Как использовать стратегию стиля pandas split-apply-combin с scala api в Spark?

У меня есть функция scala, которая принимает фрейм данных искры и возвращает одно значение, скажем, двойное. Функция сложная, использует агрегаты, определенные в классе DataFrame, вызывает другие библиотеки Java и не может быть выражена в SQL. Для выполнения вычислений требуется все содержимое фрейма данных, он не может добавлять строку за раз и наращивать ее до результата.

У меня есть большой фрейм данных, который содержит столбец, который я хотел бы использовать, чтобы разбить фрейм данных на небольшие фрагменты и выполнить вышеуказанный расчет для каждого небольшого фрагмента. Затем я хотел бы вернуть новый фрейм данных, содержащий одну строку для каждой группы с двумя столбцами, один из которых содержит значение groupby, а другой - результат.

Это была бы относительно простая задача с использованием PandasUDF, но я не могу понять, как это сделать в Scala.

Я попытался переразбить фрейм данных, используя группу по столбцу, а затем вызвать mapPartitions, однако функция, переданная в mapPartitions, должна иметь подпись Iterator [Row] -> Iterator [X]. Я могу взять Iterator [Row] и создать Seq [Row] или List [Row] достаточно легко, но, похоже, невозможно создать фрейм данных из этого Seq, поскольку вычисления выполняются на рабочих узлах и создание фрейма данных может делать только из драйвера. Чтобы переписать исходную функцию так, чтобы она принимала Seq [Row], потребуется серьезная переработка, поскольку она использует некоторые высокоуровневые функции агрегирования из DataFrame (например, aboutQuantile).

Суть проблемы, по-видимому, в том, что нет понятия «локальный (/ только рабочий / не распределенный) фрейм данных» в отличие от Pandas, где фреймы данных явно ограничены, чтобы быть локальными.

Я упустил что-то очевидное?


person user1894205    schedule 21.04.2020    source источник


Ответы (1)


У меня есть большой фрейм данных, содержащий столбец, который я хотел бы использовать для разделения фрейма данных на небольшие фрагменты и выполнения вышеуказанного вычисления для каждого небольшого фрагмента.

Известны ли значения в этом столбце заранее? Если нет, то можно ли их хотя бы коллекционировать? Допустим, вы можете собирать их, как:

val chunkValues: Array[Any] = df.select("chunk")
  .collect()
  .map(r => r.getAs[Any](0))

Переберите значения, чтобы отфильтровать inputDF несколько раз и выполнить свою тяжелую логику:

val chunkDFs: Array[DataFrame] = chunkValues.map(value => {
  val chunkBeforeDF = inputDF.filter(col("chunk") === value)
  val chunkAfterDF = yourLogic(chunkBefore)
})

Объедините их снова.

person Civyshk    schedule 26.04.2020