Исключение при десериализации данных avro с помощью ConfluentSchemaRegistry?

Я новичок в моргании и Кафке. Я пытаюсь десериализовать данные avro с помощью реестра Confluent Schema. Я уже установил flink и Kafka на машину ec2. Кроме того, перед запуском кода была создана «тестовая» тема.

Путь к коду: https://gist.github.com/mandar2174/5dc13350b296abf127b

В рамках реализации код выполняет следующую операцию:

1) Create a flink DataStream object using a list of user element. (User class is avro generated class)
2) Write the Datastream source to Kafka using AvroSerializationSchema.
3) Read the data from Kafka using ConfluentRegistryAvroDeserializationSchema by reading the schema from Confluent Schema registry.

Команда для запуска исполняемого jar-файла flink:

./bin/flink run -c com.streaming.example.ConfluentSchemaRegistryExample /opt/flink-1.7.2/kafka-flink-stream-processing-assembly-0.1.jar

Исключение при выполнении кода:

java.io.IOException: Unknown data format. Magic number does not match
    at org.apache.flink.formats.avro.registry.confluent.ConfluentSchemaRegistryCoder.readSchema(ConfluentSchemaRegistryCoder.java:55)
    at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:66)
    at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:44)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)

Схема Avro, которую я использую для класса User, выглядит следующим образом:

{
  "type": "record",
  "name": "User",
  "namespace": "com.streaming.example",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "favorite_number",
      "type": [
        "int",
        "null"
      ]
    },
    {
      "name": "favorite_color",
      "type": [
        "string",
        "null"
      ]
    }
  ]
}

Может ли кто-нибудь указать, какие шаги мне не хватает в рамках десериализации данных avro с использованием конфлюентного реестра схем Kafka?




Ответы (1)


При написании данных Avro необходимо также использовать реестр, чтобы десериализатор, который от него работает, работал.

Но это открытый пиар во Flink, все же для добавления класса ConfluentRegistryAvroSerializationSchema

Я считаю, что обходным путем было бы использовать AvroDeserializationSchema, который не зависит от реестра.

Если вы действительно хотите использовать реестр в коде производителя, вам придется сделать это за пределами Flink, пока этот PR не будет объединен.

person OneCricketeer    schedule 22.04.2019
comment
Правильно ли я понимаю, что я не могу использовать AvroSerializationSchema напрямую с ConfluentRegistryAvroDeserializationSchema, потому что и сериализация, и десериализация должны относиться к реестру конфлюентных схем? У вас есть справочный пример кода, на который я могу сослаться, чтобы использовать реестр схем в коде производителя? - person mandar; 23.04.2019
comment
Верно. Насколько я могу судить, сериализатор Flink Avro не использует реестр, поэтому схема Avro будет встроена как часть каждого сообщения. Примеры простых производителей Java можно увидеть здесь docs.confluent.io / current / schema-registry / - person OneCricketeer; 23.04.2019
comment
Спасибо за подтверждение. Я попытаюсь сослаться на образец кода и протестировать как сериализацию, так и десериализацию, используя объединенный реестр схем. Я обновляю свой результат после тестирования. - person mandar; 23.04.2019