Наша цель - запускать параллельные SQL-запросы от рабочих Spark.
Настройка сборки
Добавьте соединитель и JDBC в libraryDependencies
в build.sbt
. Я пробовал это только с MySQL, поэтому я буду использовать это в своих примерах, но Postgres должен быть примерно таким же.
libraryDependencies ++= Seq(
jdbc,
"mysql" % "mysql-connector-java" % "5.1.29",
"org.apache.spark" %% "spark-core" % "1.0.1",
// etc
)
Код
Когда вы создаете SparkContext
, вы указываете ему, какие банки копировать исполнителям. Включите соединительную банку. Красивый способ сделать это:
val classes = Seq(
getClass, // To get the jar with our own code.
classOf[mysql.jdbc.Driver] // To get the connector.
)
val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath())
val conf = new SparkConf().setJars(jars)
Теперь Spark готов подключиться к базе данных. Каждый исполнитель будет выполнять часть запроса, чтобы результаты были готовы для распределенных вычислений.
Для этого есть два варианта. Более старый подход заключается в использовании org.apache.spark.rdd.JdbcRDD
:
val rdd = new org.apache.spark.rdd.JdbcRDD(
sc,
() => {
sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
},
"SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",
0, 1000, 10,
row => row.getString("BOOK_TITLE")
)
Ознакомьтесь с документацией по параметрам. Вкратце:
- У вас есть
SparkContext
.
- Затем функция, которая создает соединение. Это будет вызываться для каждого рабочего для подключения к базе данных.
- Затем SQL-запрос. Он должен быть похож на пример и содержать заполнители для начального и конечного ключа.
- Затем вы указываете диапазон ключей (от 0 до 1000 в моем примере) и количество разделов. Ассортимент будет разделен между разделами. Таким образом, один поток-исполнитель в конечном итоге выполнит
SELECT * FROM FOO WHERE 0 <= KEY AND KEY <= 100
в этом примере.
- И, наконец, у нас есть функция, которая преобразует
ResultSet
во что-то. В этом примере мы преобразуем его в String
, так что вы получите RDD[String]
.
Начиная с Apache Spark версии 1.3.0, через API DataFrame доступен другой метод. Вместо JdbcRDD
вы должны создать _14 _:
val df = sqlContext.load("jdbc", Map(
"url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
"dbtable" -> "BOOKS"))
См. https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases, чтобы просмотреть полный список параметров (диапазон ключей и количество разделов можно установить так же, как с JdbcRDD
).
Обновления
JdbcRDD
не поддерживает обновления. Но вы можете просто сделать это за foreachPartition
.
rdd.foreachPartition { it =>
val conn = sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?")
for (bookTitle <- it) {
del.setString(1, bookTitle)
del.executeUpdate
}
}
(Это создает одно соединение для каждого раздела. Если это вызывает беспокойство, используйте пул соединений!)
DataFrame
s поддерживают обновления с помощью методов createJDBCTable
и insertIntoJDBC
.
person
Daniel Darabos
schedule
24.07.2014