Как я могу подключиться к базе данных postgreSQL в Apache Spark с помощью scala?

Я хочу знать, как я могу выполнять следующие действия в scala?

  1. Подключитесь к базе данных postgreSQL с помощью Spark scala.
  2. Напишите SQL-запросы, такие как SELECT, UPDATE и т. Д., Чтобы изменить таблицу в этой базе данных.

Я знаю, как это сделать с помощью scala, но как импортировать jar-коннектор psql scala в sbt при его упаковке?


person febinsathar    schedule 23.07.2014    source источник
comment
Почему отрицательные голоса? Я думаю, это отличный вопрос. Это довольно общий вопрос, но ответ также может быть общим и помочь многим пользователям.   -  person Daniel Darabos    schedule 24.07.2014
comment
вы в конечном итоге использовали mysql или postgres? Если postgres, можно ли взглянуть на ваш пример sbt и кода?   -  person Irene    schedule 05.05.2015


Ответы (1)


Наша цель - запускать параллельные SQL-запросы от рабочих Spark.

Настройка сборки

Добавьте соединитель и JDBC в libraryDependencies в build.sbt. Я пробовал это только с MySQL, поэтому я буду использовать это в своих примерах, но Postgres должен быть примерно таким же.

libraryDependencies ++= Seq(
  jdbc,
  "mysql" % "mysql-connector-java" % "5.1.29",
  "org.apache.spark" %% "spark-core" % "1.0.1",
  // etc
)

Код

Когда вы создаете SparkContext, вы указываете ему, какие банки копировать исполнителям. Включите соединительную банку. Красивый способ сделать это:

val classes = Seq(
  getClass,                   // To get the jar with our own code.
  classOf[mysql.jdbc.Driver]  // To get the connector.
)
val jars = classes.map(_.getProtectionDomain().getCodeSource().getLocation().getPath())
val conf = new SparkConf().setJars(jars)

Теперь Spark готов подключиться к базе данных. Каждый исполнитель будет выполнять часть запроса, чтобы результаты были готовы для распределенных вычислений.

Для этого есть два варианта. Более старый подход заключается в использовании org.apache.spark.rdd.JdbcRDD:

val rdd = new org.apache.spark.rdd.JdbcRDD(
  sc,
  () => {
    sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  },
  "SELECT * FROM BOOKS WHERE ? <= KEY AND KEY <= ?",
  0, 1000, 10,
  row => row.getString("BOOK_TITLE")
)

Ознакомьтесь с документацией по параметрам. Вкратце:

  • У вас есть SparkContext.
  • Затем функция, которая создает соединение. Это будет вызываться для каждого рабочего для подключения к базе данных.
  • Затем SQL-запрос. Он должен быть похож на пример и содержать заполнители для начального и конечного ключа.
  • Затем вы указываете диапазон ключей (от 0 до 1000 в моем примере) и количество разделов. Ассортимент будет разделен между разделами. Таким образом, один поток-исполнитель в конечном итоге выполнит SELECT * FROM FOO WHERE 0 <= KEY AND KEY <= 100 в этом примере.
  • И, наконец, у нас есть функция, которая преобразует ResultSet во что-то. В этом примере мы преобразуем его в String, так что вы получите RDD[String].

Начиная с Apache Spark версии 1.3.0, через API DataFrame доступен другой метод. Вместо JdbcRDD вы должны создать _14 _:

val df = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:mysql://mysql.example.com/?user=batman&password=alfred",
  "dbtable" -> "BOOKS"))

См. https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#jdbc-to-other-databases, чтобы просмотреть полный список параметров (диапазон ключей и количество разделов можно установить так же, как с JdbcRDD).

Обновления

JdbcRDD не поддерживает обновления. Но вы можете просто сделать это за foreachPartition.

rdd.foreachPartition { it =>
  val conn = sql.DriverManager.getConnection("jdbc:mysql://mysql.example.com/?user=batman&password=alfred")
  val del = conn.prepareStatement("DELETE FROM BOOKS WHERE BOOK_TITLE = ?")
  for (bookTitle <- it) {
    del.setString(1, bookTitle)
    del.executeUpdate
  }
}

(Это создает одно соединение для каждого раздела. Если это вызывает беспокойство, используйте пул соединений!)

DataFrames поддерживают обновления с помощью методов createJDBCTable и insertIntoJDBC.

person Daniel Darabos    schedule 24.07.2014
comment
Ваше обновление создает новое соединение для каждого раздела. - person BAR; 04.10.2015
comment
да. Каждый раздел может обрабатываться на другом компьютере, поэтому они не могут использовать одно соединение. Однако вы можете использовать пул соединений, чтобы, если два раздела обрабатывались на одном компьютере в одном потоке, один за другим, они могли повторно использовать соединение. Насколько мне известно, в стандартном API Java нет пула соединений, поэтому это значительно усложнит пример. Но дайте мне знать, если знаете хорошее решение! - person Daniel Darabos; 04.10.2015
comment
Право на. В новой документации Spark для 1.5.1 показаны 3 примера того, что можно и чего нельзя делать в этом случае. У них есть довольно элегантное решение с использованием пула соединений. - person BAR; 04.10.2015
comment
Ой, фантастика! У вас есть ссылка на эту страницу в документации? Спасибо! - person Daniel Darabos; 04.10.2015
comment
Извините, я ничего не могу найти о пуле подключений по этому URL-адресу. Что мне не хватает? - person Daniel Darabos; 05.10.2015
comment
Вот и все: искра. apache.org/docs/latest/ - person BAR; 05.10.2015
comment
Спасибо! Это довольно элегантно, но это псевдокод. Используемый ими объект ConnectionPool - это вымышленный API. Поэтому я бы предпочел не включать его в свой ответ. Но я добавил параграф, предлагающий использовать пул соединений. - person Daniel Darabos; 06.10.2015