Spark текстовое поле JSON для RDD

У меня есть таблица cassandra с полем типа text с именем snapshot, содержащим объекты JSON:

[identifier, timestamp, snapshot]

Я понял, что для того, чтобы иметь возможность выполнять преобразования в этом поле с помощью Spark, мне нужно преобразовать это поле этого RDD в другой RDD, чтобы выполнить преобразования в схеме JSON.

Это правильно? Как мне к этому приступить?

Изменить: на данный момент мне удалось создать RDD из одного текстового поля:

val conf = new SparkConf().setAppName("signal-aggregation")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots")
val first = snapshots.first()
val firstJson = sqlContext.jsonRDD(sc.parallelize(Seq(first._3)))
firstJson.printSchema()

Что показывает мне схему JSON. Хороший!

Как сообщить Spark, что эта схема должна применяться ко всем строкам моментальных снимков таблицы, чтобы получить RDD для этого поля моментального снимка из каждой строки?


person galex    schedule 04.05.2015    source источник
comment
Если я правильно понимаю, у вас есть несколько объектов JSON внутри каждого поля в таблице cassandra, и вам нужно вычислять каждый объект независимо.   -  person Mikel Urkia    schedule 04.05.2015
comment
Да, вы правы, но я где-то читал, что Spark может понимать это текстовое поле как json и что я могу выполнять преобразования для некоторых значений этих jsons, это правильно?   -  person galex    schedule 04.05.2015


Ответы (1)


Почти готово, вы просто хотите передать свой RDD[String] с вашим json в метод jsonRDD

val conf = new SparkConf().setAppName("signal-aggregation")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots")
val jsons = snapshots.map(_._3) // Get Third Row Element Json(RDD[String]) 
val jsonSchemaRDD = sqlContext.jsonRDD(jsons) // Pass in RDD directly
jsonSchemaRDD.registerTempTable("testjson")
sqlContext.sql("SELECT * FROM testjson where .... ").collect 

Быстрый пример

val stringRDD = sc.parallelize(Seq(""" 
  { "isActive": false,
    "balance": "$1,431.73",
    "picture": "http://placehold.it/32x32",
    "age": 35,
    "eyeColor": "blue"
  }""",
   """{
    "isActive": true,
    "balance": "$2,515.60",
    "picture": "http://placehold.it/32x32",
    "age": 34,
    "eyeColor": "blue"
  }""", 
  """{
    "isActive": false,
    "balance": "$3,765.29",
    "picture": "http://placehold.it/32x32",
    "age": 26,
    "eyeColor": "blue"
  }""")
)
sqlContext.jsonRDD(stringRDD).registerTempTable("testjson")
csc.sql("SELECT age from testjson").collect
//res24: Array[org.apache.spark.sql.Row] = Array([35], [34], [26])
person RussS    schedule 04.05.2015