Я пытаюсь прочитать последнее сообщение, доступное в теме kafka, используя ConsumerSeekAware. Тип сообщения - Список объектов Avro. Я умею это делать успешно. Но когда во время десериализации это не удается. Сообщение было создано с использованием фреймворка spring-cloud-stream-kafka. Сообщение имеет contentType contentType=application/x-java-object;type=java.util.ArrayList
.
Я знаю, что сообщение avro можно десериализовать, как показано ниже.
DatumReader<GenericRecord> datumReader =
new SpecificDatumReader<>(targetType.newInstance().getSchema());
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
result = (T) datumReader.read(null, decoder);
Но это не работает. Это может быть связано с двумя причинами.
Сообщение представляет собой список объектов avro. Но я пытаюсь создать datamReader со схемой Avro. Но я попытался создать схему типа Schema.createArray (UserDTO.class). Но это не работает.
Я думаю, что тип содержимого, ожидаемый для сообщения avro, - это application / avro, но когда сообщение создается s-c-s, это
contentType=application/x-java-object;
Я пытаюсь создать десериализатор, реализовавorg.apache.kafka.common.serialization.Deserializer
и построив KafkaConsumerFactory
. Может кто поможет?