У меня есть автоматически сгенерированная схема Avro для простой иерархии классов:
trait T {def name: String}
case class A(name: String, value: Int) extends T
case class B(name: String, history: Array[String]) extends T
Выглядит это так:
[{
"name": "org.example.schema.raw.A",
"type": "record",
"fields": [{
"name": "name",
"type": "string"
}, {
"name": "value",
"type": "int"
}]
}, {
"name": "org.example.schema.raw.B",
"type": "record",
"fields": [{
"name": "name",
"type": "string"
}, {
"name": "history",
"type": {
"type": "array",
"items": "string"
}
}]
}]
Эта схема хорошо подходит для чтения данных из JSON в GenericRecord
с использованием простого Avro API. Следующее, что я пытаюсь достичь, - это сохранить все такие GenericRecord
объекты в одном паркетном файле с помощью AvroParquetWriter
:
val writer = new AvroParquetWriter[GenericRecord](file, schema)
writer.write(record)
writer.close()
Этот код не работает в первой строке с
java.lang.IllegalArgumentException: Avro schema must be a record.
at parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:96)
at parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:137)
at parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:54)
at parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:86)
Неудивительно, AvroSchemaConverter содержит следующие строки:
if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
throw new IllegalArgumentException("Avro schema must be a record.");
}
И мой тип схемы - UNION. Любые идеи / помощь в отображении (слиянии) этой схемы UNION в схему RECORD или любые другие предложения чрезвычайно приветствуются.
РЕШЕНИЕ
1) Прочитать JSON из ввода с использованием схемы объединения в GenericRecord
2) Получить или создать AvroParquetWriter
для типа:
val writer = writers.getOrElseUpdate(record.getType, new AvroParquetWriter[GenericRecord](getPath(record.getType), record.getShema)
3) Записываем запись в файл:
writer.write(record)
4) Закройте все писатели, когда все данные будут использованы из ввода:
writers.values.foreach(_.close())
5) Загрузите данные из каталога в Spark SQL DataFrame:
sqlContext.option("mergeSchema", "true").parquet("/tmp/data/")
6) Данные можно обрабатывать или хранить как есть - они уже объединены Spark:
df.write.format("parquet").save("merged.parquet")