Требуется обновлять только измененные строки в существующей таблице по сравнению с созданным фреймом данных. Итак, прямо сейчас я вычитаю и получаю измененные строки, но не знаю, как объединить их в существующую таблицу.
old_df = spark.sql("select * from existing table")
diff = new_df.subtract(old_df)
Теперь необходимо вставить фрейм данных diff (если новые строки) или обновить существующие записи
(deltaTable.alias("full_df").merge(
merge_df.alias("append_df"),
"full_df.col1 = append_df.col1 OR full_df.col2 =append_df.col2")
.whenNotMatchedInsertAll()
.execute()
)
Это не обновляет существующие записи (case: значение col2 изменено; col1 не изменено)