У меня есть некоторые данные JSON, связанные с продажами, в моем кластере ElasticSearch, и я хотел бы использовать Spark Streaming (используя Spark 1.4.1) для динамической агрегации входящих событий продаж с моего веб-сайта электронной коммерции через Kafka, чтобы иметь текущее представление об общем количестве пользователей. продаж (с точки зрения выручки и продукции).
Что мне не совсем понятно из документов, которые я прочитал, так это то, как я могу загрузить данные истории из ElasticSearch при запуске приложения Spark и рассчитать, например, общий доход на пользователя (на основе истории и входящих продаж от Кафка).
У меня есть следующий (рабочий) код для подключения к моему экземпляру Kafka и получения документов JSON:
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext
object ReadFromKafka {
def main(args: Array[String]) {
val checkpointDirectory = "/tmp"
val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[2]")
val topicsSet = Array("tracking").toSet
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10))
// Create direct kafka stream with brokers and topics
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
//Iterate
messages.foreachRDD { rdd =>
//If data is present, continue
if (rdd.count() > 0) {
//Create SQLContect and parse JSON
val sqlContext = new SQLContext(sc)
val trackingEvents = sqlContext.read.json(rdd.values)
//Sample aggregation of incoming data
trackingEvents.groupBy("type").count().show()
}
}
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
Я знаю, что для ElasticSearch существует подключаемый модуль (https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html#spark-read), но мне не совсем понятно, как интегрировать чтение при запуске и потоковый расчет процесс объединения данных истории с потоковыми данными.
Помощь очень приветствуется! Заранее спасибо.