Коннектор Spark Cassandra: SQLContext.read + SQLContext.write против ручного синтаксического анализа и вставки (JSON - ›Cassandra)

Доброе утро,

я только начал исследовать Apache Spark и Apache Cassandra. Первый шаг - это очень простой вариант использования: взять файл, содержащий, например, клиент + оценка.

Таблица Cassandra имеет клиента как PrimaryKey. Кассандра просто работает локально (так что никакого кластера!).

Итак, SparkJob (автономный локальный [2]) анализирует файл JSON и затем записывает все в Cassandra.

Первое решение было

val conf = new SparkConf().setAppName("Application").setMaster("local[2]")
val sc = new SparkContext(conf)
val cass = CassandraConnector(conf)

val customerScores = sc.textFile(file).cache()

val customerScoreRDD = customerScores.mapPartitions(lines => {
  val mapper = new ObjectMapper with ScalaObjectMapper
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  mapper.registerModule(DefaultScalaModule)
  lines
    .map(line => {
      mapper.readValue(line, classOf[CustomerScore])
    })
    //Filter corrupt ones: empty values
    .filter(customerScore => customerScore.customer != null && customerScore.score != null)
})


customerScoreRDD.foreachPartition(rows => cass.withSessionDo(session => {
  val statement: PreparedStatement = session.prepare("INSERT INTO playground.customer_score (customer,score) VALUES (:customer,:score)")
  rows.foreach(row => {
    session.executeAsync(statement.bind(row.customer.asInstanceOf[Object], row.score))
  })
}))

sc.stop()

означает делать все вручную, разбирая строки и затем вставляя в Кассандру.

Это примерно занимает около 714020 мс всего для 10000000 записей (включая создание SparkContext и т. Д. ...).

Затем я прочитал о соединителе искры-кассандры и сделал следующее:

val conf = new SparkConf().setAppName("Application").setMaster("local[2]")
val sc = new SparkContext(conf)
var sql = new SQLContext(sc)

val customerScores = sql.read.json(file)

val customerScoresCorrected = customerScores
  //Filter corrupt ones: empty values
  .filter("customer is not null and score is not null")
  //Filter corrupt ones: invalid properties
  .select("customer", "score")

customerScoresCorrected.write
  .format("org.apache.spark.sql.cassandra")
  .mode(SaveMode.Append)
  .options(Map("keyspace" -> "playground", "table" -> "customer_score"))
  .save()

sc.stop()

Намного проще в смысле необходимого кода и использования заданного API.

Это решение примерно занимает 1232871 мс для 10000000 записей (опять же в целом, так же как и точки измерения).

(Было также третье решение, разбор вручную плюс использование saveToCassandra, которое занимает 1530877 мс)

Теперь мой вопрос:

Какой способ является «правильным» для выполнения этого сценария использования, а какой из них является «наилучшей практикой» (а в реальном сценарии, кластерная кассандра и искра, наиболее эффективная) в настоящее время? Потому что, судя по моим результатам, я бы использовал «ручной» материал вместо SQLContext.read + SQLContext.write.

Заранее благодарим за комментарии и подсказки.


comment
У нас были хорошие результаты при использовании коннектора Cassandra при написании RDD (с использованием saveToCassandra). Использование RDD вместо DataFrames дает вам возможность перераспределить в соответствии с диапазонами токенов Cassandra (с использованием repartitionByCassandraReplica), что приведет к тому, что большинство записей будет локальным, избегая большого количества работы координатора Cassandra.   -  person LiMuBei    schedule 01.07.2016
comment
Спасибо @LiMuBei, это действительно немного уменьшает saveToCassandra, даже в локальном тестовом сценарии. В целом кажется, что ручное решение (первый фрагмент кода) по-прежнему является самым быстрым.   -  person markush81    schedule 04.07.2016
comment
Может быть, в вашем первом решении вы не дожидаетесь фактического завершения асинхронных операций? Мне кажется, что этот способ не может гарантировать, что все операции вставки завершатся успешно.   -  person LiMuBei    schedule 05.07.2016
comment
Ха ... ты прав в моем примере, я не знаю, все ли прошло хорошо.   -  person markush81    schedule 05.07.2016


Ответы (1)


На самом деле, после долгих экспериментов, нужно подумать о следующем.

  • Конечно количество данных
  • Тип ваших данных: особенно разнообразие ключей разделов (каждый из которых отличается от множества дубликатов)
  • Среда: исполнители Spark, узлы Cassandra, репликация ...

В моем случае использования

def initSparkContext: SparkContext = {
    val conf = new SparkConf().setAppName("Application").setMaster("local[2]")
        // since we have nearly totally different PartitionKeys, default: 1000
        .set("spark.cassandra.output.batch.grouping.buffer.size", "1")
        // write as much concurrently, default: 5
       .set("spark.cassandra.output.concurrent.writes", "1024")
       // batch same replica, default: partition
       .set("spark.cassandra.output.batch.grouping.key", "replica_set") 
    val sc = new SparkContext(conf)
    sc
}

действительно сильно увеличил скорость в моем локальном пробеге.

Поэтому очень важно опробовать различные параметры, чтобы получить ВАШЕ наилучшее решение. По крайней мере, таков вывод, который я получил.

person markush81    schedule 15.07.2016