Извлечь отметку времени из сообщений kafka в потоковой передаче искры?

Пытаюсь читать из источника кафки. Я хочу извлечь метку времени из полученного сообщения, чтобы выполнить структурированную потоковую передачу искры. kafka (версия 0.10.0.0) искровая потоковая передача (версия 2.0.1)


person shivali    schedule 14.11.2016    source источник
comment
Не могли бы вы показать фрагмент вашего текущего кода?   -  person vanekjar    schedule 15.11.2016
comment
@vanekjar val ds1 = spark .readStream .format(kafka) .option(kafka.bootstrap.servers, localhost:9092) .option(подписаться, темаA) .load()   -  person shivali    schedule 15.11.2016


Ответы (2)


Я бы предложил пару вещей:

  1. Предположим, вы создаете поток с помощью последнего Kafka Streaming API (0.10 Кафка)

    Например. вы используете зависимость: "org.apache.spark" %% "spark-streaming-kafka-0-10" % 2.0.1

    Затем вы создаете поток в соответствии с приведенными выше документами:

     val kafkaParams = Map[String, Object](
         "bootstrap.servers" -> "broker1:9092,broker2:9092",
         "key.deserializer" -> classOf[StringDeserializer],
         "value.deserializer" -> classOf[ByteArrayDeserializer],
         "group.id" -> "spark-streaming-test",
         "auto.offset.reset" -> "latest",
         "enable.auto.commit" -> (false: java.lang.Boolean))
    
    val sparkConf = new SparkConf()
    // suppose you have 60 second window
    val ssc = new StreamingContext(sparkConf, Seconds(60))
    ssc.checkpoint("checkpoint")
    
    val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent,
    Subscribe[String, Array[Byte]](topics, kafkaParams))
    
  2. Ваш поток будет DStream из ConsumerRecord[String,Array[Byte]], и вы можете получить метку времени и ключ-значение так же просто, как:

    stream.map { record => (record.timestamp(), record.key(), record.value())  }
    

Надеюсь, это поможет.

person Vlad Vlaskin    schedule 26.02.2017
comment
Как преобразовать приведенный выше stream.map {record =› (record.timestamp(), record.key(), record.value()) в DF. я новичок в искре, я хочу включить временную метку из kafka также при преобразовании в DF. не могли бы вы рассказать мне, как это сделать - person BigD; 08.02.2019
comment
@BigD Вы можете сделать что-то вроде: stream.foreachRDD{ (rdd: RDD[ConsumerRecord[String, Array[Byte]]], time: Time) => rdd.map(//some mapping).toDF } - person Vlad Vlaskin; 09.02.2019
comment
если я делаю rdd.map(//некоторое сопоставление).toDF, я преобразовываю свою часть значений в DF.. я также хочу включить временную метку при преобразовании в DF - person BigD; 09.02.2019

spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "your.server.com:9092")
  .option("subscribe", "your-topic")
  .load()
  .select($"timestamp", $"value")

Поле «отметка времени» — это то, что вы ищете. Тип — java.sql.Timestamp. Убедитесь, что вы подключаетесь к серверу Kafka версии 0.10. В более ранних версиях временная метка отсутствует. Полный список описанных здесь полей — http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries

person Gorini4    schedule 18.09.2017