Переопределить кодер AvroIO по умолчанию в потоке данных

Я пытаюсь использовать собственный кодер для обработки данных в потоке данных. Я сделал следующее:

  • Экспорт данных из BigQuery в файлы avro
  • Автоматически сгенерировал класс из схемы в этих файлах с помощью avro-tools-1.7.7.jar.
  • Написал собственный кодер для класса, используя Kryo
  • Аннотировал класс с помощью @DefaultCoder(MyCustomCoder.class)
  • Зарегистрировал свой кодер с помощью p.getCoderRegistry().registerCoder(MyCustomClass.class, MyCustomCoder.class);
  • Чтение данных из файлов avro с помощью PCollection<MyCustomClass> pc = p.apply(AvroIO.Read.named("Name").from("gs://bucket/path/to/*.avro").withSchema(MyCustomClass.class));

Дело в том, что если у меня есть ошибка в кодере, моя работа завершается сбоем только на шаге перетасовки. Не похоже, чтобы Dataflow использовал мой собственный кодер для загрузки данных из файлов avro. Это действительно так? И если да, то есть ли способ переопределить кодер, используемый для загрузки данных?


person Nira    schedule 24.01.2017    source источник


Ответы (1)


В настоящее время AvroIO всегда использует встроенный AvroCoder для чтения из входных файлов. Вы можете изменить кодировщик позже в конвейере, как вы описываете. Если ваши данные на самом деле не закодированы так, чтобы AvroIO мог их прочитать, вместо этого вам следует использовать другой источник, например, новый подкласс FileBasedSource.

person danielm    schedule 24.01.2017
comment
AvroCoder отлично читает данные, но довольно медленно по сравнению со Spark. Использует ли AvroCoder сериализацию Java по умолчанию? Потому что в Spark я использую Kryo. - person Nira; 25.01.2017
comment
AvroCoder использует сериализацию Avro, которая значительно быстрее сериализации Java по умолчанию. Однако вполне возможно, что Kryo еще быстрее. - person danielm; 26.01.2017