Я создал дельта-таблицу и теперь пытаюсь вставить данные в эту таблицу с помощью foreachBatch (). Я следил за этим пример. Единственная разница в том, что я использую Java, а не в записной книжке, но, полагаю, это не должно иметь никакого значения?
Мой код выглядит следующим образом:
spark.sql("CREATE TABLE IF NOT EXISTS example_src_table(id int, load_date timestamp) USING DELTA LOCATION '/mnt/delta/events/example_src_table'");
Dataset<Row> exampleDF = spark.sql("SELECT e.id as id, e.load_date as load_date FROM example e");
try {
exampleDF
.writeStream()
.format("delta")
.foreachBatch((dataset, batchId) -> {
dataset.persist();
// Set the dataframe to view name
dataset.createOrReplaceTempView("updates");
// Use the view name to apply MERGE
// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
dataset.sparkSession().sql("MERGE INTO example_src_table e" +
" USING updates u" +
" ON e.id = u.id" +
" WHEN NOT MATCHED THEN INSERT (e.id, e.load_date) VALUES (u.id, u.load_date)");
})
.outputMode("update")
.option("checkpointLocation", "/mnt/delta/events/_checkpoints/example_src_table")
.start();
} catch (TimeoutException e) {
e.printStackTrace();
}
Этот код работает без проблем, но в дельта-таблицу с URL-адресом '/ mnt / delta / events / example_src_table' не записываются данные. Кто-нибудь знает, что я делаю не так?
Я использую Spark 3.0 и Java 8.
ИЗМЕНИТЬ
Протестировано на записной книжке Databricks с использованием Scala, и затем все заработало.
start()
, и вызоветеquery.processAllAvailable()
, он вызовет исключение, происходящее в фоновом режиме. - person zsxwing   schedule 06.09.2020