Эффективный способ чтения паркетных файлов между диапазоном дат в Azure Databricks

Я хотел бы знать, является ли приведенный ниже псевдокод эффективным методом чтения нескольких паркетных файлов между диапазоном дат, хранящимся в Azure Data Lake, из PySpark (Azure Databricks). Примечание: файлы паркета не разбиты на разделы по дате.

Я использую соглашение uat / EntityName / 2019/01/01 / EntityName_2019_01_01_HHMMSS.parquet для хранения данных в ADL, как это предлагается в книге Натана Марца Big Data с небольшой модификацией (с использованием 2019 вместо года = 2019).

Прочтите все данные, используя подстановочный знак *:

df = spark.read.parquet(uat/EntityName/*/*/*/*)

Добавьте столбец FileTimestamp, который извлекает временную метку из EntityName_2019_01_01_HHMMSS.parquet, используя строковую операцию и преобразовывая в TimestampType ()

df.withColumn(add timestamp column)

Используйте фильтр, чтобы получить релевантные данные:

start_date = '2018-12-15 00:00:00'
end_date = '2019-02-15 00:00:00'
df.filter(df.FileTimestamp >= start_date).filter(df.FileTimestamp < end_date)

По сути, я использую PySpark для имитации аккуратного синтаксиса, доступного в U-SQL:

@rs = 
  EXTRACT 
      user    string,
      id      string,
      __date  DateTime
  FROM 
    "/input/data-{__date:yyyy}-{__date:MM}-{__date:dd}.csv"
  USING Extractors.Csv();

@rs = 
  SELECT * 
  FROM @rs
  WHERE 
    date >= System.DateTime.Parse("2016/1/1") AND
    date < System.DateTime.Parse("2016/2/1");

person samratb    schedule 28.02.2019    source источник


Ответы (1)


Правильный способ разделения ваших данных - использовать для ваших данных форму год = 2019, месяц = ​​01 и т. Д.

Когда вы запрашиваете эти данные с помощью такого фильтра, как:

df.filter(df.year >= myYear)

Тогда Spark будет читать только соответствующие папки.

Очень важно, чтобы имя столбца фильтрации отображалось точно в имени папки. Обратите внимание, что когда вы записываете разделенные данные с помощью Spark (например, по году, месяцу, дню), он не будет записывать столбцы разделения в файл паркета. Вместо этого они выводятся из пути. Это означает, что они потребуются вашему фреймворку при записи. Они также будут возвращены в виде столбцов при чтении из разделенных источников.

Если вы не можете изменить структуру папок, вы всегда можете вручную уменьшить количество папок для чтения Spark с помощью регулярного выражения или Glob - эта статья должна предоставить больше контекста Spark SQL-запросы к многораздельным данным с использованием диапазонов дат. Но очевидно, что это более ручной и сложный процесс.

ОБНОВЛЕНИЕ: дополнительный пример Могу ли я читать несколько файлов в Spark Dataframe из S3, передавая несуществующие?

Также из "Spark - The Definitive Guide: Big Data Processing Made Simple" Билла Чемберса:

Разбиение на разделы - это инструмент, который позволяет вам контролировать, какие данные хранятся (и где) по мере их записи. Когда вы записываете файл в разделенный каталог (или таблицу), вы в основном кодируете столбец как папку. Это позволяет вам пропускать много данных, когда вы собираетесь читать их позже, что позволяет вам считывать только те данные, которые имеют отношение к вашей проблеме, вместо того, чтобы сканировать весь набор данных. ...

Это, вероятно, самая низкая оптимизация, которую вы можете использовать, когда у вас есть таблица, которую читатели часто фильтруют перед изменением. Например, дата особенно распространена для раздела, потому что в дальнейшем мы часто хотим смотреть только данные за предыдущую неделю (вместо того, чтобы сканировать весь список записей).

person simon_dmorias    schedule 03.03.2019
comment
Привет, Саймон, спасибо за ответ. Я не мог понять механизм вывода столбцов из пути. Я искал более авторитетную документацию. Есть указатели? Шаблон запроса будет представлять собой диапазон дат, например получить данные за текущий месяц или за последние 3 месяца. Я согласен, что глобусы невозможны. Принятие неявных столбцов year = 2019 ставит меня в то же положение, что и globs. Считаете ли вы, что добавление столбца Timestamp вместе со структурой папок year = 2019 было бы лучше всего для достижения фильтра диапазона дат и эффективного извлечения данных. Где я могу просмотреть совокупное количество операций чтения / задействованных операций ввода-вывода в секунду? - person samratb; 04.03.2019
comment
Я добавил обновление, чтобы попытаться расширить это. Также обратите внимание, что в SparkUI вы можете видеть размер входного набора данных, чтобы увидеть, сколько данных было прочитано, и примененная фильтрация перед вводом. - person simon_dmorias; 05.03.2019
comment
Дай мне проверить и вернуться - person samratb; 06.03.2019
comment
Привет, Саймон, решение, которое вы дали, сработало. Я использовал ADF для построения файловой структуры в формате год / месяц / день, который был неправильным. Вместо этого я прибег к Databricks для построения структуры папок с ключом раздела, указанным при записи. Это создало желаемую структуру папок, и когда я объединяю файлы, он дает неявные столбцы год, месяц, день в объединенном df. Спасибо за вашу помощь. - person samratb; 11.04.2019