Потребление сообщения avro с помощью spring-kafka, созданного с помощью spring-cloud-stream-kafka-binder

Я пытаюсь прочитать последнее сообщение, доступное в теме kafka, используя ConsumerSeekAware. Тип сообщения - Список объектов Avro. Я умею это делать успешно. Но когда во время десериализации это не удается. Сообщение было создано с использованием фреймворка spring-cloud-stream-kafka. Сообщение имеет contentType contentType=application/x-java-object;type=java.util.ArrayList.

Я знаю, что сообщение avro можно десериализовать, как показано ниже.

 DatumReader<GenericRecord> datumReader =
            new SpecificDatumReader<>(targetType.newInstance().getSchema());
        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

        result = (T) datumReader.read(null, decoder);

Но это не работает. Это может быть связано с двумя причинами.

  1. Сообщение представляет собой список объектов avro. Но я пытаюсь создать datamReader со схемой Avro. Но я попытался создать схему типа Schema.createArray (UserDTO.class). Но это не работает.

  2. Я думаю, что тип содержимого, ожидаемый для сообщения avro, - это application / avro, но когда сообщение создается s-c-s, это contentType=application/x-java-object;

Я пытаюсь создать десериализатор, реализовавorg.apache.kafka.common.serialization.Deserializer и построив KafkaConsumerFactory. Может кто поможет?


person Hariharan    schedule 28.09.2017    source источник


Ответы (1)


См. Свойство ...producer.useNativeEncoding, как показано в Свойства производителя.

useNativeEncoding

Если установлено значение true, исходящее сообщение сериализуется непосредственно клиентской библиотекой, которую необходимо соответствующим образом настроить (например, установить соответствующий сериализатор значений производителя Kafka). Когда используется эта конфигурация, маршаллинг исходящих сообщений не основан на contentType привязки. Когда используется собственное кодирование, потребитель несет ответственность за использование соответствующего декодера (например, десериализатора потребительских значений Kafka) для десериализации входящего сообщения. Кроме того, когда используется собственное кодирование / декодирование, свойство headerMode игнорируется, и заголовки не будут встроены в сообщение.

По умолчанию: false.

person Gary Russell    schedule 28.09.2017
comment
Если я использую собственную кодировку, заголовок не включается в запись производителя. Есть ли способ использовать нативную кодировку с заголовками? - person Hariharan; 25.05.2018
comment
Какую версию ты используешь? 2.0 использует Kafka 1.0.1, который изначально поддерживает заголовки, поэтому заголовки могут передаваться даже с использованием собственной кодировки. - person Gary Russell; 25.05.2018