Q1) Здесь я читаю несколько файлов (которые присутствуют в структуре папок выше). Я считаю, что в этом случае каждый файл будет создан как раздел и будет отправлен на отдельный узел и выполнен параллельно. Я прав в своем понимании? Кто-нибудь может это подтвердить? Или в любом случае я могу подтвердить это систематически?
ОТВЕЧАТЬ :
Метод TextFile SparkContext, т. е. sc.textFile, создает RDD с каждой строкой в качестве элемента. Если в данных 10 файлов, то есть папка yourtextfilesfolder
, будет создано 10 разделов. Вы можете проверить количество разделов:
yourtextfilesfolder.partitions.length
Однако секционирование определяется местоположением данных. Это может привести к тому, что по умолчанию будет слишком мало разделов. Насколько я знаю, нет гарантии, что будет создан один раздел, см. код 'SparkContext.textFile'.
& 'minPartitions' - предлагаемое минимальное количество разделов для результирующего RDD
Для лучшего понимания см. ниже метод.
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
вы можно упомянуть minPartitions
, как показано выше, из SparkContext.scala
Q2) Как искра обрабатывает этот случай. хотя я собираю, я предполагаю, что он будет собирать не весь контент из всех файлов, а только один файл. Я прав? Может ли кто-нибудь помочь мне понять это?
ОТВЕТ: Ваш rdd состоит из нескольких текстовых файлов. поэтому collect будет собирать драйвер со всех разделов из разных файлов, а не по одному файлу за раз.
вы можете проверить: используя rdd.collect
Однако, если вы хотите прочитать несколько текстовых файлов, вы также можете использовать wholeTextFiles
, см. примечание @в приведенном ниже методе. Маленькие файлы предпочтительны, большие файлы также допустимы, но это может привести к снижению производительности.
См. spark-core-sc-textfile-vs-sc- целые текстовые файлы
Док:
RDD> fullTextFiles(String path, int minPartitions) Чтение каталога текстовых файлов из HDFS, локальной файловой системы (доступной на всех узлах) или любого URI файловой системы, поддерживаемого Hadoop.
/**
* Read a directory of text files from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI. Each file is read as a single record and returned in a
* key-value pair, where the key is the path of each file, the value is the content of each file.
*
* <p> For example, if you have the following files:
* {{{
* hdfs://a-hdfs-path/part-00000
* hdfs://a-hdfs-path/part-00001
* ...
* hdfs://a-hdfs-path/part-nnnnn
* }}}
*
* Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path")`,
*
* <p> then `rdd` contains
* {{{
* (a-hdfs-path/part-00000, its content)
* (a-hdfs-path/part-00001, its content)
* ...
* (a-hdfs-path/part-nnnnn, its content)
* }}}
*
* @note Small files are preferred, large file is also allowable, but may cause bad performance.
* @note On some filesystems, `.../path/*` can be a more efficient way to read all files
* in a directory rather than `.../path/` or `.../path`
* @note Partitioning is determined by data locality. This may result in too few partitions
* by default.
*
* @param path Directory to the input data files, the path can be comma separated paths as the
* list of inputs.
* @param minPartitions A suggestion value of the minimal splitting number for input data.
* @return RDD representing tuples of file path and the corresponding file content
*/
def wholeTextFiles(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
.....
}
Примеры :
val distFile = sc.textFile("data.txt")
Above command returns the content of the file:
scala> distFile.collect()
res16: Array[String] = Array(1,2,3, 4,5,6)
SparkContext.wholeTextFiles can return (filename, content).
val distFile = sc.wholeTextFiles("/tmp/tmpdir")
scala> distFile.collect()
res17: Array[(String, String)] =
Array((maprfs:/tmp/tmpdir/data3.txt,"1,2,3
4,5,6
"), (maprfs:/tmp/tmpdir/data.txt,"1,2,3
4,5,6
"), (maprfs:/tmp/tmpdir/data2.txt,"1,2,3
4,5,6
"))
В вашем случае я бы предпочел SparkContext.wholeTextFiles
, где вы можете получить имя файла и его содержимое после сбора, как описано выше, если это то, что вы хотели.
person
Ram Ghadiyaram
schedule
18.12.2016