Ваша идея отлично работает.
Собственно, сбор данных для драйвера является обязательным. В противном случае вы не сможете создать распределенный набор данных, вызывая SparkSession для каждого исполнителя. Без collect
вы получите исключение NullPointerException.
Я немного переписал ваш скелет кода, а также реализовал часть о том, как записать ваш Dataframe в дельта-таблицу (на основе вашего другого вопрос). Вдобавок я использую Dataset[String]
вместо Dataframe[Row]
, что немного облегчает жизнь.
Использование Spark 3.0.1 с delta-core 0.7.0 работает нормально. В качестве примера мой тестовый файл выглядит как
{"a":"foo1","b":"bar1"}
{"a":"foo2","b":"bar2"}
Я отправил расположение этого файла в тему Kafka под названием test и применил следующий код для анализа файла и записи его столбцов (на основе заданной схемы) в дельта-таблицу, используя приведенный ниже код:
val spark = SparkSession.builder()
.appName("KafkaConsumer")
.master("local[*]")
.getOrCreate()
val jsonSchema = new StructType()
.add("a", StringType)
.add("b", StringType)
val deltaPath = "file:///tmp/spark/delta/test"
import spark.implicits._
val kafkaDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("startingOffsets", "latest")
.option("failOnDataLoss", "false")
.load()
.selectExpr("CAST(value AS STRING) as data_path")
.as[String]
kafkaDf.writeStream.foreachBatch((batchDf:Dataset[String], batchId:Long) => {
// collect to driver
val records = batchDf.collect()
// create dataframe based on file location and process and write to Delta-Lake
records.foreach((path: String) => {
val dfToProcess = spark.read.schema(jsonSchema).json(path)
dfToProcess.show(false) // replace this line with your custom processing logic
dfToProcess.write.format("delta").save(deltaPath)
})
}).start()
spark.streams.awaitAnyTermination()
Результат вызова show
такой, как ожидалось:
+----+----+
|a |b |
+----+----+
|foo1|bar1|
|foo2|bar2|
+----+----+
и данные были записаны в виде дельта-таблицы в место, указанное в deltaPath
/tmp/spark/delta/test$ ll
total 20
drwxrwxr-x 3 x x 4096 Jan 20 13:37 ./
drwxrwxr-x 3 x x 4096 Jan 20 13:37 ../
drwxrwxr-x 2 x x 4096 Jan 20 13:37 _delta_log/
-rw-r--r-- 1 x x 595 Jan 20 13:37 part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet
-rw-r--r-- 1 x x 16 Jan 20 13:37 .part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet.crc
person
mike
schedule
20.01.2021