Spark 2.2 struct streaming foreach Writer задержка приемника jdbc

Я участвую в проекте, использующем потоковую передачу структуры Spark 2.2 для чтения сообщения kafka в базу данных Oracle. поток сообщений в кафку составляет около 4000-6000 сообщений в секунду.


при использовании файловой системы hdfs в качестве приемника она просто отлично работает. при использовании foreach jdbc writer со временем будет происходить огромная задержка. Я думаю, что задержка вызвана циклом foreach.

класс приемника jdbc (автономный файл класса):

class JDBCSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "oracle.jdbc.driver.OracleDriver"
  var connection: java.sql.Connection = _
  var statement: java.sql.PreparedStatement = _
  val v_sql = "insert INTO sparkdb.t_cf(EntityId,clientmac,stime,flag,id) values(?,?,to_date(?,'YYYY-MM-DD HH24:MI:SS'),?,stream_seq.nextval)"

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = java.sql.DriverManager.getConnection(url, user, pwd)
    connection.setAutoCommit(false)
    statement = connection.prepareStatement(v_sql)
    true
  }

  def process(value: org.apache.spark.sql.Row): Unit = {
    statement.setString(1, value(0).toString)
    statement.setString(2, value(1).toString)
    statement.setString(3, value(2).toString)
    statement.setString(4, value(3).toString)
    statement.executeUpdate()        
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.commit()
    connection.close
  }
}

раковина:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "namenode:9092").option("fetch.message.max.bytes", "50000000").option("kafka.max.partition.fetch.bytes", "50000000")
  .option("subscribe", "rawdb.raw_data")
  .option("startingOffsets", "latest")
  .load()
  .select($"value".as[Array[Byte]])
  .map(avroDeserialize(_))
  .filter(some logic).select(some logic) 
  .writeStream.format("csv").option("checkpointLocation", "/user/root/chk").option("path", "/user/root/testdir").start()

если я изменю последнюю строку

.writeStream.format("csv")...

в приемник jdbc foreach следующим образом:

val url = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=x.x.x.x)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=fastdb)))"
val user = "user";
val pwd = "password";
val writer = new JDBCSink(url, user, pwd)
.writeStream.foreach(writer).outputMode("append").start()

задержка обнаруживается.

Я предполагаю, что проблема, скорее всего, вызвана механикой цикла foreach - он не в пакетном режиме работает с несколькими тысячами строк в пакете, как администратор базы данных Oracle, я точно настроил сторону базы данных Oracle, в основном база данных ждет неактивных событий. чрезмерной фиксации стараются избежать, установив уже connection.setAutoCommit(false), любое предложение будет очень признательно.


person dalin qin    schedule 06.11.2017    source источник


Ответы (3)


Хотя у меня нет фактического профиля того, что занимает больше всего времени в вашем приложении, я предполагаю, что это связано с тем, что использование ForeachWriter будет эффективно закрывать и повторно открывать ваше соединение JDBC при каждом запуске, потому что так работает ForeachWriter .

Я бы посоветовал вместо его использования написать собственный Sink для JDBC, в котором вы управляете тем, как открывается или закрывается соединение.

Существует открытый запрос на добавление драйвера JDBC в Spark, который можно выполнить загляните, чтобы увидеть возможный подход к реализации.

person Yuval Itzchakov    schedule 06.11.2017
comment
вы совершенно правы, я только что проверил журнал слушателя оракула, тяжелое соединение и закрытие там. так что очевидно, что искра не медленная, ни оракул плохой, только писатель foreach не предназначен для этой бизнес-потребности. и поскольку под капотом искра использует rdd (неизменяемый), я думаю, что нет возможности установить внутренний буфер или что-то еще для кеширования данных foreach и отправки их в пакетном режиме. Я думаю переписать полученный результат в другую кафку и использовать другие инструменты с пакетным режимом для подключения к СУБД. - person dalin qin; 06.11.2017
comment
@dalinqin Обратите внимание, что использование пользовательского Sink решит эту проблему, так как у вас не будет проблем с сохранением соединения. - person Yuval Itzchakov; 06.11.2017
comment
не могли бы вы поделиться образцом сценария? если в любом случае используется потоковая передача структуры, нужно перейти к записи foreach для записи в пункт назначения jdbc, верно? поправьте меня если я ошибаюсь . звуки мы должны реализовать интерфейс ForeachWriter. - person dalin qin; 06.11.2017
comment
@dalinqin Нет, вы можете реализовать собственный приемник, см. ссылку, которую я предоставил в своем ответе. - person Yuval Itzchakov; 07.11.2017
comment
Теперь я понимаю вашу точку зрения, это означает, что вместо того, чтобы использовать foreach writer, я разрабатываю свою собственную раковину :), довольно сложно для меня, я попробую. - person dalin qin; 07.11.2017

Проблема решена путем внедрения результата в другую тему Kafka, затем написана другая программа, прочитанная из новой темы, записывает их в базу данных по партиям.

Я думаю, что в следующем выпуске искры они могут предоставить приемник jdbc и иметь размер пакета настройки некоторых параметров.

основной код выглядит следующим образом:

напишите в другую тему:

  .writeStream.format("kafka")
  .option("kafka.bootstrap.servers", "x.x.x.x:9092")
  .option("topic", "fastdbtest")
  .option("checkpointLocation", "/user/root/chk")
  .start()

читайте тему и пишите в базы данных, я использую пул соединений c3p0

lines.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreachPartition(partitionRecords => {
      //get a connection from connection pool
      val conn = ConnManager.getManager.getConnection
      val ps = conn.prepareStatement("insert into sparkdb.t_cf(ENTITYID,CLIENTMAC,STIME,FLAG) values(?,?,?,?)")
      try {
        conn.setAutoCommit(false)
        partitionRecords.foreach(record => {
          insertIntoDB(ps, record)
        }
        )
        ps.executeBatch()
        conn.commit()
      } catch {
        case e: Exception =>{}
        // do some log
      } finally {
        ps.close()
        conn.close()
      }
    })
  }
})
person dalin qin    schedule 10.11.2017

Вы пробовали использовать триггер?

Я заметил, что когда я не использовал триггер, мой приемник по каждому элементу несколько раз открывал и закрывал соединение с базой данных.

writeStream.foreach(writer).start()

Но когда я использовал триггер, Foreach открывал и закрывал соединение только один раз, обрабатывая, например, 200 запросов, а когда микропакет был завершен, он закрыл соединение до тех пор, пока не был получен новый микропакет.

writeStream.trigger(Trigger.ProcessingTime("3 seconds")).foreach(writer).start()

Мой вариант использования - чтение из темы Kafka только с одним разделом, поэтому я думаю, что Spark использует один раздел. Я не знаю, работает ли это решение одинаково с несколькими разделами Spark, но мой вывод заключается в том, что Foreach обрабатывает все микропакеты одновременно (строка за строкой) в методе процесса и не вызывает open () и close () для каждой строки, как думают многие.

person David Zamora    schedule 08.02.2019