Я пытаюсь использовать собственный кодер для обработки данных в потоке данных. Я сделал следующее:
- Экспорт данных из BigQuery в файлы avro
- Автоматически сгенерировал класс из схемы в этих файлах с помощью avro-tools-1.7.7.jar.
- Написал собственный кодер для класса, используя Kryo
- Аннотировал класс с помощью
@DefaultCoder(MyCustomCoder.class)
- Зарегистрировал свой кодер с помощью
p.getCoderRegistry().registerCoder(MyCustomClass.class, MyCustomCoder.class);
- Чтение данных из файлов avro с помощью
PCollection<MyCustomClass> pc = p.apply(AvroIO.Read.named("Name").from("gs://bucket/path/to/*.avro").withSchema(MyCustomClass.class));
Дело в том, что если у меня есть ошибка в кодере, моя работа завершается сбоем только на шаге перетасовки. Не похоже, чтобы Dataflow использовал мой собственный кодер для загрузки данных из файлов avro. Это действительно так? И если да, то есть ли способ переопределить кодер, используемый для загрузки данных?