Использование Spark для записи паркетного файла в s3 поверх s3a очень медленно

Я пытаюсь записать parquet файл в Amazon S3 с помощью Spark 1.6.1. Небольшой parquet, который я генерирую, ~2GB однажды записан, так что данных не так уж и много. Я пытаюсь доказать Spark как платформу, которую я могу использовать.

В основном я собираюсь установить star schema с dataframes, а затем я напишу эти таблицы на паркет. Данные поступают из файлов csv, предоставленных поставщиком, и я использую Spark в качестве ETL платформы. В настоящее время у меня есть кластер с 3 узлами в ec2(r3.2xlarge) Итак 120GB памяти на исполнителях и всего 16 ядер.

Размер входных файлов составляет около 22 ГБ, и сейчас я извлекаю около 2 ГБ этих данных. В конце концов, когда я начну загружать полный набор данных, это будет много терабайт.

Вот моя искра / scala pseudocode:

  def loadStage(): Unit = {
    sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
    sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
    sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
    var sqlCtx = new SQLContext(sc)


    val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")

    //Setup header table/df
    val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
    val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
    val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6) ))
    val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
    header.registerTempTable("header")
    sqlCtx.cacheTable("header")


    //Setup fact table/df
    val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2")
    val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
    val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
    val df = sqlCtx.createDataFrame(records, factSchema)
    df.registerTempTable("fact")

    val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")


    println(results.count())



    results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")


  }

Для 465884512 строк подсчет занимает около 2 минут. Запись на паркет занимает 38 минут.

Я понимаю, что coalesce выполняет перемешивание с драйвером, который выполняет запись .... но время, которое на это требуется, заставляет меня думать, что я делаю что-то серьезно не так. Без coalesce это все равно займет 15 минут, что, согласно ИМО, все еще слишком долго и дает мне тонну небольших parquet файлов. Я бы хотел иметь один большой файл данных в день, который у меня будет. У меня есть код для разделения по значению поля, и он такой же медленный. Я также пытался вывести это на csv, и это занимает ~ 1 час.

Кроме того, я на самом деле не устанавливаю реквизиты времени выполнения, когда отправляю свою работу. Статистика моей консоли для одной работы:

  • Живые рабочие: 2
  • Количество используемых ядер: всего 16, занято 16
  • Используемая память: всего 117,5 ГБ, занято 107,5 ГБ
  • Заявки: 1 запущено, 5 завершено
  • Драйверы: 0 работает, 0 завершено
  • Статус: ЖИВОЙ

person Brutus35    schedule 29.04.2016    source источник
comment
coalesce не перемещается к драйверу, а перемещается между исполнителями, но это не имеет отношения к проблеме, которую вы наблюдаете. Вы используете EMR? в таком случае используйте s3: //, а не s3a: //. в любом случае в Spark 1.6 вы должны использовать Direct OutputCommitter, как говорит @David. Еще одно возможное улучшение - установить для parquet.enable.summary-metadata значение false.   -  person Tal Joffe    schedule 23.08.2016
comment
Ускоряет ли вообще использование Alluxio перед S3?   -  person James Moore    schedule 13.04.2017


Ответы (4)


Значения по умолчанию Spark вызывают большое количество (возможно) ненужных накладных расходов во время операций ввода-вывода, особенно при записи в S3. В этой статье это обсуждается более подробно, но есть 2 параметра, которые вы захотите изменить.

  • Использование DirectParquetOutputCommitter. По умолчанию Spark сохраняет все данные во временную папку, а затем перемещает эти файлы. Использование DirectParquetOutputCommitter сэкономит время за счет прямой записи в выходной путь S3

    • No longer available in Spark 2.0+
      • As stated in the jira ticket, the current solution is to
        1. Switch your code to using s3a and Hadoop 2.7.2+ ; it's better all round, gets better in Hadoop 2.8, and is the basis for s3guard
        2. Используйте Hadoop FileOutputCommitter и установите для mapreduce.fileoutputcommitter.algorithm.version значение 2

    -Слияние схем отключено по умолчанию в Spark 1.5 Отключите слияние схем. Если объединение схем включено, узел драйвера просканирует все файлы, чтобы обеспечить согласованность схемы. Это особенно дорого, потому что это не распределенная операция. Убедитесь, что это выключено, выполнив

    val file = sqx.read.option("mergeSchema", "false").parquet(path)

