Доброе утро,
я только начал исследовать Apache Spark и Apache Cassandra. Первый шаг - это очень простой вариант использования: взять файл, содержащий, например, клиент + оценка.
Таблица Cassandra имеет клиента как PrimaryKey. Кассандра просто работает локально (так что никакого кластера!).
Итак, SparkJob (автономный локальный [2]) анализирует файл JSON и затем записывает все в Cassandra.
Первое решение было
val conf = new SparkConf().setAppName("Application").setMaster("local[2]")
val sc = new SparkContext(conf)
val cass = CassandraConnector(conf)
val customerScores = sc.textFile(file).cache()
val customerScoreRDD = customerScores.mapPartitions(lines => {
val mapper = new ObjectMapper with ScalaObjectMapper
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.registerModule(DefaultScalaModule)
lines
.map(line => {
mapper.readValue(line, classOf[CustomerScore])
})
//Filter corrupt ones: empty values
.filter(customerScore => customerScore.customer != null && customerScore.score != null)
})
customerScoreRDD.foreachPartition(rows => cass.withSessionDo(session => {
val statement: PreparedStatement = session.prepare("INSERT INTO playground.customer_score (customer,score) VALUES (:customer,:score)")
rows.foreach(row => {
session.executeAsync(statement.bind(row.customer.asInstanceOf[Object], row.score))
})
}))
sc.stop()
означает делать все вручную, разбирая строки и затем вставляя в Кассандру.
Это примерно занимает около 714020 мс всего для 10000000 записей (включая создание SparkContext и т. Д. ...).
Затем я прочитал о соединителе искры-кассандры и сделал следующее:
val conf = new SparkConf().setAppName("Application").setMaster("local[2]")
val sc = new SparkContext(conf)
var sql = new SQLContext(sc)
val customerScores = sql.read.json(file)
val customerScoresCorrected = customerScores
//Filter corrupt ones: empty values
.filter("customer is not null and score is not null")
//Filter corrupt ones: invalid properties
.select("customer", "score")
customerScoresCorrected.write
.format("org.apache.spark.sql.cassandra")
.mode(SaveMode.Append)
.options(Map("keyspace" -> "playground", "table" -> "customer_score"))
.save()
sc.stop()
Намного проще в смысле необходимого кода и использования заданного API.
Это решение примерно занимает 1232871 мс для 10000000 записей (опять же в целом, так же как и точки измерения).
(Было также третье решение, разбор вручную плюс использование saveToCassandra
, которое занимает 1530877 мс)
Теперь мой вопрос:
Какой способ является «правильным» для выполнения этого сценария использования, а какой из них является «наилучшей практикой» (а в реальном сценарии, кластерная кассандра и искра, наиболее эффективная) в настоящее время? Потому что, судя по моим результатам, я бы использовал «ручной» материал вместо SQLContext.read
+ SQLContext.write
.
Заранее благодарим за комментарии и подсказки.
saveToCassandra
). Использование RDD вместо DataFrames дает вам возможность перераспределить в соответствии с диапазонами токенов Cassandra (с использованиемrepartitionByCassandraReplica
), что приведет к тому, что большинство записей будет локальным, избегая большого количества работы координатора Cassandra. - person LiMuBei   schedule 01.07.2016saveToCassandra
, даже в локальном тестовом сценарии. В целом кажется, что ручное решение (первый фрагмент кода) по-прежнему является самым быстрым. - person markush81   schedule 04.07.2016