Kafka Streams – SerializationException: неизвестный магический байт

Я пытаюсь создать приложение Kafka Streams, которое обрабатывает записи Avro, но получаю следующую ошибку:

Exception in thread "streams-application-c8031218-8de9-4d55-a5d0-81c30051a829-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:567)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:900)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:801)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Я не уверен, что вызывает эту ошибку. Я просто пытаюсь сначала загрузить записи Avro в приложение, где они затем будут обработаны, а затем выведены в другую тему, но, похоже, это не работает. Я включил код из приложения ниже. Может ли кто-нибудь увидеть, почему он не работает?

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    Serde<String> stringSerde = Serdes.String();
    Serde<trackingReport> specificAvroTrackingReportSerde = new SpecificAvroSerde<trackingReport>();

    specificAvroTrackingReportSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), false);


    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, trackingReport> inputreports = builder.stream("intesttopic", Consumed.with(stringSerde, specificAvroTrackingReportSerde));


    KStream<String, trackingReport> outputreports = inputreports;

    String outputTopic = "outtesttopic";
    outputreports.to(outputTopic, Produced.with(stringSerde, specificAvroTrackingReportSerde));

    Topology topology = builder.build();

    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();

person R. B    schedule 18.12.2018    source источник


Ответы (1)


Неизвестный магический байт!

Означает, что ваши данные не соответствуют формату передачи, ожидаемому для реестра схем.

Или, другими словами, данные, которые вы пытаетесь прочитать, не являются данными Avro, как этого ожидает десериализатор Confluent Avro.

Между прочим, вы можете ожидать ту же ошибку, запустив kafka-avro-console-consumer, так что вы можете также использовать ее для отладки.

Если вы уверены, что ваши данные действительно являются Avro, и схема фактически отправляется как часть сообщения (потребуется увидеть ваш код производителя), то вам не следует использовать десериализаторы Confluent Avro, которые ожидают определенный формат байта в сообщении. сообщение. Вместо этого вы можете использовать ByteArrayDesrializer и прочитать запись Avro самостоятельно, а затем передать ее в Apache Avro BinaryDecoder class. В качестве бонуса вы можете извлечь эту логику в свой собственный класс Deserialzer.

Кроме того, если входной темой является Avro, я не думаю, что вам следует использовать это свойство для чтения строк.

DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
person OneCricketeer    schedule 18.12.2018
comment
Да, я только что проверил запуск команды, и она тоже не сработала. Мой производитель такой же, как и по адресу: stackoverflow.com/questions/53781639/ - person R. B; 18.12.2018
comment
Да, я понимаю это свойство, но подумал, что можно переопределить, как я сделал с Consumed.with. - person R. B; 18.12.2018
comment
intesttopic не та тема, в которую отправлено в предыдущем посте - person OneCricketeer; 18.12.2018
comment
Кстати, outputreports — ненужная переменная. Нет необходимости копировать переменную KStream в новое имя. - person OneCricketeer; 18.12.2018
comment
Если я не использую десериализатор Confluent Avro, должен ли я создавать собственный? - person R. B; 18.12.2018
comment
Ваш десериализатор должен инвертировать любой сериализатор, который вы использовали в производителе. В Kafka Streams у вас есть класс Serde, который объединяет два... Я не уверен, что это отвечает на ваш вопрос. - person OneCricketeer; 18.12.2018
comment
Re Или, если это так, и схема отправляется как часть сообщения (необходимо увидеть код вашего производителя), то вам не следует использовать десериализаторы Confluent Avro — какой десериализатор следует использовать в этом случае? Мы пытаемся прочитать сообщения Avro от Oracle Golden Gate и получаем ту же ошибку при попытке десериализации с помощью KafkaAvroDeserializer. - person Kevin Hooke; 28.12.2019
comment
@Kevin С какими сериализаторами / преобразователями вы настроили GoldenGate? Если вы использовали Confluent, то они работают с KafkaAvroDeserializer. Если бы вы использовали конвертеры JSON, вы бы не использовали Avro. Также убедитесь, что вы проверили как ключ, так и значение настроек конвертера. - person OneCricketeer; 28.12.2019
comment
@cricket_007 KafkaAvroSerializer/KafkaAvroDeserializer с сообщениями, отправленными с помощью GG Kafka Handler, получают ошибку Unknown Magic Byte, но тот же KafkaAvroSerializer/KafkaAvroDeserializer для сообщений, отправленных GG с помощью Kafka Connect Handler, работает должным образом. Не уверен, что у нас неправильно настроен обработчик Kafka, но Kafka Connect Handler работает на нас. - person Kevin Hooke; 21.01.2020
comment
@Kevin, на самом деле я не знаком с GoldenGate, но буду рад изучить его, если вы создадите полный пост с проблемой, а не просто комментарий. Обработчик Kafka Connect будет использовать AvroConverter, который оборачивает упомянутые вами seializer. Confluent предлагает только одну комбинацию этих классов, поэтому я не уверен, какие имена пакетов этих классов вы имеете в виду. - person OneCricketeer; 21.01.2020