Слияние с дельта-таблицей не работает с java foreachbatch

Я создал дельта-таблицу и теперь пытаюсь вставить данные в эту таблицу с помощью foreachBatch (). Я следил за этим пример. Единственная разница в том, что я использую Java, а не в записной книжке, но, полагаю, это не должно иметь никакого значения?

Мой код выглядит следующим образом:

spark.sql("CREATE TABLE IF NOT EXISTS example_src_table(id int, load_date timestamp) USING DELTA LOCATION '/mnt/delta/events/example_src_table'");

Dataset<Row> exampleDF = spark.sql("SELECT e.id as id, e.load_date as load_date FROM example e");

        try {
            exampleDF
                    .writeStream()
                    .format("delta")
                    .foreachBatch((dataset, batchId) -> {
                        dataset.persist();
                        // Set the dataframe to view name
                        dataset.createOrReplaceTempView("updates");
                        // Use the view name to apply MERGE
                        // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
                        dataset.sparkSession().sql("MERGE INTO example_src_table e" +
                                " USING updates u" +
                                " ON e.id = u.id" +
                                " WHEN NOT MATCHED THEN INSERT (e.id, e.load_date) VALUES (u.id, u.load_date)");
                    })
                    .outputMode("update")
                    .option("checkpointLocation", "/mnt/delta/events/_checkpoints/example_src_table")
                    .start();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

Этот код работает без проблем, но в дельта-таблицу с URL-адресом '/ mnt / delta / events / example_src_table' не записываются данные. Кто-нибудь знает, что я делаю не так?

Я использую Spark 3.0 и Java 8.

ИЗМЕНИТЬ

Протестировано на записной книжке Databricks с использованием Scala, и затем все заработало.


person RudyVerboven    schedule 04.09.2020    source источник
comment
Вы видели какую-нибудь ошибку? Чтение из таблицы в виде потокового запроса не поддерживается в Apache Spark. Если вы сохраните запрос, возвращенный из start(), и вызовете query.processAllAvailable(), он вызовет исключение, происходящее в фоновом режиме.   -  person zsxwing    schedule 06.09.2020
comment
Нет, я не получаю ошибок. И я не пытаюсь читать здесь дельта-таблицу. Запрос на выборку exampleDF находится во временной таблице, которую я создал из своего потока. Что, по моему опыту, возможно в Apache Spark.   -  person RudyVerboven    schedule 07.09.2020


Ответы (1)


попробуйте следовать синтаксису, подобному следующему, на случай, если вы хотите обновить данные новыми данными

WHEN NOT MATCHED THEN 
    UPDATE SET e.load_date = u.load_date AND  e.id = u.id
    

Если вы хотите добавить только данные, они занимают что-то вроде этого

WHEN NOT MATCHED THEN INSERT *
person Cristián Vargas Acevedo    schedule 09.09.2020