Все операции агрегирования выполняются после извлечения всего набора данных в память в виде коллекции DataFrame
. Таким образом, подсчет в Spark никогда не будет таким эффективным, как непосредственно в TeraData. Иногда стоит выполнить некоторые вычисления в базе данных, создав представления, а затем сопоставив эти представления с помощью JDBC API.
Каждый раз, когда вы используете драйвер JDBC для доступа к большой таблице, вы должны указать стратегию разделения, иначе вы создадите DataFrame
/RDD
с одним разделом и перегрузите одно соединение JDBC.
Вместо этого вы хотите попробовать следующий ИИ (начиная со Spark 1.4.0+):
sqlctx.read.jdbc(
url = "<URL>",
table = "<TABLE>",
columnName = "<INTEGRAL_COLUMN_TO_PARTITION>",
lowerBound = minValue,
upperBound = maxValue,
numPartitions = 20,
connectionProperties = new java.util.Properties()
)
Существует также возможность снизить фильтрацию.
Если у вас нет равномерно распределенного интегрального столбца, вы хотите создать несколько пользовательских разделов, указав пользовательские предикаты (операторы where
). Например, предположим, что у вас есть столбец меток времени и вы хотите разделить его по диапазонам дат:
val predicates =
Array(
"2015-06-20" -> "2015-06-30",
"2015-07-01" -> "2015-07-10",
"2015-07-11" -> "2015-07-20",
"2015-07-21" -> "2015-07-31"
)
.map {
case (start, end) =>
s"cast(DAT_TME as date) >= date '$start' AND cast(DAT_TME as date) <= date '$end'"
}
predicates.foreach(println)
// Below is the result of how predicates were formed
//cast(DAT_TME as date) >= date '2015-06-20' AND cast(DAT_TME as date) <= date '2015-06-30'
//cast(DAT_TME as date) >= date '2015-07-01' AND cast(DAT_TME as date) <= date '2015-07-10'
//cast(DAT_TME as date) >= date '2015-07-11' AND cast(DAT_TME as date) <= date //'2015-07-20'
//cast(DAT_TME as date) >= date '2015-07-21' AND cast(DAT_TME as date) <= date '2015-07-31'
sqlctx.read.jdbc(
url = "<URL>",
table = "<TABLE>",
predicates = predicates,
connectionProperties = new java.util.Properties()
)
Он сгенерирует DataFrame
, где каждый раздел будет содержать записи каждого подзапроса, связанного с различными предикатами.
Проверьте исходный код по адресу DataFrameReader.scala
person
Gianmario Spacagna
schedule
30.09.2015
tdQuery("SELECT COUNT(*) FROM your_table)
- person Dev Patel   schedule 24.08.2015