Сопоставьте файлы Avro с классом Java с разными именами полей

У меня проблема с простой задачей spark, которая читает файл Avro, а затем сохраняет его как таблицу паркета Hive.

У меня есть 2 типа файла, в целом они одинаковы, но ключевая структура немного отличается - имена полей.

Тип 1

root
|-- pk: strucnt (nullable = true)
    |-- term_id: string (nullale = true)

Тип 2

root
|-- pk: strucnt (nullable = true)
    |-- id: string (nullale = true)

Я читаю Avro, используя spark-avro. А затем сопоставьте этот DF с таким bean-компонентом

Dataset<SomeClass> df = avroDF.as(Encoders.bean(SomeClass.class));

SomeClass — это простой класс с одним полем и геттером и сеттером.

public class SomeClass{
    private String term_id;
    ...
}

Так что если я читаю Авро тип 1 - это нормально. Но если я читаю Авро типа 2 - возникает ошибка. И наоборот, если я меняю имя поля на private String id;

Есть ли универсальное решение моей проблемы? Я нашел @AvroName, но он не позволяет установить несколько имен. Спасибо.


person Danila Zharenkov    schedule 29.01.2018    source источник


Ответы (2)


Только один способ - изменить имя поля набора данных на имя, указанное в схеме. Используйте этот пример, чтобы сделать это:

val newName = Seq("id", "x1", "x2", "x3")
Dataset<SomeClass> df = avroDF.toDF(newNames: _*).as(Encoders.bean(SomeClass.class));

Вы не можете преобразовать кадр данных в BeanClass, который имеет разные имена полей.

person Yehor Krivokon    schedule 29.01.2018

Возможное решение

StructType avroExtendedSchema = avroDF.schema().add("id",DataTypes.StringType);
avroDF.map(row->RowFactory(row.getStruct(0),row.getStruct(0).getString(0)), 
       RowEncoder.apply(avroExtendedSchema)).toDF();

Таким образом, второе поле DF будет называться «id» и содержать строковый ключ. Первую структуру "pk" можно удалить в будущем.

avroDF.drop("pk");

PS Я нашел третий тип схемы:

root
|-- pk: strucnt (nullable = true)
    |-- id: int(nullale = true)

Итак, окончательный код выглядит так:

DataType keyType = avroDF.select("pk.*").schema().fields[0].dataType();
StructType avroExtendedSchema = avroDF.schema().add("id",keyType);
avroDF.map(row->RowFactory(row.getStruct(0),row.getStruct(0).get(0)), 
       RowEncoder.apply(avroExtendedSchema)).drop("pk").toDF();

Этот код подходит для любого примитивного\строкового ключа.

person Danila Zharenkov    schedule 29.01.2018