Несколько типов сообщений в одной теме Kafka с Avro

У меня есть приложение с источником событий, созданное поверх Kafka. В настоящее время у меня есть одна тема, в которой есть несколько типов сообщений. Все сериализованы / десериализованы с помощью JSON.

Реестр схемы из confluent выглядит как хороший подход к обслуживанию типов сообщений, а в режиме полной совместимости Avro он также предоставляет механизм управления версиями сообщений в моем приложении с источником событий.

С недавним патчем - запись в блоге в сливающуюся версию 4.1.1. Вы можете иметь несколько разных типов сообщений в одной теме с сериализатором / десериализатором Avro.

Однако я не видел ни одного рабочего примера этого. Ни одного.

Мой вопрос: действительно ли вышеупомянутый патч работает без использования Avro Union Types (помещая все разные типы сообщений в одну единую схему и используя union)?

И как этот подход будет работать с приложением Kafka Streaming, где вам нужно указать ключ и значение Serde?

Должен ли я просто забыть об Avro и вместо этого использовать protobuff?


comment
После дополнительных исследований и запуска POC. Я пошел с protobuff3. Мне удалось создать одну схему сообщения protobuff, и у этого сообщения было одно свойство типа Any. Тип Any был моей полезной нагрузкой, и благодаря этому мне удалось создать общую тему, универсальный сериализатор и десериализатор, которые я могу использовать в своем коде для восстановления различных типов сообщений, сериализованных в параметре Any.   -  person J S    schedule 29.08.2018


Ответы (1)


Это пример потребителя, который получает данные из темы, в которой публикуются события разных типов:

package com.kafka.schema;

import com.phonebook.Employee;
import com.phonebook.Milestone;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Properties;
import java.util.stream.IntStream;

public class AvroConsumer {

    private static Consumer<Long, GenericRecord> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Const.BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        // Use Kafka Avro Deserializer.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Use Specific Record or else you get Avro GenericRecord.
        // props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");

        // Schema registry location.
        // Run Schema Registry on 8081
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, Const.SCHEMA_REGISTRY);
        props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
        return new KafkaConsumer<>(props);
    }

    public static void main(String... args) {
        final Consumer<Long, GenericRecord> consumer = createConsumer();
        consumer.subscribe(Collections.singletonList(Const.TOPIC));
        IntStream.range(1, 100).forEach(index -> {
            final ConsumerRecords<Long, GenericRecord> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
            if (records.count() == 0) {
                System.out.println("None found");
            } else {
                records.forEach(record -> {
                    GenericRecord recValue = record.value();
                    System.out.printf("%s %d %d %s \n", record.topic(), record.partition(), record.offset(), recValue);
                });
            }
        });
    }
}

Важная часть здесь заключается в следующем:

props.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
person Evgeny Semionov    schedule 06.11.2018