Таблица разделов поверх папок, содержащих подпапки, содержащие файлы json в spark

Я работаю над искрой в Databricks. У меня есть точка монтирования для моего хранилища, указывающая на мой каталог. Назовем каталог как /mnt/abc1/abc2 — путь. Допустим, в этом каталоге abc2 у меня есть 10 папок с именами xyz1 .. xyz10. Все эти папки xyz% содержат файлы json, назовем их xyz1_1.json и так далее. Мне нужно создать таблицу, чтобы я мог получить доступ к своему json в таблице spark, указав ее как путь + abc2.xyz1.xyz1_1.json

var path = "/mnt/abc1/"
var data = spark.read.json(path)

Это работает, когда файлы json находятся непосредственно внутри пути, а не внутри папок на нашем пути. Я хочу найти способ, который может автоматически обнаруживать базовые папки и подпапки, содержащие jsons, и строить поверх них таблицу.


person Himanshu Kaushik    schedule 07.06.2021    source источник


Ответы (2)



Попробуйте код ниже.

import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
import org.apache.spark.sql.SparkSession
import scala.util.{Failure, Success, Try}

case class Hdfs(fs: FileSystem) {
  implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
    case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
      override def hasNext: Boolean = remoteIterator.hasNext
      override def next(): T = remoteIterator.next()
    }
    wrapper(remoteIterator)
  }
  def listFiles(path: String): List[String] = {
    Try(
      fs
        .listFiles(new Path(path), true)
        .toList
        .map(_.getPath)
        .filter(!_.toString.contains("_spark_metadata"))
        .map(_.toString)
    ) match {
      case Success(files) => files
      case Failure(ex) => Nil
    }
  }
}

Получите объект hdfs, используя сеанс spark.

val hdfs = Hdfs(FileSystem.get(spark.sparkContext.hadoopConfiguration))

Получить список файлов рекурсивно, используя функцию listFiles.

val files = hdfs.listFiles("/mnt/abc1/")

Проверьте, доступны ли файлы в пути hdfs.

if(!files.isEmpty) val data = spark.read.json(files:_*)

person Srinivas    schedule 08.06.2021