Приводит ли чтение нескольких файлов и сбор их к драйверу в искре

Фрагмент кода:

val inp = sc.textFile("C:\\mk\\logdir\\foldera\\foldera1\\log.txt").collect.mkString(" ")

Я знаю, что приведенный выше код читает весь файл, объединяет их в одну строку и выполняет его узел драйвера (одиночное выполнение, а не параллельное).

 val inp = sc.textFile("C:\\mk\\logdir\\*\\*\\log.txt")
 code block{ }
 sc.stop

Q1) Здесь я читаю несколько файлов (которые присутствуют в структуре папок выше). Я считаю, что в этом случае каждый файл будет создан как раздел и будет отправлен на отдельный узел и выполнен параллельно. Я прав в своем понимании? Кто-нибудь может это подтвердить? Или в любом случае я могу подтвердить это систематически?

val inp = sc.textFile("C:\\mk\\logdir\\*\\*\\log.txt")
val cont = inp.collect.mkString(" ")
 code block{ }
 sc.stop

Q2) Как искра обрабатывает этот случай. хотя я собираю, я предполагаю, что он будет собирать не весь контент из всех файлов, а только один файл. Я прав? Может ли кто-нибудь помочь мне понять это?

Заранее большое спасибо за ваше время и помощь.


person user7264473    schedule 18.12.2016    source источник
comment
Пожалуйста, проверьте мой ответ. Надеюсь, это поможет! Я думаю, вы ищете целые текстовые файлы...   -  person Ram Ghadiyaram    schedule 18.12.2016
comment
ага. Это Рам. Спасибо. Я пытаюсь найти обходной путь для моей ситуации.   -  person user7264473    schedule 18.12.2016
comment
Я считаю, что приведенные здесь ответы в целом правильны, но неверны в данном конкретном случае. Вы читаете с локального диска `C:`. Все операции будут локальными.   -  person maasg    schedule 18.12.2016


Ответы (2)


Q1) Здесь я читаю несколько файлов (которые присутствуют в структуре папок выше). Я считаю, что в этом случае каждый файл будет создан как раздел и будет отправлен на отдельный узел и выполнен параллельно. Я прав в своем понимании? Кто-нибудь может это подтвердить? Или в любом случае я могу подтвердить это систематически?

ОТВЕЧАТЬ :

Метод TextFile SparkContext, т. е. sc.textFile, создает RDD с каждой строкой в ​​качестве элемента. Если в данных 10 файлов, то есть папка yourtextfilesfolder, будет создано 10 разделов. Вы можете проверить количество разделов:

yourtextfilesfolder.partitions.length

Однако секционирование определяется местоположением данных. Это может привести к тому, что по умолчанию будет слишком мало разделов. Насколько я знаю, нет гарантии, что будет создан один раздел, см. код 'SparkContext.textFile'.

& 'minPartitions' - предлагаемое минимальное количество разделов для результирующего RDD

Для лучшего понимания см. ниже метод.

/**
           * Read a text file from HDFS, a local file system (available on all nodes), or any
           * Hadoop-supported file system URI, and return it as an RDD of Strings.
           */
          def textFile(
              path: String,
              minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
            assertNotStopped()
            hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
              minPartitions).map(pair => pair._2.toString).setName(path)
          }

вы можно упомянуть minPartitions, как показано выше, из SparkContext.scala

Q2) Как искра обрабатывает этот случай. хотя я собираю, я предполагаю, что он будет собирать не весь контент из всех файлов, а только один файл. Я прав? Может ли кто-нибудь помочь мне понять это?

ОТВЕТ: Ваш rdd состоит из нескольких текстовых файлов. поэтому collect будет собирать драйвер со всех разделов из разных файлов, а не по одному файлу за раз.

вы можете проверить: используя rdd.collect


Однако, если вы хотите прочитать несколько текстовых файлов, вы также можете использовать wholeTextFiles, см. примечание @в приведенном ниже методе. Маленькие файлы предпочтительны, большие файлы также допустимы, но это может привести к снижению производительности.

См. spark-core-sc-textfile-vs-sc- целые текстовые файлы

Док:

