Буфер протокола Google (protobuf) – это хорошо известный формат сериализации данных, который эффективен (меньше и быстрее, чем json) и не зависит как от платформы, так и от языка. Этот пост посвящен тому, как мы можем использовать формат сериализации protobuf для отправки сообщений в тему kafka и их чтения. Я также вставил ссылку на github в конце для полного рабочего кода.

Вот мой другой пост о программировании kafka с сериализацией Avro: https://medium.com/@rramiz.rraza/kafka-programming-in-java-with-avro-serialization-f0121db5a5a1

Моя локальная установка:-

  1. kafka (версия 3.3.2), установленная на виртуальной машине Linux (Fedora)
  2. Компилятор Profobuf установлен на Windows
  3. Java-программа в IntelliJ IDE для Windows
  4. Java подключается к kafka (установлена ​​на виртуальной машине)

Модель :-

Ниже представлена ​​модель, написанная в формате protobuf. После успешной компиляции будет сгенерирован соответствующий класс Java ProtMessage с полями id и name внутри ExchangeProtoMessage.

syntax = "proto3";

package exchange_message_def;

option java_package = "com.kafka.message";
option java_outer_classname = "ExchangeProtoMessage";
option optimize_for = SPEED;

message ProtMessage {
  optional int32 id = 1;
  optional string name = 2;
}

Сериализатор модели:-

Чтобы отправлять объекты ProtMessage в качестве значений в топик kafka, нам нужен соответствующий сериализатор, который может преобразовывать объекты ProtMessage в массив байтов. Этот сериализатор передается производителю kafka при инициализации производителя.

package com.kafka.model;

import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import org.apache.kafka.common.serialization.Serializer;

public class ProtMessageSerializer implements Serializer<ProtMessage>{
    @Override
    public byte[] serialize(String topic, ProtMessage data) {
        return data.toByteArray();
    }
}

Десериализатор модели:-

Чтобы читать объекты ProtMessage как значения из темы kafka, нам нужен соответствующий десериализатор, который может преобразовывать массив байтов в объекты ProtMessage. Этот десериализатор передается потребителю kafka при инициализации потребителя.

package com.kafka.model;

import com.google.protobuf.InvalidProtocolBufferException;
import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import org.apache.kafka.common.serialization.Deserializer;

public class ProtMessageDeserializer implements Deserializer<ProtMessage>{
    @Override
    public ProtMessage deserialize(String topic, byte[] data) {
        try {
            return ProtMessage.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
            throw new RuntimeException("excepiton while parsing");
        }
    }
}

Продюсер Кафки :-

Здесь мы инициализируем производителя кафки с ключом целочисленного типа и значением типа ProtMessage. Затем мы отправляем несколько сообщений в тему кафки.

package com.kafka.producer;

import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import com.kafka.model.ProtMessageSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;

import java.util.Properties;

public class MyKafkaProducerWithProtobufModel {

    public static void main(String[] args) {

        System.out.println("going to publish messages");

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<Integer, ProtMessage> producer = new KafkaProducer<>(props, new IntegerSerializer(), new ProtMessageSerializer());
        for (int i = 1; i <= 10; i++){
            producer.send(new ProducerRecord<>("myFirstTopic", 0, i, ProtMessage.newBuilder().setId(i).setName(i + "proto value").build()));
        }

        producer.close();
    }
}

Потребитель Kafka :-

Здесь мы инициализируем потребителя кафки ключом целочисленного типа и значением типа ProtMessage. Затем читаем сообщения, опубликованные производителем из темы кафки.

package com.kafka.consumer;

import com.kafka.message.ExchangeProtoMessage.ProtMessage;
import com.kafka.model.ProtMessageDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MyKafkaConsumerWithProtobufModel {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<Integer, ProtMessage> consumer = new KafkaConsumer<>(props, new IntegerDeserializer(), new ProtMessageDeserializer());
        consumer.subscribe(Arrays.asList("myFirstTopic"));

        while (true) {
            ConsumerRecords<Integer, ProtMessage> records = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<Integer, ProtMessage> record : records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value().toString() + ") at offset " + record.offset());
            }
        }
    }
}

Ссылка на Github: https://github.com/ramizgit/kafka-programming

Оформить заказ: https://medium.com/@rramiz.rraza

Я ценю вас и время, которое вы потратили на чтение этого дня! Пожалуйста, следите (и следите) за новыми блогами о больших данных и других новейших технологиях. Здоровья!