Прочитать путь к файлу из темы Kafka, а затем прочитать файл и записать в DeltaLake в структурированной потоковой передаче

У меня есть случай использования, когда путь к файлу json-записей, хранящихся в s3, поступает как сообщение kafka в kafka. Мне нужно обработать данные с помощью искровой структурированной потоковой передачи.

Дизайн, который я подумал, выглядит следующим образом:

  1. В структурированной потоковой передаче kafka Spark прочтите сообщение, содержащее путь к данным.
  2. Собрать запись сообщения в драйвере. (Сообщения небольшие по размеру)
  3. Создайте фрейм данных из местоположения данных.
kafkaDf.select($"value".cast(StringType))
       .writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) =>  {
  //rough code
  //collect to driver
  val records = batchDf.collect()
  //create dataframe and process
  records foreach((rec: Row) =>{
    println("records:######################", rec.toString())
    val path = rec.getAs[String]("data_path")
    val dfToProcess = spark.read.json(path)
    ....
  })
}

Хотелось бы узнать ваше мнение, подходит ли такой подход? В частности, если есть проблема с созданием Dataframe после вызова collect. Если есть лучший подход, дайте мне знать то же самое.


person Amit Joshi    schedule 18.01.2021    source источник


Ответы (1)


Ваша идея отлично работает.

Собственно, сбор данных для драйвера является обязательным. В противном случае вы не сможете создать распределенный набор данных, вызывая SparkSession для каждого исполнителя. Без collect вы получите исключение NullPointerException.

Я немного переписал ваш скелет кода, а также реализовал часть о том, как записать ваш Dataframe в дельта-таблицу (на основе вашего другого вопрос). Вдобавок я использую Dataset[String] вместо Dataframe[Row], что немного облегчает жизнь.

Использование Spark 3.0.1 с delta-core 0.7.0 работает нормально. В качестве примера мой тестовый файл выглядит как

{"a":"foo1","b":"bar1"}
{"a":"foo2","b":"bar2"}

Я отправил расположение этого файла в тему Kafka под названием test и применил следующий код для анализа файла и записи его столбцов (на основе заданной схемы) в дельта-таблицу, используя приведенный ниже код:

  val spark = SparkSession.builder()
    .appName("KafkaConsumer")
    .master("local[*]")
    .getOrCreate()

  val jsonSchema = new StructType()
    .add("a", StringType)
    .add("b", StringType)

  val deltaPath = "file:///tmp/spark/delta/test"

  import spark.implicits._
  val kafkaDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .load()
    .selectExpr("CAST(value AS STRING) as data_path")
    .as[String]

  kafkaDf.writeStream.foreachBatch((batchDf:Dataset[String], batchId:Long) => {
    // collect to driver
    val records = batchDf.collect()

    // create dataframe based on file location and process and write to Delta-Lake
    records.foreach((path: String) => {
      val dfToProcess = spark.read.schema(jsonSchema).json(path)
      dfToProcess.show(false) // replace this line with your custom processing logic
      dfToProcess.write.format("delta").save(deltaPath)
    })
  }).start()

  spark.streams.awaitAnyTermination()

Результат вызова show такой, как ожидалось:

+----+----+
|a   |b   |
+----+----+
|foo1|bar1|
|foo2|bar2|
+----+----+

и данные были записаны в виде дельта-таблицы в место, указанное в deltaPath

/tmp/spark/delta/test$ ll
total 20
drwxrwxr-x 3 x x 4096 Jan 20 13:37 ./
drwxrwxr-x 3 x x 4096 Jan 20 13:37 ../
drwxrwxr-x 2 x x 4096 Jan 20 13:37 _delta_log/
-rw-r--r-- 1 x x  595 Jan 20 13:37 part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet
-rw-r--r-- 1 x x   16 Jan 20 13:37 .part-00000-b6a540ec-7e63-4d68-a09a-405142479cc1-c000.snappy.parquet.crc

person mike    schedule 20.01.2021
comment
извините, стек не разрешил редактировать комментарий. Если вы можете высказать свое мнение о ресурсах. Я имею в виду, поскольку мы, по сути, создаем множество независимых искровых заданий внутри потокового микропакета. И он не может запускать все эти параллельно. Как распределяются ресурсы внутри этих рабочих мест? Другое, что это может привести к задержкам в темах Kafka, поскольку следующий триггер будет ждать завершения текущего. - person Amit Joshi; 20.01.2021