RDD> fullTextFiles(String path, int minPartitions) Чтение каталога текстовых файлов из HDFS, локальной файловой системы (доступной на всех узлах) или любого URI файловой системы, поддерживаемого Hadoop.

/**
   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
   * key-value pair, where the key is the path of each file, the value is the content of each file.
   *
   * <p> For example, if you have the following files:
   * {{{
   *   hdfs://a-hdfs-path/part-00000
   *   hdfs://a-hdfs-path/part-00001
   *   ...
   *   hdfs://a-hdfs-path/part-nnnnn
   * }}}
   *
   * Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path")`,
   *
   * <p> then `rdd` contains
   * {{{
   *   (a-hdfs-path/part-00000, its content)
   *   (a-hdfs-path/part-00001, its content)
   *   ...
   *   (a-hdfs-path/part-nnnnn, its content)
   * }}}
   *
   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
   * @note On some filesystems, `.../path/&#42;` can be a more efficient way to read all files
   *       in a directory rather than `.../path/` or `.../path`
   * @note Partitioning is determined by data locality. This may result in too few partitions
   *       by default.
   *
   * @param path Directory to the input data files, the path can be comma separated paths as the
   *             list of inputs.
   * @param minPartitions A suggestion value of the minimal splitting number for input data.
   * @return RDD representing tuples of file path and the corresponding file content
   */
  def wholeTextFiles(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
.....
  }

Примеры :

val distFile = sc.textFile("data.txt")
Above command returns the content of the file:
scala> distFile.collect()
res16: Array[String] = Array(1,2,3, 4,5,6)


 SparkContext.wholeTextFiles can return (filename, content).
    val distFile = sc.wholeTextFiles("/tmp/tmpdir")

scala> distFile.collect()
res17: Array[(String, String)] =
Array((maprfs:/tmp/tmpdir/data3.txt,"1,2,3
4,5,6
"), (maprfs:/tmp/tmpdir/data.txt,"1,2,3
4,5,6
"), (maprfs:/tmp/tmpdir/data2.txt,"1,2,3
4,5,6
"))

В вашем случае я бы предпочел SparkContext.wholeTextFiles, где вы можете получить имя файла и его содержимое после сбора, как описано выше, если это то, что вы хотели.

person Ram Ghadiyaram    schedule 18.12.2016

Spark — это быстрый универсальный движок для крупномасштабной обработки данных. Он обрабатывает все данные параллельно. Итак, чтобы ответить на первый вопрос, затем Да в следующем случае:

val inp = sc.textFile("C:\\mk\\logdir\\*\\*\\log.txt")
code block{ }
sc.stop

Каждый файл будет создан как раздел и будет отправлен на отдельный узел и выполнен параллельно. Но, в зависимости от размера файла, количество разделов может быть больше, чем количество обрабатываемых файлов. Например, если log.txt в folder1 и folder2 имеют размер несколько КБ, то будет создано только 2 раздела, так как будет 2 файла и они будут обрабатываться параллельно.

Но если log.txt в folder1 имеет размер в ГБ, то для него будет создано несколько разделов, и количество разделов будет больше, чем количество файлов.

Однако мы всегда можем изменить количество разделов RDD, используя метод repartition() или coalesce().

Чтобы ответить на второй вопрос, тогда в следующем случае:

val inp = sc.textFile("C:\\mk\\logdir\\*\\*\\log.txt")
val cont = inp.collect.mkString(" ")
code block{ }
sc.stop

Spark будет собирать контент из всех файлов, а не только из одного файла. Так как collect() означает получить весь контент из сохраненного RDD и вернуть его драйверу в виде коллекции.

person himanshuIIITian    schedule 18.12.2016
comment
Спасибо, Химаншу. Во втором случае все мои файлы весят всего около 50 МБ. Но мне нужно обработать около 600 файлов. Есть ли способ выполнить один файл/исполнитель? - person user7264473; 18.12.2016
comment
пожалуйста, проверьте вариант полнотекстовых файлов. - person Ram Ghadiyaram; 18.12.2016
comment
Согласен, в этом случае wholeTextFiles будет лучшим вариантом. - person himanshuIIITian; 18.12.2016