оптимизация чтения из паркетных файлов в s3 bucket

У меня есть большой набор данных в формате паркета (размером ~ 1 ТБ), который разделен на 2 иерархии: CLASS и DATE. Всего 7 классов. Но с 01.01.2020 Дата постоянно увеличивается. Мои данные сначала разделяются на CLASS, а затем на DATE

Так что-то вроде:

CLASS1---DATE 1
      ---DATE 2
      ---  .
      ---  .
      ---  .
      ---DATE N

CLASS2---DATE 1
      ---DATE 2
      ---  .
      ---  .
      ---  .
      ---DATE N

Я загружаю свои данные CLASS в цикле for. Если я загружаю весь паркетный файл, YARN завершает работу, поскольку перегружает экземпляры памяти. Но я загружаю все дни, так как я выполняю расчет процентилей в своем моделировании. Этот метод занимает около 23 часов.

Однако, если я переразбиваю так, что у меня есть только раздел CLASS, работа занимает около 10 часов. Слишком большое количество подразделов замедляет работу искрового исполнителя? Я сохраняю иерархию разделов как CLASS - ›DATE только потому, что мне нужно добавлять новые данные до DATE каждый день. Если иметь только 1 раздел более эффективно, то после загрузки новых данных мне пришлось бы перераспределять только раздел CLASS каждый день. Может ли кто-нибудь объяснить, почему один раздел работает быстрее? И если да, то как лучше всего ежедневно разбивать данные путем добавления и без повторного разбиения всего набора данных?

Спасибо

РЕДАКТИРОВАТЬ: Я использую цикл for в файловой структуре для выполнения цикла по разделам CLASS следующим образом:

fs = s3fs.S3FileSystem(anon=False)    
inpath="s3://bucket/file.parquet/"

Dirs= fs.ls(inpath)
for paths in Dirs:
    customPath='s3://' + uvapath + '/'
    class=uvapath.split('=')[1]
    df=spark.read.parquet(customPath)
    outpath="s3://bucket/Output_" + class + ".parquet"
#Perform calculations
df.write.mode('overwrite').parquet(outpath)

В загруженном df будут все даты на CLASS=1. Затем я вывожу файл в виде отдельных паркетных файлов для каждого CLASS, так что у меня есть 7 паркетных файлов:

Output_1.parquet
Output_2.parquet
Output_3.parquet
Output_4.parquet
Output_5.parquet
Output_6.parquet
Output_7.parquet

Затем я объединяю 7 паркетов в один паркет, это не проблема, так как полученные паркетные файлы намного меньше.


person thentangler    schedule 25.09.2020    source источник
comment
ваша структура разделов организована не так, как CLASS=1/DATE=2020-01-01? Тогда это будет тяжело.   -  person Lamanus    schedule 25.09.2020
comment
Это не проблема одного или нескольких разделов. Даже если существует много разделов, и если вы просто загружаете все целиком, все должно быть в порядке, когда вы фильтруете только некоторую часть раздела и используете ее для выполнения своей задачи, что не должно вызывать прерывание OOM или задания.   -  person Lamanus    schedule 25.09.2020
comment
@Lamanus да, это так. Но, похоже, на выполнение той же работы уходит больше времени, чем если бы у меня было только CLASS=1   -  person thentangler    schedule 25.09.2020
comment
Вам не нужен цикл for для его загрузки, а просто загрузите корневой путь. Кстати, столбец раздела должен быть в нижнем регистре.   -  person Lamanus    schedule 25.09.2020


Ответы (1)


У меня есть разделенные данные с тремя столбцами, годом, месяцем и идентификатором. Иерархия путей к папкам

year=2020/month=08/id=1/*.parquet
year=2020/month=08/id=2/*.parquet
year=2020/month=08/id=3/*.parquet
...
year=2020/month=09/id=1/*.parquet
year=2020/month=09/id=2/*.parquet
year=2020/month=09/id=3/*.parquet

и я могу прочитать DataFrame, загрузив корневой путь.

val df = spark.read.parquet("s3://mybucket/")

Затем секционированный столбец автоматически добавляется в DataFrame. Теперь вы можете отфильтровать данные для секционированного столбца таким образом, чтобы

val df_filtered = df.filter("year = '2020' and month = '09'")

и сделайте что-нибудь с df_filtered, тогда искра будет использовать только разделенные данные!


Для повторной обработки вы можете использовать fair scheduler искры. Добавьте файл fair.xml в src / main / resources вашего проекта с помощью приведенного ниже кода,

<?xml version="1.0"?>

<allocations>
    <pool name="fair">
        <schedulingMode>FAIR</schedulingMode>
        <weight>10</weight>
        <minShare>0</minShare>
    </pool>
</allocations>

и установите конфигурацию искры после создания сеанса искры.

spark.sparkContext.setLocalProperty("spark.scheduler.mode", "FAIR")
spark.sparkContext.setLocalProperty("spark.scheduler.allocation.file", getClass.getResource("/fair.xml").getPath)
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "fair")

Тогда вы сможете заниматься своей работой параллельно. Вы можете захотеть распараллелить задание в зависимости от КЛАССА, поэтому

val classes = (1 to 7).par
val date = '2020-09-25'

classes foreach { case i =>

    val df_filtered = df.filter(s"CLASS == '$i' and DATE = '$date'")
    
    // Do your job

}

код будет работать одновременно с разными значениями CLASS.

person Lamanus    schedule 25.09.2020
comment
Я обновил свой пост. Я использую цикл for в файловой системе перед загрузкой фрейма данных. Согласно вашему предложению в вашем примере, я бы загрузил весь паркет (~ 1 ТБ) в фрейм данных и правильно отфильтровал по классам? Если да, то не перегружает ли это память? И мне все равно нужно использовать цикл for, чтобы перейти к следующему классу? - person thentangler; 25.09.2020
comment
Он не загрузит все данные, потому что искра выполнит действие, когда «действие» произойдет с несколькими преобразованиями. Фильтр фрейма данных логичен и загружает только необходимые разделы, а не все данные. - person Lamanus; 25.09.2020
comment
Понял. Но мне все равно нужно пропустить фильтр через цикл for, чтобы перебрать CLASS и правильно ли выполнить синтаксический анализ? Так что-то вроде for I in class: filter(col('class')==i) # Calculations here - person thentangler; 25.09.2020
comment
Если в вашем кластере достаточно ресурсов, вам не нужно выполнять цикл, а просто выполнять его параллельно. - person Lamanus; 26.09.2020
comment
К сожалению, это не так. У меня всего 3 экземпляра кодовых узлов r5a.2xlarge с 64 ГБ оперативной памяти - person thentangler; 26.09.2020
comment
Ты должен попытаться. для работы достаточно максимум 2 часов. - person Lamanus; 26.09.2020
comment
Можно ли с вами связаться через личку или что-то в этом роде? Я хотел бы обсудить с вами свой код. У меня куча объединений, groupBys и статистических расчетов! Это никак не закончится за 2 часа :) - person thentangler; 26.09.2020