Как использовать верблюд-авро-потребитель и производитель?

Я не вижу примера использования компонента camel-avro для создания и использования сообщений kafka avro? В настоящее время мой верблюжий маршрут таков. что его следует изменить, чтобы он работал со схемой-реестром и другими подобными реквизитами, используя camel-kafka-avro consumer & maker.

props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);              
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); 

public void configure() {
        PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class); 
        pc.setLocation("classpath:application.properties");

        log.info("About to start route: Kafka Server -> Log ");

        from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                + "&maxPollRecords={{consumer.maxPollRecords}}"
                + "&consumersCount={{consumer.consumersCount}}"
                + "&seekTo={{consumer.seekTo}}"
                + "&groupId={{consumer.group}}"
                +"&valueDeserializer="+KafkaAvroDeserializer.class
                +"&keyDeserializer="+StringDeserializer.class
                )
                .routeId("FromKafka")
            .log("${body}");

person driven_spider    schedule 15.03.2019    source источник
comment
Почему это отклонено? не могли бы они объяснить? даже с этой конфигурацией - ›не работает value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer.   -  person driven_spider    schedule 15.03.2019
comment
Не могли бы вы прояснить, что происходит?   -  person OneCricketeer    schedule 16.03.2019
comment
@ cricket_007: приложение запускается и устанавливает для потребителя kafka, что это значение означает, что AvroDeserializer - это мой собственный класс. Мне не удалось установить соединение kafka от потребителя avro confluent. - ›value.deserializer = class org.apache.camel.example.kafka.AvroDeserializer 18.03.2019 07: 56: 40,663 [nsumer [avro-t1]] ПРЕДУПРЕЖДЕНИЕ KafkaConsumer - Исключение KafkaException, использующее avro-t1-Thread 0 из темы avro-t1. Будет пытаться повторно подключиться при следующем запуске ----- это исключение, приложение продолжает повторно подключаться к брокеру kafka и, следовательно, терпит неудачу.   -  person driven_spider    schedule 18.03.2019
comment
Хм. Я не понимаю этого org.apache.camel.example.kafka.AvroDeserializer. Теоретически любой десериализатор в пути к классам должен работать. Не могли бы вы включить ведение журнала отладки?   -  person OneCricketeer    schedule 18.03.2019
comment
@ cricket_007: спасибо, что помогли мне в мыслительном процессе.   -  person driven_spider    schedule 19.03.2019


Ответы (3)


Я отвечаю на свой вопрос, потому что пару дней сидел над этой проблемой. Надеюсь, этот ответ будет полезен другим.

Я попытался использовать десериализатор io.confluent.kafka.serializers.KafkaAvroDeserializer и получил исключение kafka. поэтому мне пришлось написать собственный десериализатор, чтобы делать следующие вещи:

  1. установить реестр схемы
  2. использовать специальный читатель avro (что означает не стандартный stringDeserializer)

тогда мы должны получить доступ к «schemaRegistry», «useSpecificAvroReader» и установить эти поля AbstractKafkaAvroDeserializer (io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer)

Вот решение ...

КАМЕЛ-КАФКА-АВРО-МАРШРУТ-СТРОИТЕЛЬ

public static void main(String[] args) throws Exception {
    LOG.info("About to run Kafka-camel integration...");
    CamelContext camelContext = new DefaultCamelContext();
    // Add route to send messages to Kafka
    camelContext.addRoutes(new RouteBuilder() {
        public void configure() throws Exception {                
            PropertiesComponent pc = getContext().getComponent("properties", 
                                      PropertiesComponent.class);
            pc.setLocation("classpath:application.properties");

            log.info("About to start route: Kafka Server -> Log ");

            from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                    + "&maxPollRecords={{consumer.maxPollRecords}}"
                    + "&consumersCount={{consumer.consumersCount}}"
                    + "&seekTo={{consumer.seekTo}}"
                    + "&groupId={{consumer.group}}"
                    + "&keyDeserializer="+ StringDeserializer.class.getName() 
                    + "&valueDeserializer="+CustomKafkaAvroDeserializer.class.getName()
                    )
                    .routeId("FromKafka")
                .log("${body}");

        }
    });
    camelContext.start();
    // let it run for 5 minutes before shutting down
    Thread.sleep(5 * 60 * 1000);
    camelContext.stop();
}

