Spark Dataset API - присоединиться

Я пытаюсь использовать Dataset API Spark, но я у меня возникли проблемы с простым соединением.

Допустим, у меня есть два набора данных с полями: date | value, тогда в случае DataFrame мое соединение будет выглядеть так:

val dfA : DataFrame
val dfB : DataFrame

dfA.join(dfB, dfB("date") === dfA("date") )

Однако для Dataset существует метод .joinWith, но тот же подход не работает:

val dfA : Dataset
val dfB : Dataset

dfA.joinWith(dfB, ? )

Какой аргумент требует .joinWith?


person mastro    schedule 06.04.2016    source источник


Ответы (3)


Чтобы использовать joinWith, вам сначала нужно создать DataSet, а скорее всего два из них. Чтобы создать DataSet, вам нужно создать класс case, соответствующий вашей схеме, и вызвать DataFrame.as[T], где T - ваш класс case. Так:

case class KeyValue(key: Int, value: String)
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")
val ds = df.as[KeyValue]
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]

Вы также можете пропустить класс case и использовать кортеж:

val tupDs = df.as[(Int,String)]
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]

Затем, если у вас был другой класс case / DF, например, скажите:

case class Nums(key: Int, num1: Double, num2: Long)
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")
val ds2 = df2.as[Nums]
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]

Тогда, хотя синтаксис join и joinWith похож, результаты разные:

df.join(df2, df.col("key") === df2.col("key")).show
// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// |  1| asdf|  1| 7.7| 101|
// |  2|34234|  2| 1.2|  10|
// +---+-----+---+----+----+

ds.joinWith(ds2, df.col("key") === df2.col("key")).show
// +---------+-----------+
// |       _1|         _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+

Как видите, joinWith оставляет объекты нетронутыми как части кортежа, а join объединяет столбцы в единое пространство имен. (Что вызовет проблемы в приведенном выше случае, потому что имя столбца «ключ» повторяется.)

Как ни странно, я должен использовать df.col("key") и df2.col("key"), чтобы создать условия для присоединения ds и ds2 - если вы используете только col("key") с обеих сторон, это не сработает, а ds.col(...) не существует. Однако использование оригинального df.col("key") помогает.

person David Griffin    schedule 07.04.2016
comment
детальное объяснение. Только одна путаница. Есть ли лучший способ написать типизированное условие соединения. например, для df.col (key) можем ли мы иметь что-то более безопасное по типу, которое может определить правильность ключа во время компиляции. - person Mohammad Adnan; 10.10.2016
comment
Я полностью согласен, на основе этого синтаксиса нет смысла создавать набор данных, так в чем же преимущество? Не могу смириться с тем, что нет печатной альтернативы .. Какая жалость! - person Sparky; 12.12.2016

Из https://docs.cloud.databricks.com/docs/latest/databricks_guide/05%20Spark/1%20Intro%20Datasets.html

похоже, ты мог бы просто сделать

dfA.as("A").joinWith(dfB.as("B"), $"A.date" === $"B.date" )
person Raghuram Onti Srinivasan    schedule 11.04.2016

В приведенном выше примере вы можете попробовать следующее:

Определите класс case для вашего вывода

case class JoinOutput(key:Int, value:String, num1:Double, num2:Long) 

Объедините два набора данных с помощью Seq("key"), это поможет вам избежать двух повторяющихся ключевых столбцов в выходных данных, что также поможет применить класс case или получить данные на следующем шаге.

val joined = ds.join(ds2, Seq("key")).as[JoinOutput]
// res27: org.apache.spark.sql.Dataset[JoinOutput] = [key: int, value: string ... 2 more fields]

Вместо этого результат будет плоским:

joined.show

+---+-----+----+----+
|key|value|num1|num2|
+---+-----+----+----+
|  1| asdf| 7.7| 101|
|  2|34234| 1.2|  10|
+---+-----+----+----+
person Syntax    schedule 06.10.2017
comment
вы конкретно не отвечаете на вопрос, но подсказка Seq (ключ) выручила меня - person ImDarrenG; 30.11.2017
comment
Вы не отвечаете, как использовать .joinWith, а также .join на самом деле является нетипизированным преобразованием, и в этом случае вы не получаете выгоды от безопасности типов Dataset - person jack; 03.03.2021