Заставьте Reactive Kafka работать с Confluent Schema Registry Avro Schema

Как сделать Reactive Kafka (https://github.com/akka/reactive-kafka) работать с Confluent Schema Registry Avro Schema? Вот мой пример кода:

def create(groupId: String)(implicit system: ActorSystem): Source[ConsumerMessage.CommittableMessage[String, Array[Byte]], Consumer.Control] = {
    
    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
      .withBootstrapServers(bootstrap)
      .withGroupId(groupId)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .withProperty("schema.registry.url", schemaRegistryUrl)
      .withProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[String].getName)
      .withProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer].getName)
      .withProperty(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
    
    Consumer.committableSource(consumerSettings, Subscriptions.topics(createDataSetJobRequestTopic))
}


person Casel Chen    schedule 19.06.2017    source источник


Ответы (1)


Вам просто нужно настроить потребителя для использования десериализатора Confluent: io.confluent.kafka.serializers.KafkaAvroDeserializer.class

Установите следующие свойства:

  • ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
  • ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG

Кроме того, вы должны добавить свойство schema.registry.url, чтобы указать хотя бы один адрес, указывающий на ваш экземпляр реестра схемы.

Наконец, вы должны добавить в свой проект следующую зависимость:

            <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${io.confluent.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

Confluent Documentation

person fhussonnois    schedule 19.06.2017
comment
Спасибо, я знаю, как использовать сообщение kafka avro с реестром схемы, проблема, которую я спросил, заключается в том, как использовать его с реактивной kafka. Для этого сначала необходимо создать consumerSettings, как в моем вставленном выше коде. Я не знал, можно ли получить Source [ConsumerMessage.CommittableMessage [String, AvroClass] напрямую, потому что KafkaAvroDeseriablizer не является универсальным. В противном случае я получил бы CommittableMessage [String, Object], а затем применил бы его к конкретному? - person Casel Chen; 19.06.2017
comment
Привет, тебе удалось это понять. У меня точная проблема десериализации сообщений kafka через потоки akka kafka. Он возвращает объект, который я не могу привести к своему типу. - person armulator; 18.09.2017
comment
Похоже, проблема не была устранена, и о ней сообщили другие. github.com/akka/reactive-kafka/issues/342. - person Casel Chen; 03.12.2017