Обновить только измененные строки таблицы данных pyspark delta table

Требуется обновлять только измененные строки в существующей таблице по сравнению с созданным фреймом данных. Итак, прямо сейчас я вычитаю и получаю измененные строки, но не знаю, как объединить их в существующую таблицу.

old_df = spark.sql("select * from existing table")
diff = new_df.subtract(old_df)

Теперь необходимо вставить фрейм данных diff (если новые строки) или обновить существующие записи

(deltaTable.alias("full_df").merge(
    merge_df.alias("append_df"),
    "full_df.col1 = append_df.col1 OR full_df.col2 =append_df.col2") 
  .whenNotMatchedInsertAll() 
  .execute()
)

Это не обновляет существующие записи (case: значение col2 изменено; col1 не изменено)


person harshini    schedule 25.09.2020    source источник
comment
Запишите свои данные во временную таблицу и используйте обновление вставки jdbc.   -  person Lamanus    schedule 26.09.2020


Ответы (1)


.whenMatchedUpdateAll() принимает условие, которое можно использовать для сохранения неизмененных строк:

(deltaTable.alias("full_df").merge(
    merge_df.alias("append_df"),
    "full_df.col1 = append_df.col1 OR full_df.col2 = append_df.col2") 
  .whenNotMatchedInsertAll()
  .whenMatchedUpdateAll("full_df.col1 != append_df.col1 OR full_df.col2 != append_df.col2")
  .execute()
)
person marat    schedule 11.10.2020