Анализировать корень JSON в столбце с помощью Spark-Scala

У меня проблемы с преобразованием корня JSOM в запись во фрейме данных для неопределенного количества записей.

У меня есть фрейм данных, сгенерированный с помощью JSON, подобный следующему:

val exampleJson = spark.createDataset(
  """
  {"ITEM1512":
        {"name":"Yin",
         "address":{"city":"Columbus",
                    "state":"Ohio"}
                    }, 
    "ITEM1518":
        {"name":"Yang",
         "address":{"city":"Working",
                    "state":"Marc"}
                    }
  }""" :: Nil)

Когда я прочитал следующую инструкцию

val itemsExample = spark.read.json(exampleJson)

Сгенерированные схема и фрейм данных следующие:

+-----------------------+-----------------------+
|ITEM1512               |ITEM1518               |
+-----------------------+-----------------------+
|[[Columbus, Ohio], Yin]|[[Working, Marc], Yang]|
+-----------------------+-----------------------+

root
 |-- ITEM1512: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- ITEM1518: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |-- name: string (nullable = true)

Но я хочу создать что-то вроде этого:

+-----------------------+-----------------------+
|Item                   |Values                 |
+-----------------------+-----------------------+
|ITEM1512               |[[Columbus, Ohio], Yin]|
|ITEM1518               |[[Working, Marc], Yang]|
+-----------------------+-----------------------+

Итак, чтобы проанализировать эти данные JSON, мне нужно прочитать все столбцы и добавить их в запись в фрейме данных, потому что в качестве примера я пишу больше двух элементов. Фактически, есть миллионы элементов, которые я хотел бы добавить во фрейм данных.

Я пытаюсь воспроизвести решение, найденное здесь, в: Как проанализируйте данные JSON с помощью Spark-Scala с помощью этого кода:

val columns:Array[String]       = itemsExample.columns
var arrayOfDFs:Array[DataFrame] = Array() 

for(col_name <- columns){

  val temp = itemsExample.selectExpr("explode("+col_name+") as element")
    .select(
      lit(col_name).as("Item"),
      col("element.E").as("Value"))

  arrayOfDFs = arrayOfDFs :+ temp
}

val jsonDF = arrayOfDFs.reduce(_ union _)
jsonDF.show(false)

Но я столкнулся с проблемой, когда в примере, читающем в другом вопросе, корень находится в массиве, в моем случае корнем является StrucType. Поэтому выбрасывается следующее исключение:

org.apache.spark.sql.AnalysisException: невозможно разрешить 'explode (ITEM1512)' из-за несоответствия типа данных: входные данные функции explode должны быть массивом или типом карты, а не структурой, имя: строка>


person jqc    schedule 07.05.2020    source источник
comment
вы очень близки, и вам действительно не нужно взорваться. Просто замените временное выражение на val temp = itemsExample.select(lit(col_name).as("Item"), col(col_name).as("Value"))   -  person abiratsis    schedule 08.05.2020
comment
Возможно, будет проще просто изменить json на array, чем пытаться сделать explode позже. Особенно, если ваш json obj многовложен.   -  person moon    schedule 08.05.2020
comment
Разве это не сработало @jqc? Вы пытались заменить временную переменную?   -  person abiratsis    schedule 09.05.2020
comment
@AlexandrosBiratsis Да, ваше решение тоже сработало.   -  person jqc    schedule 10.05.2020


Ответы (1)


Вы можете использовать функцию stack.

Example:

itemsExample.selectExpr("""stack(2,'ITEM1512',ITEM1512,'ITEM1518',ITEM1518) as (Item,Values)""").
show(false)
//+--------+-----------------------+
//|Item    |Values                 |
//+--------+-----------------------+
//|ITEM1512|[[Columbus, Ohio], Yin]|
//|ITEM1518|[[Working, Marc], Yang]|
//+--------+-----------------------+

ОБНОВЛЕНИЕ:

Dynamic Stack query:

val stack=df.columns.map(x => s"'${x}',${x}").mkString(s"stack(${df.columns.size},",",",")as (Item,Values)")
//stack(2,'ITEM1512',ITEM1512,'ITEM1518',ITEM1518) as (Item,Values)

itemsExample.selectExpr(stack).show()
//+--------+-----------------------+
//|Item    |Values                 |
//+--------+-----------------------+
//|ITEM1512|[[Columbus, Ohio], Yin]|
//|ITEM1518|[[Working, Marc], Yang]|
//+--------+-----------------------+
person Shu    schedule 08.05.2020
comment
Привет! Замечу, что несмотря на то, что это решение в порядке. Мне нужно объединить фрейм данных с разными схемами. Поэтому я должен сделать что-то подобное, как сказал @AlexandrosBiratsis выше. Но я столкнулся с проблемой с профсоюзом. Я задал еще один вопрос, если вы хотите увидеть: stackoverflow.com/questions/61739858/ - person jqc; 12.05.2020