Я пытаюсь записать parquet
файл в Amazon S3
с помощью Spark 1.6.1
. Небольшой parquet
, который я генерирую, ~2GB
однажды записан, так что данных не так уж и много. Я пытаюсь доказать Spark
как платформу, которую я могу использовать.
В основном я собираюсь установить star schema
с dataframes
, а затем я напишу эти таблицы на паркет. Данные поступают из файлов csv, предоставленных поставщиком, и я использую Spark в качестве ETL
платформы. В настоящее время у меня есть кластер с 3 узлами в ec2(r3.2xlarge)
Итак 120GB
памяти на исполнителях и всего 16 ядер.
Размер входных файлов составляет около 22 ГБ, и сейчас я извлекаю около 2 ГБ этих данных. В конце концов, когда я начну загружать полный набор данных, это будет много терабайт.
Вот моя искра / scala pseudocode
:
def loadStage(): Unit = {
sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
var sqlCtx = new SQLContext(sc)
val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")
//Setup header table/df
val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6) ))
val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
header.registerTempTable("header")
sqlCtx.cacheTable("header")
//Setup fact table/df
val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2")
val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
val df = sqlCtx.createDataFrame(records, factSchema)
df.registerTempTable("fact")
val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")
println(results.count())
results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")
}
Для 465884512 строк подсчет занимает около 2 минут. Запись на паркет занимает 38 минут.
Я понимаю, что coalesce
выполняет перемешивание с драйвером, который выполняет запись .... но время, которое на это требуется, заставляет меня думать, что я делаю что-то серьезно не так. Без coalesce
это все равно займет 15 минут, что, согласно ИМО, все еще слишком долго и дает мне тонну небольших parquet
файлов. Я бы хотел иметь один большой файл данных в день, который у меня будет. У меня есть код для разделения по значению поля, и он такой же медленный. Я также пытался вывести это на csv
, и это занимает ~ 1 час.
Кроме того, я на самом деле не устанавливаю реквизиты времени выполнения, когда отправляю свою работу. Статистика моей консоли для одной работы:
- Живые рабочие: 2
- Количество используемых ядер: всего 16, занято 16
- Используемая память: всего 117,5 ГБ, занято 107,5 ГБ
- Заявки: 1 запущено, 5 завершено
- Драйверы: 0 работает, 0 завершено
- Статус: ЖИВОЙ