Как записать каждый раздел фрейма данных в разные таблицы

Я использую Databricks для подключения к Eventhub, где каждое сообщение, поступающее из EventHub, может сильно отличаться от другого.

В сообщении у меня есть тело и идентификатор.

Я ищу производительность, поэтому я избегаю сбора данных или ненужных обработок, а также хочу делать сохранение параллельно по разделам. Однако я не уверен, как это сделать должным образом.

Я хочу добавить тело каждого идентификатора в отдельную И КОНКРЕТНУЮ таблицу партиями. Идентификатор предоставит мне информацию, которую мне нужно сохранить в нужной таблице. Итак, для этого я пробовал 2 подхода:

  1. Разбиение на разделы: повторное разделение (numPartitions, ID) -> ForeachPartition
  2. Группировка: groupBy ('ID'). apply (myFunction) # @ pandas_udf GROUPED_MAP

Подход 1 не выглядит очень привлекательным для меня, процесс перераспределения выглядит ненужным, и я видел в документации, что даже если я установлю столбец в качестве раздела, он может сохранить многие идентификаторы этого столбца в одном разделе. Это только гарантирует, что все данные, связанные с этим идентификатором, находятся в разделе, а не разделены.

Подход 2 вынуждает меня выводить данные из pandas_udf, фрейма данных с той же схемой ввода, чего не произойдет, поскольку я преобразовываю сообщение eventhub из CSV в фрейм данных, чтобы сохранить его в таблице. Я мог бы вернуть тот же фрейм данных, который получил, но это звучит странно.

Есть ли какой-нибудь хороший подход, которого я не вижу?


person Flavio Pegas    schedule 03.07.2019    source источник


Ответы (1)


Если ваш идентификатор имеет различное количество значений (тип столбца типа / страны), вы можете использовать partitionBy для хранения, и, таким образом, их сохранение в другой таблице будет быстрее. В противном случае создайте производный столбец (используя withColumn) из столбца идентификатора, используя ту же логику, которую вы хотите использовать при разделении данных по таблицам. Затем вы можете использовать этот производный столбец в качестве столбца раздела, чтобы ускорить загрузку.

person Saswat    schedule 04.07.2019
comment
Если я вас правильно понял, с помощью partitionBy для сохранения в файлы читать файлы и сохранять в таблицу будет быстрее? Это из-за polybase? - person Flavio Pegas; 04.07.2019
comment
Сейчас я думаю просто добавить все сообщения, независимо от идентификатора, в общую дельта-таблицу, а затем я могу выбрать каждый идентификатор индивидуально в цикле foreach. Я считаю, что это не создаст кучу файлов, я позабочусь о том, чтобы каждый идентификатор обрабатывался индивидуально, и я смогу работать параллельно - person Flavio Pegas; 04.07.2019
comment
Помещая его в дельта-таблицу, вы можете выполнить разделение по идентификатору, если хотите снова получить оттуда и записать его в отдельные таблицы. Это позволит импровизировать ваше исполнение. - person Saswat; 05.07.2019
comment
Именно это я и решил сделать! Спасибо! - person Flavio Pegas; 05.07.2019