person David    schedule 02.05.2016
comment
в Spark 2.0 DirectParquetOutputCommitter больше не доступен. см. SPARK-10063 для нового решения - person Tal Joffe; 23.08.2016
comment
@TalJoffe вы пробовали их решение? Если да, то как это работало? А можно как? - person David; 23.08.2016
comment
Я попробовал, это работает очень хорошо. Я провел небольшой тест с папкой размером 30 г, и производительность была примерно такой же. - person Tal Joffe; 24.08.2016
comment
Если производительность была примерно такой же, разве это не значит, что это не совсем верное решение? - person zzztimbo; 13.10.2016
comment
@zzztimbo Я понял, что его комментарий означал, что указанное обходное решение выполнено, а также устаревший DirectParquetOutputCommitter (и, следовательно, лучше, чем стандартный способ написания паркетных файлов). Но я еще не пробовал. - person David; 13.10.2016
comment
Схема слияния также ложна по умолчанию, начиная с версии 1.5.0 spark .apache.org / docs / latest / - person Kamil Sindi; 10.11.2016

Коммиттер прямого вывода исчез из кодовой базы Spark; вы должны написать свой собственный / воскресить удаленный код в своем собственном JAR. ЕСЛИ вы это сделаете, выключите спекуляции в своей работе и знайте, что другие сбои тоже могут вызвать проблемы, когда проблема заключается в «неверных данных».

Более того, Hadoop 2.8 собирается добавить некоторые ускорения S3A специально для чтения оптимизированных двоичных форматов (ORC, Parquet) из S3; подробности см. в HADOOP-11694. И некоторые люди работают над использованием Amazon Dynamo для согласованного хранилища метаданных, которое должно иметь возможность выполнять надежную фиксацию O (1) в конце работы.

person stevel    schedule 18.10.2016

Один из непосредственных подходов к ускорению записи Spark в S3 - использовать Коммиттер, оптимизированный для EMRFS S3.

Однако, если вы используете s3a, этот коммиттер не может быть использовано:

Когда коммиттер, оптимизированный для EMRFS S3, не используется

Коммиттер не используется при следующих обстоятельствах:

When writing to HDFS

-> When using the S3A file system

When using an output format other than Parquet, such as ORC or text

When using MapReduce or Spark's RDD API

Я тестировал эту разницу на AWS EMR 5.26, и использование s3: // было на 15% -30% быстрее, чем s3a: // (но все равно медленным).

Самый быстрый способ выполнить такое копирование / запись - записать Parquet в локальную файловую систему HDFS, а затем использовать s3distcp для копирования на S3; в одном конкретном сценарии (несколько сотен небольших файлов) это было в 5 раз быстрее, чем запись DataFrame в Parquet непосредственно в S3.

person jmng    schedule 18.09.2019
comment
+1 к идее сначала записать в HDFS, а потом переместить эти файлы на s3 (хотя я использую gnu parallel + команду aws cli вместо s3distcp). Определенно зависит от ваших данных, но это не решение для всего. - person James Moore; 05.05.2020

У меня тоже была эта проблема. В дополнение к сказанному остальными, вот полное объяснение от AWS: https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-форматыпаркетас-the-emrfs-s3-optimized-committer/

Во время моего эксперимента простой переход на FileOutCommiter v2 (с v1) улучшил запись в 3-4 раза.

self.sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")
person Sebastian Brestin    schedule 18.09.2019