Внедрение Avro в Spring Cloud Dataflow

Я пытаюсь реализовать поток Spring Cloud Dataflow, который считывает записи из базы данных, передает их процессору, который преобразуется в схему Avro, а затем передает это для использования приложением-приемником.

У меня есть данные, поступающие из базы данных SQL в мое исходное приложение и передающие данные через связыватель Kafka без каких-либо проблем, потому что у меня возникают проблемы с отправкой данных через процессор в приложение Sink, сериализацию / десериализацию с помощью Avro.

Я создал схему avro под названием ech.avsc и сгенерировал для нее класс под названием EchRecord, используя плагин avro-maven в процессоре.

Я добавил следующие зависимости в память процессора и приемника

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-schema</artifactId>
    <version>1.2.2.RELEASE</version>
</dependency>
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.8.2</version>
</dependency>

Я установил свойства процессора на

spring.cloud.stream.bindings.output.contentType=application/*+avro
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true
spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990

со стороны раковины свойства выглядят как spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990

Код приложения процессора выглядит следующим образом:

@EnableBinding(Processor.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class EchProcessorApplication {

private static Logger logger = LoggerFactory.getLogger(EchProcessorApplication.class);

public static void main(String[] args) {
    SpringApplication.run(EchProcessorApplication.class, args);
}


@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public EchRecord transform(List<Map<String, Object>> record) {
    return EchRecord.newBuilder()
            .setCallId(11111).build();;
}

Со стороны раковины код в его нынешнем виде выглядит следующим образом:

@EnableBinding(Sink.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class AvroLoggerApplication {



    private static Logger LOGGER = LoggerFactory.getLogger(AvroLoggerApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(AvroLoggerApplication.class, args);
    }


    @StreamListener(Sink.INPUT)
    public void logHandler(Object data) {

        LOGGER.info("data='{}'", data.toString());
        LOGGER.info("class='{}'", data.getClass());


    }
}

У меня запущен сервер реестра Spring Schema, доступный для обоих приложений, и при запросе реестра я вижу, что схема была доставлена ​​на сервер.

Я вижу, включаю ли я ведение журнала отладки в приложении-приемнике, что contentType установлен правильно для полученных сообщений: contentType = application / vnd.echrecord.v1 + avro

В приложении Sink я установил метод с аннотацией @StreamListener для извлечения сообщений, принимающих объект и распечатывающих данные и тип класса, и, похоже, он извлекает массив байтов.

Как мне изменить код приложения Sink для десериализации сообщения Avro во что-то, из чего я могу получить установленные данные?


person Bah91    schedule 20.11.2017    source источник
comment
Не могли бы вы предоставить небольшой образец приложения (источник и приемник), в котором мы можем воспроизвести проблему? Нет необходимости использовать какой-либо источник базы данных, просто сериализацию базового источника с использованием Avro и десериализацию потребителя. Таким образом, устранить неполадки будет проще.   -  person sobychacko    schedule 20.11.2017


Ответы (1)


Здесь можно попробовать пару вещей. С производственной стороны, поскольку ваш тип уже является типом Avro (SpecificRecord или GenericRecord), вам не нужен флаг dynamicSchemaGeneration, который предназначен для писателей, основанных на отражении, в основном для тестирования, поскольку он влияет на производительность.

Поскольку ваша раковина может видеть правильный тип, как вы разместили, теперь вам нужно иметь свой тип на раковине. Так, например, добавьте тип в раковину и аннотируйте метод правильным типом: EchRecord, который даст вам правильный тип.

Вы также можете установить для него значение GenericRecord, чтобы иметь возможность обращаться к нему как к контейнеру объекта, используя record.get(<propertyname>)

person Vinicius Carvalho    schedule 20.11.2017