Как загрузить данные истории при запуске процесса Spark Streaming и рассчитать текущие агрегаты

У меня есть некоторые данные JSON, связанные с продажами, в моем кластере ElasticSearch, и я хотел бы использовать Spark Streaming (используя Spark 1.4.1) для динамической агрегации входящих событий продаж с моего веб-сайта электронной коммерции через Kafka, чтобы иметь текущее представление об общем количестве пользователей. продаж (с точки зрения выручки и продукции).

Что мне не совсем понятно из документов, которые я прочитал, так это то, как я могу загрузить данные истории из ElasticSearch при запуске приложения Spark и рассчитать, например, общий доход на пользователя (на основе истории и входящих продаж от Кафка).

У меня есть следующий (рабочий) код для подключения к моему экземпляру Kafka и получения документов JSON:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.SQLContext

object ReadFromKafka {
  def main(args: Array[String]) {

    val checkpointDirectory = "/tmp"
    val conf = new SparkConf().setAppName("Read Kafka JSONs").setMaster("local[2]")
    val topicsSet = Array("tracking").toSet

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

    // Create direct kafka stream with brokers and topics
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    //Iterate
    messages.foreachRDD { rdd =>

      //If data is present, continue
      if (rdd.count() > 0) {

        //Create SQLContect and parse JSON
        val sqlContext = new SQLContext(sc)
        val trackingEvents = sqlContext.read.json(rdd.values)

        //Sample aggregation of incoming data
        trackingEvents.groupBy("type").count().show()

      }

    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

Я знаю, что для ElasticSearch существует подключаемый модуль (https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html#spark-read), но мне не совсем понятно, как интегрировать чтение при запуске и потоковый расчет процесс объединения данных истории с потоковыми данными.

Помощь очень приветствуется! Заранее спасибо.




Ответы (1)


RDD являются неизменяемыми, поэтому после их создания вы не можете добавлять в них данные, например обновлять доход новыми событиями.

Что вы можете сделать, так это объединить существующие данные с новыми событиями, чтобы создать новый RDD, который затем можно использовать в качестве текущей суммы. Например...

var currentTotal: RDD[(Key, Value)] = ... //read from ElasticSearch
messages.foreachRDD { rdd =>
    currentTotal = currentTotal.union(rdd)
}

В этом случае мы делаем currentTotal var, так как он будет заменен ссылкой на новый RDD, когда он будет объединен с входящими данными.

После объединения вы можете захотеть выполнить некоторые дополнительные операции, такие как уменьшение значений, принадлежащих одному и тому же ключу, но вы получите картину.

Если вы используете этот метод, обратите внимание, что родословная ваших RDD будет расти, так как каждый вновь созданный RDD будет ссылаться на своего родителя. Это может вызвать проблему происхождения стиля переполнения стека. Чтобы исправить это, вы можете периодически вызывать checkpoint() на RDD.

person Patrick McGloin    schedule 27.07.2015
comment
Большое спасибо за ответ. Я уже подозревал, что rdd.union() будет хорошим началом. Будет ли updateStateByKey() также способом сделать это? Поскольку я хочу сохранить агрегаты, я подумал, что это тоже может пригодиться... - person Tobi; 27.07.2015
comment
Я думаю, вы могли бы также использовать updateStateByKey(). Раньше не было способа указать начальное состояние для updateStateByKey(), но я считаю, что это было добавлено, так что это также должно быть жизнеспособным решением. - person Patrick McGloin; 27.07.2015
comment
Можно ли также сделать это с DataFrames? Что-то у меня не получается... - person Tobi; 28.07.2015
comment
Я предполагаю, что вы имеете в виду, возможен ли шаблон union -> reduceByKey с DataFrames, и я думаю, что это не так. Причина в том, что вы хотите использовать функции PairRDDFunctions, такие как union и reduceByKey. Изучая это, я нашел сообщение о группировке наборов данных с использованием DataFrames, так что вы можете изучить это. Но я думаю, что в этом случае проще использовать RDD. forums.databricks.com/questions/956/ - person Patrick McGloin; 28.07.2015
comment
Как убедиться, что вы не обрабатываете одну и ту же запись дважды? т.е. во время чтения данных elasticsearch данные все еще передаются в потоковом режиме, возможно, вы получите потоковое сообщение, которое также существует в конце исходного запроса на загрузку. Надеюсь, это имеет смысл, и я думаю, что это потенциально распространенная проблема? - person ShaunO; 02.12.2015