Обновление массива структур — Spark

У меня есть следующая структура таблицы дельта искры,

+---+------------------------------------------------------+
|id |addresses                                             |
+---+------------------------------------------------------+
|1  |[{"Address":"ABC", "Street": "XXX"}, {"Address":"XYZ", "Street": "YYY"}]|
+---+------------------------------------------------------+

Здесь столбец адресов представляет собой массив структур.

Мне нужно обновить первый адрес внутри массива как XXX из значения атрибутов улицы без изменения второго элемента в списке.

Таким образом, ABC должен быть обновлен до XXX, а XYZ должен быть обновлен до YYY.

Вы можете предположить, что у меня так много атрибутов в структуре, как улица, почтовый индекс и т. д., поэтому я хочу оставить их нетронутыми и просто обновить значение атрибута Address from Street.

Как я могу сделать это в Spark, Databricks или Sql?

схема,

|-- id: string (nullable = true)
|-- addresses: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Address: string (nullable = true)
 |    |    |    |    |-- Street: string (nullable = true)

Ваше здоровье!


person mani_nz    schedule 23.06.2020    source источник
comment
@mani_nz какую версию искры вы используете?   -  person koiralo    schedule 23.06.2020
comment
@koiralo Искра 2.4.5   -  person mani_nz    schedule 23.06.2020
comment
вы обновляете значение столбца Street в Address, это правильно??   -  person Srinivas    schedule 23.06.2020


Ответы (1)


Пожалуйста, проверьте код ниже.

scala> vdf.show(false)
+---+--------------+
|id |addresses     |
+---+--------------+
|1  |[[ABC], [XYZ]]|
+---+--------------+


scala> vdf.printSchema
root
 |-- id: integer (nullable = false)
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Address: string (nullable = true)

scala> val new_address = array(struct(lit("AAA").as("Address")))

scala> val except_first = array_except($"addresses",array($"addresses"(0)))

scala> val addresses = array_union(new_address,except_first).as("addresses")

scala> vdf.select($"id",addresses).select($"id",$"addresses",to_json($"addresses").as("json_addresses")).show(false)

+---+--------------+-------------------------------------+
|id |addresses     |json_addresses                       |
+---+--------------+-------------------------------------+
|1  |[[AAA], [XYZ]]|[{"Address":"AAA"},{"Address":"XYZ"}]|
+---+--------------+-------------------------------------+

Обновлено

scala> vdf.withColumn("addresses",explode($"addresses")).groupBy($"id").agg(collect_list(struct($"addresses.Street".as("Address"),$"addresses.Street")).as("addresses")).withColumn("json_data",to_json($"addresses")).show(false)
+---+------------------------+-------------------------------------------------------------------+
|id |addresses               |json_data                                                          |
+---+------------------------+-------------------------------------------------------------------+
|1  |[[XXX, XXX], [YYY, YYY]]|[{"Address":"XXX","Street":"XXX"},{"Address":"YYY","Street":"YYY"}]|
+---+------------------------+-------------------------------------------------------------------+
person Srinivas    schedule 23.06.2020
comment
Спасибо, но на самом деле я имел в виду другую проблему. Моя фактическая структура имеет довольно много атрибутов, и я не могу установить все из них, поскольку каждый из них может иметь разные значения. Я обновлю вопрос, поскольку понимаю, почему это сбивает с толку. Извинения! - person mani_nz; 23.06.2020
comment
Вам не нужно устанавливать столбцы.. только первый, который вам нужно изменить - person Srinivas; 23.06.2020
comment
Теперь я правильно обновил вопрос. Значение, которое я хочу обновить, не является статическим, а должно исходить из другого атрибута структуры. - person mani_nz; 23.06.2020