Почему файлы Spark Parquet для агрегата больше оригинала?

Я пытаюсь создать сводный файл для использования конечными пользователями, чтобы они не обрабатывали несколько источников с файлами гораздо большего размера. Для этого я: A) перебираю все исходные папки, удаляя 12 наиболее часто запрашиваемых полей, раскручивая паркетные файлы в новом месте, где эти результаты совмещены. Б) Я пытаюсь вернуться к файлам, созданным на шаге А, и повторно объединить их, сгруппировав по 12 полям, чтобы свести их к итоговой строке для каждой уникальной комбинации.

Я обнаружил, что шаг A снижает полезную нагрузку 5: 1 (примерно 250 гигабайт становятся 48,5 гигабайтами). Шаг B, однако, вместо дальнейшего уменьшения, увеличьте это на 50% по сравнению с шагом A. Однако мои подсчеты совпадают.

Здесь используется Spark 1.5.2. Мой код, измененный только для замены имен полей на field1 ... field12, чтобы сделать его более читабельным, приведен ниже с результатами, которые я отметил.

Хотя я не обязательно ожидаю еще одного сокращения 5: 1, я не знаю, что делаю неправильно, чтобы увеличить объем хранилища для меньшего количества строк с той же схемой. Кто-нибудь может помочь мне понять, что я сделал не так?

Спасибо!

//for each eventName found in separate source folders, do the following:
//spit out one row with key fields from the original dataset for quicker availability to clients 
//results in a 5:1 reduction in size
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, cast(1 as bigint) as rCount from table"
sqlContext.sql(sqlCommand).coalesce(20).write.parquet("<aws folder>" + dt + "/" + eventName + "/")
//results in over 700 files with a total of  16,969,050,506 rows consuming 48.65 gigs of storage space in S3, compressed 

//after all events are processed, aggregate the results
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, sum(rCount) as rCount from results group by field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12"
//Use a wildcard to search all sub-folders created above
sqlContext.read.parquet("<aws folder>" + dt + "/*/").registerTempTable("results")
sqlContext.sql(sqlStatement).coalesce(20).saveAsParquetFile("<a new aws folder>" + dt + "/")
//This results in  3,295,206,761 rows with an aggregate value of 16,969,050,506 for rCount but consumes 79.32 gigs of storage space in S3, compressed

//The parquet schemas created (both tables match):
 |-- field1: string (nullable = true) (10 characters)
 |-- field2: string (nullable = true) (15 characters)
 |-- field3: string (nullable = true) (50 characters max)
 |-- field4: string (nullable = true) (10 characters)
 |-- field5: string (nullable = true) (10 characters)
 |-- field6: string (nullable = true) (10 characters)
 |-- field7: string (nullable = true) (16 characters)
 |-- field8: string (nullable = true) (10 characters)
 |-- field9 string (nullable = true)  (15 characters)
 |-- field10: string (nullable = true)(20 characters)
 |-- field11: string (nullable = true)(14 characters)
 |-- field12: string (nullable = true)(14 characters)
 |-- rCount: long (nullable = true)   
 |-- dt: string (nullable = true)

person Steve Drew    schedule 01.07.2016    source источник


Ответы (1)


Обычно столбчатые форматы хранения, такие как Parquet, очень чувствительны, когда дело доходит до распределения данных (организации данных) и количества отдельных столбцов. Чем более организованы данные и чем меньше количество элементов, тем эффективнее хранилище.

Агрегация, как та, которую вы применяете, должна перемешивать данные. Когда вы проверите план выполнения, вы увидите, что он использует хэш-разделитель. Это означает, что после агрегирования распределение может быть менее эффективным, чем распределение исходных данных. В то же время sum может уменьшить количество строк, но увеличить количество элементов rCount столбца.

Вы можете попробовать разные инструменты, чтобы исправить это, но не все из них доступны в Spark 1.5.2:

  • Сортировка полного набора данных по столбцам с низкой мощностью (довольно дорого из-за полного перемешивания) или sortWithinPartitions.
  • Используйте partitionBy метод DataFrameWriter для разделения данных с использованием столбцов с низкой мощностью.
  • Используйте bucketBy и sortBy методы DataFrameWriter (Spark 2.0.0+), чтобы улучшить распределение данных с помощью сегментирования и локальной сортировки.
person zero323    schedule 01.07.2016
comment
bucketBy кажется невозможным для использования с DataFrameWriter в Spark 2.0.0 - person eliasah; 10.02.2017
comment
Но почему сортировка на основе низкой мощности помогает сжатию? Если я правильно понимаю, сортировка помогает при кодировании длин серий, тогда как столбцы с относительно низкой мощностью сжимаются с использованием кодирования словаря - так почему же сортировка имеет значение? если ... группирование этих значений вместе помогает сделать вывод о низкой мощности, и если они распределены, этого вывода может не произойти (это вопрос, а не утверждение) - person Vitaliy; 13.05.2018