Avro: преобразовать схему UNION в схему RECORD

У меня есть автоматически сгенерированная схема Avro для простой иерархии классов:

trait T {def name: String}
case class A(name: String, value: Int) extends T
case class B(name: String, history: Array[String]) extends T

Выглядит это так:

 [{
  "name": "org.example.schema.raw.A",
  "type": "record",
  "fields": [{
    "name": "name",
    "type": "string"
  }, {
    "name": "value",
    "type": "int"
  }]
}, {
  "name": "org.example.schema.raw.B",
  "type": "record",
  "fields": [{
    "name": "name",
    "type": "string"
  }, {
    "name": "history",
    "type": {
      "type": "array",
      "items": "string"
    }
  }]
}]

Эта схема хорошо подходит для чтения данных из JSON в GenericRecord с использованием простого Avro API. Следующее, что я пытаюсь достичь, - это сохранить все такие GenericRecord объекты в одном паркетном файле с помощью AvroParquetWriter:

val writer = new AvroParquetWriter[GenericRecord](file, schema)
writer.write(record)
writer.close()

Этот код не работает в первой строке с

java.lang.IllegalArgumentException: Avro schema must be a record.
at parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:96)
at parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:137)
at parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:54)
at parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:86)

Неудивительно, AvroSchemaConverter содержит следующие строки:

if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
      throw new IllegalArgumentException("Avro schema must be a record.");
}

И мой тип схемы - UNION. Любые идеи / помощь в отображении (слиянии) этой схемы UNION в схему RECORD или любые другие предложения чрезвычайно приветствуются.

РЕШЕНИЕ

1) Прочитать JSON из ввода с использованием схемы объединения в GenericRecord 2) Получить или создать AvroParquetWriter для типа:

val writer = writers.getOrElseUpdate(record.getType, new AvroParquetWriter[GenericRecord](getPath(record.getType), record.getShema)

3) Записываем запись в файл:

writer.write(record)

4) Закройте все писатели, когда все данные будут использованы из ввода:

writers.values.foreach(_.close())

5) Загрузите данные из каталога в Spark SQL DataFrame:

sqlContext.option("mergeSchema", "true").parquet("/tmp/data/")

6) Данные можно обрабатывать или хранить как есть - они уже объединены Spark:

df.write.format("parquet").save("merged.parquet")

person Vitalii Kotliarenko    schedule 01.07.2016    source источник


Ответы (2)


Чтобы ответить на ваш вопрос о слиянии: вы можете использовать следующий case class Merged(name: String, value: Option[Int], history: Option[Array[String]]) и использовать сгенерированную схему для записи ваших данных. В общем, если у вас есть прямая совместимость схемы как для A, так и для B, она будет записывать оба правильно.

Или, поскольку, как вы сказали, avro не позволит вам записывать все ваши данные в один и тот же файл, может быть, вы можете разделить вывод по типу и записать один файл для каждого типа? Я знаю, что, вероятно, сделаю это в большинстве случаев использования, которые я могу придумать, но, возможно, это не применимо к вам.

person C4stor    schedule 01.07.2016
comment
Спасибо за Ваш ответ. Что касается описанных вами вариантов: 1) да, я могу, но в реальной жизни у нас есть большая иерархия (›100 классов кейсов, расширяющих 5-6 базовых признаков) и даже есть собственный генератор кода, который сглаживает и объединяет подиерархии, но этот код довольно специфичен и его трудно поддерживать 2) это тоже вариант, записать файл для каждого конкретного класса, а затем объединить его; в любом случае мне нужно загрузить его в одну таблицу Spark в конце, но я не уверен, насколько хорошо Spark справится с этим. - person Vitalii Kotliarenko; 01.07.2016
comment
AvroParquetReader имеет два общих аргумента: один для класса чтения и один для класса проекции. У меня нет подходящего хорошего примера синтаксиса (и iirc, его немного раздражает найти: D), но в основном вы можете прочитать [B, T], чтобы прочитать файл, написанный с помощью схемы B, и все еще иметь RDD [T ]. Я полагаю, что сделает вариант 2 выполнимым. - person C4stor; 01.07.2016
comment
Это даже проще, чем я думал: напишите файл для каждого конкретного класса, а затем используйте Spark SQL, чтобы загрузить весь такой файл в DataFrame. Spark SQL автоматически объединяет схемы. - person Vitalii Kotliarenko; 02.07.2016

Вы можете обернуть свой признак классом case, который является Record.

case class Reord [K] (ключ: K, значение: T)

person yohait    schedule 26.01.2020