КЛАСС ДЕСЕРИАЛИЗАТОРА - устанавливает schema.registry.url и use.specific.avro.reader на абстрактном уровне AbstractKafkaAvroDeserializer. Если я не установлю это, я получу исключение kafka-config.

package com.example.camel.kafka.avro;

import java.util.Collections;
import java.util.List;
import java.util.Map;


import io.confluent.common.config.ConfigException;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.Deserializer;



public class CustomKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
    implements Deserializer<Object> {

    private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";

    @Override
    public void configure(KafkaAvroDeserializerConfig config) {

     try {
          final List<String> schemas = 
                              Collections.singletonList(SCHEMA_REGISTRY_URL);
          this.schemaRegistry = new CachedSchemaRegistryClient(schemas, 
                                  Integer.MAX_VALUE);
          this.useSpecificAvroReader = true;

       } catch (ConfigException e) {
              throw new org.apache.kafka.common.config.ConfigException(e.getMessage());
     }
   }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    configure(null);
  }

  @Override
  public Object deserialize(String s, byte[] bytes) {
    return deserialize(bytes);
  }

  @Override
  public void close() {
  }
}
person driven_spider    schedule 19.03.2019
comment
кстати, он был добавлен недавно - stackoverflow.com/a/55437784/2308683 - person OneCricketeer; 09.04.2019
comment
@ cricket_007: В то время я не мог видеть, когда искал и хотел найти решение. Спасибо за ваш тщательный поиск. - person driven_spider; 22.04.2019
comment
Эй, просто интересно, был ли это рабочий проект или что-то, чем вы могли бы поделиться? Я хотел бы увидеть полный пример на github или где-нибудь в этом роде? Благодарность - person AnonymousAlias; 17.06.2019
comment
@ MickO'Gorman: извините, что я не смотрел на это раньше. Посмотрите на мой маршрут: github.com/hykavitha/camel-kafka-microservice/blob/master/src/ Посмотрите CustomKafkaAvroDeserializer github.com/hykavitha/camel-kafka-microservice / blob / master / src / - person driven_spider; 26.06.2019

Используя camel-kafka-starter (для весенней загрузки) version: 3.6.0, вам не нужно определять CustomKafkaAvroDeserializer. Вместо этого добавьте следующие сведения о конфигурации в ваш application.yaml или application.properties файл, и компонент camel-kafka (как производитель, так и потребитель) применит соответствующий SerDe к обрабатываемым объектам / байтам.

camel:
  springboot:
    main-run-controller: true # to keep the JVM running
  component:
    kafka:
      brokers: http://localhost:9092
      schema-registry-u-r-l: http://localhost:8081

      #Consumer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

      #Producer
      key-serializer-class: org.apache.kafka.common.serialization.StringSerializer
      serializer-class: io.confluent.kafka.serializers.KafkaAvroSerializer

      specific-avro-reader: true

Вам также необходимо убедиться, что вы загрузили соответствующие файлы avro-schema-json на свой сервер реестра схем, например, confluent-schema-registry перед запуском производителя / потребителя.

person Muhammad Arslan Akhtar    schedule 28.10.2020
comment
Могли бы вы поделиться соответствующим примером маршрута? - person tstuber; 28.10.2020

Некоторое время я боролся с тем же вопросом. Я сделал полный пример с camel-quarkus и schema-registry из confluent: https://github.com/tstuber/camel-quarkus-kafka-schema-registry

person tstuber    schedule 04.01.2021