Я пытаюсь реализовать поток Spring Cloud Dataflow, который считывает записи из базы данных, передает их процессору, который преобразуется в схему Avro, а затем передает это для использования приложением-приемником.
У меня есть данные, поступающие из базы данных SQL в мое исходное приложение и передающие данные через связыватель Kafka без каких-либо проблем, потому что у меня возникают проблемы с отправкой данных через процессор в приложение Sink, сериализацию / десериализацию с помощью Avro.
Я создал схему avro под названием ech.avsc и сгенерировал для нее класс под названием EchRecord, используя плагин avro-maven в процессоре.
Я добавил следующие зависимости в память процессора и приемника
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
<version>1.2.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
Я установил свойства процессора на
spring.cloud.stream.bindings.output.contentType=application/*+avro
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true
spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990
со стороны раковины свойства выглядят как spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990
Код приложения процессора выглядит следующим образом:
@EnableBinding(Processor.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class EchProcessorApplication {
private static Logger logger = LoggerFactory.getLogger(EchProcessorApplication.class);
public static void main(String[] args) {
SpringApplication.run(EchProcessorApplication.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public EchRecord transform(List<Map<String, Object>> record) {
return EchRecord.newBuilder()
.setCallId(11111).build();;
}
Со стороны раковины код в его нынешнем виде выглядит следующим образом:
@EnableBinding(Sink.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class AvroLoggerApplication {
private static Logger LOGGER = LoggerFactory.getLogger(AvroLoggerApplication.class);
public static void main(String[] args) {
SpringApplication.run(AvroLoggerApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void logHandler(Object data) {
LOGGER.info("data='{}'", data.toString());
LOGGER.info("class='{}'", data.getClass());
}
}
У меня запущен сервер реестра Spring Schema, доступный для обоих приложений, и при запросе реестра я вижу, что схема была доставлена на сервер.
Я вижу, включаю ли я ведение журнала отладки в приложении-приемнике, что contentType установлен правильно для полученных сообщений: contentType = application / vnd.echrecord.v1 + avro
В приложении Sink я установил метод с аннотацией @StreamListener для извлечения сообщений, принимающих объект и распечатывающих данные и тип класса, и, похоже, он извлекает массив байтов.
Как мне изменить код приложения Sink для десериализации сообщения Avro во что-то, из чего я могу получить установленные данные?