РЕДАКТИРОВАТЬ: извините за качество предыдущего вопроса, я надеюсь, что этот будет более ясным: с помощью приложения Spark я загружаю весь каталог следующих файлов JSON:
{
"type": "some_type",
"payload": {
"data1": {
"id": "1"
},
"data2": {
"id": "1",
},
"data3": {
"id": "1"
},
"dataset1": [{
"data11": {
"id": "1",
},
"data12": {
"id": "1",
}
}],
"masterdata": {
"md1": [{
"id": "1"
},
{
"id": "2"
},
{
"id": "3"
}],
"md2": [{
"id": "1",
},
{
"id": "2",
},
{
"id": "3",
}]
}
}
}
в DataFrame
и сохраните как временную таблицу, чтобы использовать ее позже. В этом Json всегда присутствуют поля из узла «payload», но подузлы в «masterdata» необязательны. Следующим шагом является создание нескольких DataFrames для каждого подузла Json следующим образом: DataFrame data1 содержит данные узла «data1» из всех файлов и выглядит как обычная таблица со столбцом «id». После первой части обработки мое состояние Spark выглядит следующим образом: DataFrames: data1 (id), data2 (id), data3 (id), data11 (id), data12 (id), md1 (id), md2 (id)
Здесь возникает проблема - если один из файлов JSON в каталоге не содержит узла md2, я не могу запустить ни show()
, ни collect()
на фрейме данных «md2» из-за исключения NullPointException. Я бы понял, если во всех файлах отсутствует узел "md2", поэтому он не может создать md2 DataFrame, но в этом случае я ожидаю, что md2 DataFrame просто не будет иметь данных из файла json, который не имеет узла md2, но содержит все остальные.
Технические детали: для чтения данных из вложенного узла я использую rdd.map & rdd.flatmap, затем конвертирую его в DataFrame
с пользовательскими именами столбцов
Если я запускаю приложение, когда все файлы в каталоге содержат все узлы, все работает, но если один файл отсутствует, узел md2 Приложение не работает при .show () или .collect ()
Кстати, если узел существует, но он пуст, все работает нормально.
Есть ли способ заставить Spark поддерживать дополнительные узлы Json или обрабатывать отсутствующие узлы в rdd.map и flatmap?
Надеюсь, это более ясно, чем в предыдущем вопросе
По запросу @Beryllium вот операции rdd, которые я использую для получения md2 DataFrame
val jsonData = hiveContext.sql("SELECT `payload`.masterdata.md2 FROM jsonData")
val data = jsonData.rdd.flatMap(row => row.getSeq[Row](0)).map(row => (
row.getString(row.fieldIndex("id"))
)).distinct
val dataDF = data.toDF("id")
sqlContext.read.json
. - person Beryllium   schedule 27.11.2015