Как повысить производительность для медленных заданий Spark с использованием соединения DataFrame и JDBC?

Я пытаюсь получить доступ к таблице Teradata среднего размера (~ 100 миллионов строк) через JDBC в автономном режиме на одном узле (local[*]).

Я использую Спарк 1.4.1. и установлен на очень мощной машине (2 процессора, 24 ядра, 126 ГБ ОЗУ).

Я пробовал несколько вариантов настройки и настройки памяти, чтобы она работала быстрее, но ни один из них не оказал существенного влияния.

Я уверен, что есть что-то, что мне не хватает, и ниже моя последняя попытка, которая заняла около 11 минут, чтобы получить этот простой подсчет, по сравнению с тем, что потребовалось всего 40 секунд с использованием соединения JDBC через R, чтобы получить подсчеты.

bin/pyspark --driver-memory 40g --executor-memory 40g

df = sqlContext.read.jdbc("jdbc:teradata://......)
df.count()

Когда я пытался использовать БОЛЬШУЮ таблицу (записи 5B), после завершения запроса результаты не возвращались.


person Dev Patel    schedule 24.08.2015    source источник
comment
Как вы считаете, используя R?   -  person zero323    schedule 24.08.2015
comment
@zero323 - просто используя пакеты RJDBC и teradataR после настройки соединения с помощью Teradata JARS... а затем tdQuery("SELECT COUNT(*) FROM your_table)   -  person Dev Patel    schedule 24.08.2015
comment
Насколько я знаю, источник данных Spark JDBC может нажимать предикаты, но фактическое выполнение выполняется в Spark. Это означает, что вы должны перенести свои данные в кластер Spark. Так что это не то же самое, что выполнение SQL-запроса через JDBC (случай R). Первое, что вам нужно сделать, это кэшировать ваши данные после загрузки. Однако это не улучшит производительность для первого запроса.   -  person zero323    schedule 24.08.2015
comment
@zero323 - спасибо, я понял это, проведя еще несколько исследований по этому вопросу. У меня есть небольшой вопрос: какой самый быстрый способ чтения данных в apache spark? через файловую структуру Parquet?   -  person Dev Patel    schedule 24.08.2015
comment
Вероятно, это хороший выбор, но первое, что вы можете попробовать, прежде чем идти по этому пути, — это использовать коннектор Teradata Hadoop. Похоже, он поддерживает несколько вариантов экспорта, включая таблицы Hive. Тем не менее, сеть с одной машиной и дисковый ввод-вывод могут быть ограничивающим фактором.   -  person zero323    schedule 24.08.2015
comment
Предложите принять ответ Джанмариоса.   -  person samthebest    schedule 11.12.2015


Ответы (3)


Все операции агрегирования выполняются после извлечения всего набора данных в память в виде коллекции DataFrame. Таким образом, подсчет в Spark никогда не будет таким эффективным, как непосредственно в TeraData. Иногда стоит выполнить некоторые вычисления в базе данных, создав представления, а затем сопоставив эти представления с помощью JDBC API.

Каждый раз, когда вы используете драйвер JDBC для доступа к большой таблице, вы должны указать стратегию разделения, иначе вы создадите DataFrame/RDD с одним разделом и перегрузите одно соединение JDBC.

Вместо этого вы хотите попробовать следующий ИИ (начиная со Spark 1.4.0+):

sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  columnName = "<INTEGRAL_COLUMN_TO_PARTITION>", 
  lowerBound = minValue,
  upperBound = maxValue,
  numPartitions = 20,
  connectionProperties = new java.util.Properties()
)

Существует также возможность снизить фильтрацию.

Если у вас нет равномерно распределенного интегрального столбца, вы хотите создать несколько пользовательских разделов, указав пользовательские предикаты (операторы where). Например, предположим, что у вас есть столбец меток времени и вы хотите разделить его по диапазонам дат:

    val predicates = 
  Array(
    "2015-06-20" -> "2015-06-30",
    "2015-07-01" -> "2015-07-10",
    "2015-07-11" -> "2015-07-20",
    "2015-07-21" -> "2015-07-31"
  )
  .map {
    case (start, end) => 
      s"cast(DAT_TME as date) >= date '$start'  AND cast(DAT_TME as date) <= date '$end'"
  }

 predicates.foreach(println) 

// Below is the result of how predicates were formed 
//cast(DAT_TME as date) >= date '2015-06-20'  AND cast(DAT_TME as date) <= date '2015-06-30'
//cast(DAT_TME as date) >= date '2015-07-01'  AND cast(DAT_TME as date) <= date '2015-07-10'
//cast(DAT_TME as date) >= date '2015-07-11'  AND cast(DAT_TME as date) <= date //'2015-07-20'
//cast(DAT_TME as date) >= date '2015-07-21'  AND cast(DAT_TME as date) <= date '2015-07-31'


sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  predicates = predicates,
  connectionProperties = new java.util.Properties()
)

