Сведение вложенных кортежей в RDD

Я использую Spark SQL для извлечения строк из таблицы. Некоторые из этих данных повторяются, и я пытаюсь подсчитать их количество. По сути, я пытаюсь выполнить базовый пример «подсчета слов», но вместо того, чтобы мои данные были в форме: (Word : String, Count : Int), у нас есть строка данных, заменяющая слово/строку.

В частности, мои данные выглядят так: RDD[((row), count)], где строка извлекается из таблицы sql и содержит строки, двойные числа, целые числа и т. д.

Он в форме RDD, потому что я хочу использовать reduceByKey. См. статью Избегайте использования groupByKey. Это пара (Key, Value) с очень длинным ключом (какая-то строка из базы данных sql), а ее значением является «количество слов».

Мое приложение делает это:

myDataframe
    // Append a 1 to each row
    .map(row => (row, 1))
    // Convert to RDD so we can use the reduceByKey method
    .rdd
    // Add up the 1's corresponding to matching keys
    .reduceByKey(_ + _)
    //Filter by rows that show up more than 10 times
    .filter(_._2 > 100)

    ...

Теперь предположим, что мои данные строки содержат (string, double, int). Здесь я хочу распаковать свои данные из RDD[((string, double, int), count)] в RDD[(string, double, int, count)], чтобы в конечном итоге сохранить эти данные в другой таблице SQL.

Есть ли какой-нибудь метод, который позволяет мне распаковать содержимое этого... вложенного кортежа... что-то вроде этого?

Мое решение состояло в том, чтобы «распаковать» элементы RDD следующим образом: .map(row => (row._1._1, row._1._2, row._1._3, row._2))

Но должен быть лучший способ! Если я решу получить больше элементов из строки, мне придется изменить этот вызов .map().

Спасибо!


person R. Gosman    schedule 30.05.2018    source источник


Ответы (2)


Вы можете использовать toSeq и fromSeq Row, как в следующем примере:

val df = Seq(
  ("a", 10.0, 1),
  ("a", 10.0, 1),
  ("b", 20.0, 2),
  ("c", 30.0, 3),
  ("c", 30.0, 3)
).toDF("c1", "c2", "c3")

import org.apache.spark.sql.Row

df.rdd.
  map((_, 1)).
  reduceByKey(_ + _).
  filter(_._2 > 1).
  map{
    case (row: Row, count: Int) => Row.fromSeq(row.toSeq :+ count)
  }
// res1: Array[org.apache.spark.sql.Row] = Array([a,10.0,1,2], [c,30.0,3,2])
person Leo C    schedule 30.05.2018
comment
Спасибо за совет! Я наткнулся на несколько похожее предложение, но мне показалось забавным создать последовательность... просто для извлечения элементов из этой последовательности на следующем шаге. Похоже, это скорее патч, чем решение. Я также не хотел переписывать код, например: _._2 в _._3, если мои столбцы когда-либо менялись. - person R. Gosman; 31.05.2018
comment
Row похож на Tuple, в котором элементы могут быть разных типов, и добавление (или удаление) элемента к Tuple не так тривиально, как сделать это к Seq. Вот почему Row оснащен to/fromSeq методами для удовлетворения таких потребностей. В этом случае они будут принимать любую строку, когда происходит преобразование, поэтому изменение кода не требуется, если строка была изменена. Эти методы также широко используются в других случаях использования (например, создание непрерывных индексов в фрейме данных). - person Leo C; 31.05.2018

Вам НЕ нужно возвращаться к использованию RDD; Статья, на которую вы правильно ссылаетесь, предостерегает от использования RDD.groupByKey, но ее не следует применять к DataFrame groupBy. Безопасно (и эффективно) использовать groupBy в DataFrame! Подробнее см. здесь.

Итак, чтобы сгруппировать по всем столбцам DataFrame, подсчитать количество вхождений каждой группы и отфильтровать группы с количеством > 10, вы можете просто использовать:

df.groupBy(df.columns.map(col): _*) // alternatively: df.groupBy(df.columns.head, df.columns.tail: _*)
  .count()
  .filter($"count" > 10)

Результат имеет схему, аналогичную входной, с дополнительным длинным столбцом count.

person Tzach Zohar    schedule 30.05.2018
comment
Спасибо, что указали на это! Посты о производительности разные было очень интересно читать. Я вижу, что в вашем примере вы подсчитываете количество элементов в группе и фильтруете популярные, но я не уверен, почему вы делаете карту для «col». Я не думаю, что это заполнитель для имени столбца... но я получаю синтаксическую ошибку. - person R. Gosman; 31.05.2018
comment
есть два метода groupBy: один ожидает аргумент типа cols: Column*, что означает серию объектов Column, которые я использую, и другой, который ожидает два аргумента: (col1: String, cols: String*), что означает имя первого столбца, а затем остальные имена; Вы можете использовать любой из них, но я нахожу первый более удобным, поскольку он не требует отдельного прохождения первого столбца; Простой вызов groupBy(df.columns: _*) также не соответствует этим подписям (распространенная ошибка — см. аналогичную проблему с select: stackoverflow.com/a/36131805/5344058< /а>). - person Tzach Zohar; 31.05.2018
comment
... Оказывается, я просто должен был import org.apache.spark.sql.functions._ иметь гораздо больше смысла; Я думал, что происходит какое-то вуду с «колом» в качестве заполнителя. Оказывается, это просто хорошая старая функция. Спасибо за разъяснение и дальнейшее чтение! - person R. Gosman; 31.05.2018