как использовать группу в наборе данных spark

Я использую набор данных Spark (версия Spark 1.6.1). Ниже мой код

object App { 

val conf = new SparkConf()
.setMaster("local")
.setAppName("SparkETL")

val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc);
import sqlContext.implicits._

}

override def readDataTable(tableName:String):DataFrame={
val dataFrame= App.sqlContext.read.jdbc(JDBC_URL, tableName, JDBC_PROP);
return dataFrame;
}


case class Student(stud_id , sname , saddress)
case class Student(classid, stud_id, name)


var tbl_student = JobSqlDAO.readDataTable("tbl_student").filter("stud_id = '" + studId + "'").as[Student].as("tbl_student")

var tbl_class_student = JobSqlDAO.readDataTable("tbl_class_student").as[StudentClass].as("tbl_class_student")


 var result = tbl_class_student.joinWith(tbl_student, $"tbl_student.stud_id" === $"tbl_class_student.stud_id").as("ff")

теперь я хотел выполнить предложение group by для нескольких столбцов? Как это сделать? result.groupBy(_._1._1.created_at) вот так можно? если да, то я не могу видеть результат как группу, а также как это сделать для нескольких столбцов?


person Swadeshi    schedule 14.06.2016    source источник


Ответы (1)


Если я правильно понял ваши требования, лучше всего использовать функцию reduceByKey в PairRDDFunctions.

Подпись функции — def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)], и это просто означает, что вы используете ряд пар ключ/значение.

Позвольте мне объяснить рабочий процесс:

  1. Вы получаете набор, с которым вам нужно работать (в вашем коде: result)
  2. С помощью функции RDD map вы разделяете результирующий набор на кортеж, содержащий два подкортежа, содержащих поля, составляющие ключ, и поля, которые вы хотите агрегировать (пример: result.map(row => ((row.key1, row.key2), (row.value1, row.value2)))
  3. Теперь у вас есть RDD[(K,V)], где тип K — тип кортежа ключевых полей, а V — тип кортежа полей значений.
  4. Вы можете напрямую использовать reduceByKey, передав функцию типа (V,V) => V, которая агрегирует значения (пример: (agg: (Int, Int), val: (Int, Int)) => (agg._1 + val._1, agg._2 + val._2))

Пожалуйста, обрати внимание:

  • Вы должны вернуть тот же тип значения из функции агрегации
  • Вы должны импортировать org.apache.spark.SparkContext._ для автоматического использования служебных функций PairRDDFunctions
  • То же самое относится и к groupBy, вам нужно сопоставить начальный RDD с парой RDD[K,V], но у вас нет агрегатной функции, потому что вы просто сохраняете значения в последовательности для дальнейших вычислений.
  • Если вам нужно начальное значение для агрегации (пример: 0 для подсчета), используйте вместо этого функцию foldByKey
person Vincenzo Maggio    schedule 27.06.2016