Он сгенерирует DataFrame, где каждый раздел будет содержать записи каждого подзапроса, связанного с различными предикатами.

Проверьте исходный код по адресу DataFrameReader.scala

person Gianmario Spacagna    schedule 30.09.2015
comment
@zero323, @Gianmario Spacagna если мне на самом деле нужно прочитать всю таблицу MySQL (а не только получить count), то как я могу улучшить вялость производительность Spark-SQL? Я уже распараллеливаю операцию чтения, используя spark.read.jdbc(..numPartitions..). - person y2k-shubham; 06.03.2018
comment
В моей таблице MySQL (InnoDB) ~ 186M записей весом около 149 ГБ (согласно статистике, показанной phpMyAdmin), и я использую numPartitions = 32. [Spark 2.2.0] У меня EMR 5.12.0 с 1 master, 1 task и 1 core (все r3.xlarge, 8 vCore, 30,5 GiB памяти, 80 SSD GB памяти). Я обнаружил, что чтение таблицы MySQL в DataFrame завершается ошибкой, если я НЕ limit записываю ~ 1,5-2M. Он дает длинную трассировку стека с javax.servlet.ServletException: java.util.NoSuchElementException: None.get и java.sql.SQLException: Incorrect key file for table.. - person y2k-shubham; 06.03.2018

Вмещается ли несериализованная таблица в 40 ГБ? Если он начнет подкачку на диске, производительность резко снизится.

В любом случае, когда вы используете стандартный JDBC с синтаксисом ansi SQL, вы используете механизм БД, поэтому, если teradata (я не знаю teradata) содержит статистику о вашей таблице, классический «выбрать количество (*) из таблицы» будет очень быстрым. Вместо этого spark загружает ваши 100 миллионов строк в память чем-то вроде «выбрать * из таблицы», а затем выполняет подсчет строк RDD. Это совсем другая нагрузка.

person axlpado - Agile Lab    schedule 24.08.2015
comment
Я думаю, что да, и я также попытался увеличить память до 100 ГБ, но не увидел никаких улучшений. Я не пытаюсь загрузить 100 миллионов строк в память, но запускаю некоторую агрегированную операцию, такую ​​как count() в кадре данных или count(*) во временной таблице, но Spark занимает слишком много времени. Я также попытался зарегистрировать DF как временную таблицу и провел простой подсчет, но это занимает примерно столько же времени. ra1.registerTempTable("ra_dt"); total = sqlContext.sql("select count(*) from ra_dt") - person Dev Patel; 24.08.2015
comment
Да, но я думаю, что spark не сокращает операцию подсчета в движке БД, поэтому он загружает все строки в память, а затем выполняет подсчет в DF. - person axlpado - Agile Lab; 24.08.2015
comment
Сколько столбцов у вас есть в этой таблице, со 100 миллионами строк довольно легко получить 100 ГБ несериализованных объектов. Не могли бы вы опубликовать схему вашей таблицы? - person axlpado - Agile Lab; 24.08.2015
comment
Я думаю, вы правы, я читал несколько других сообщений в Интернете и обнаружил, что Spark пытается загрузить данные перед применением операции подсчета. В таком случае, что было бы идеальным способом быстрее читать этот тип данных в Spark? Другими словами какой самый быстрый способ прочитать данные в apache spark? Вот схема моей таблицы: root |-- field1: decimal(18,0) (nullable = true) |-- field2 : строка (nullable = true) |-- field3: дата (nullable = true) |-- field4: дата (nullable = true) |-- field5: integer (nullable = true) |-- field6: string (nullable = true) ) - person Dev Patel; 24.08.2015
comment
Spark — это механизм распределенной обработки, поэтому лучший способ загрузки данных в spark — из распределенной файловой системы или dbms. В вашем случае, работая над экземпляром signle, я думаю, вы можете улучшить производительность, указав только partitionColumn, lowerBound, upperBound, numPartition, чтобы улучшить параллелизм чтения. Если вам нужно выполнить другие запросы после подсчета, вы можете кэшировать DF перед его подсчетом, поэтому первый подсчет займет время, но затем следующие запросы будут в памяти и будут быстрее. - person axlpado - Agile Lab; 24.08.2015
comment
Имеет смысл! Спасибо за ответ! - person Dev Patel; 26.08.2015
comment
Сколько исполнителей вы используете и сколько --executor-cores? - person Boggio; 28.08.2015

Одним из решений, которое отличается от других, является сохранение данных из таблицы оракула в файле avro (разделенном на множество файлов), сохраненном в Hadoop. Таким образом, чтение этих файлов avro с помощью spark будет проще простого, поскольку вы больше не будете вызывать базу данных.

person Vasile Surdu    schedule 19.08.2019