Apache Spark/Azure Data Lake Storage — обработать файл ровно один раз, пометить файл как обработанный

У меня есть контейнер Azure Data Lake Storage, который служит целевой областью для файлов JSON для обработки Apache Spark.

Там десятки тысяч маленьких (до нескольких мегабайт) файлов. Код Spark регулярно читает эти файлы и выполняет некоторые преобразования.

Я хочу, чтобы файлы читались ровно один раз, а скрипт Spark был идемпотентным. Как сделать так, чтобы файлы не читались снова и снова? Как сделать это эффективно?

Я читаю данные так:

spark.read.json("/mnt/input_location/*.json")

Я думал о следующих подходах:

  1. Создайте дельта-таблицу с именами файлов, которые уже были обработаны, и запустите преобразование EXCEPT во входном кадре данных.
  2. Переместите обработанные файлы в другое место (или переименуйте их). Я бы предпочел этого не делать. В случае, если мне нужно повторно обработать данные, мне нужно снова запустить переименование, эта операция занимает много времени.

Я надеюсь, что есть лучший способ. Пожалуйста, предложите что-нибудь.


person BuahahaXD    schedule 28.04.2021    source источник


Ответы (1)


Вы можете использовать задание структурированной потоковой передачи с включенными контрольными точками и Trigger.Once.

Файлы контрольных точек этого задания будут отслеживать файлы JSON, которые уже использовались заданием. Кроме того, триггер Trigger.Once сделает это потоковое задание, как если бы это было пакетное задание.

Есть хорошая статья от Блок данных, объясняющий, почему потоковая передача и RunOnce лучше, чем пакетная обработка.

Ваше структурированное потоковое задание может выглядеть следующим образом:

val checkpointLocation = "/path/to/checkpoints"
val pathToJsonFiles = "/mnt/input_location/"
val streamDF = spark.readStream.format("json").schema(jsonSchema).load(pathToJsonFiles)

val query = streamDF
  .[...] // apply your processing
  .writeStream
  .format("console") // change sink format accordingly
  .option("checkpointLocation", checkpointLocation)
  .trigger(Trigger.Once)
  .start()

query.awaitTermination()
person mike    schedule 28.04.2021