Буфер протокола Google (protobuf) – это хорошо известный формат сериализации данных, который эффективен (меньше и быстрее, чем json) и не зависит как от платформы, так и от языка. Этот пост посвящен тому, как мы можем использовать формат сериализации protobuf для отправки сообщений в тему kafka и их чтения. Я также вставил ссылку на github в конце для полного рабочего кода.
Вот мой другой пост о программировании kafka с сериализацией Avro: https://medium.com/@rramiz.rraza/kafka-programming-in-java-with-avro-serialization-f0121db5a5a1
Моя локальная установка:-
- kafka (версия 3.3.2), установленная на виртуальной машине Linux (Fedora)
- Компилятор Profobuf установлен на Windows
- Java-программа в IntelliJ IDE для Windows
- Java подключается к kafka (установлена на виртуальной машине)
Модель :-
Ниже представлена модель, написанная в формате protobuf. После успешной компиляции будет сгенерирован соответствующий класс Java ProtMessage с полями id и name внутри ExchangeProtoMessage.
syntax = "proto3"; package exchange_message_def; option java_package = "com.kafka.message"; option java_outer_classname = "ExchangeProtoMessage"; option optimize_for = SPEED; message ProtMessage { optional int32 id = 1; optional string name = 2; }
Сериализатор модели:-
Чтобы отправлять объекты ProtMessage в качестве значений в топик kafka, нам нужен соответствующий сериализатор, который может преобразовывать объекты ProtMessage в массив байтов. Этот сериализатор передается производителю kafka при инициализации производителя.
package com.kafka.model; import com.kafka.message.ExchangeProtoMessage.ProtMessage; import org.apache.kafka.common.serialization.Serializer; public class ProtMessageSerializer implements Serializer<ProtMessage>{ @Override public byte[] serialize(String topic, ProtMessage data) { return data.toByteArray(); } }
Десериализатор модели:-
Чтобы читать объекты ProtMessage как значения из темы kafka, нам нужен соответствующий десериализатор, который может преобразовывать массив байтов в объекты ProtMessage. Этот десериализатор передается потребителю kafka при инициализации потребителя.
package com.kafka.model; import com.google.protobuf.InvalidProtocolBufferException; import com.kafka.message.ExchangeProtoMessage.ProtMessage; import org.apache.kafka.common.serialization.Deserializer; public class ProtMessageDeserializer implements Deserializer<ProtMessage>{ @Override public ProtMessage deserialize(String topic, byte[] data) { try { return ProtMessage.parseFrom(data); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); throw new RuntimeException("excepiton while parsing"); } } }
Продюсер Кафки :-
Здесь мы инициализируем производителя кафки с ключом целочисленного типа и значением типа ProtMessage. Затем мы отправляем несколько сообщений в тему кафки.
package com.kafka.producer; import com.kafka.message.ExchangeProtoMessage.ProtMessage; import com.kafka.model.ProtMessageSerializer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.IntegerSerializer; import java.util.Properties; public class MyKafkaProducerWithProtobufModel { public static void main(String[] args) { System.out.println("going to publish messages"); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("linger.ms", 1); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<Integer, ProtMessage> producer = new KafkaProducer<>(props, new IntegerSerializer(), new ProtMessageSerializer()); for (int i = 1; i <= 10; i++){ producer.send(new ProducerRecord<>("myFirstTopic", 0, i, ProtMessage.newBuilder().setId(i).setName(i + "proto value").build())); } producer.close(); } }
Потребитель Kafka :-
Здесь мы инициализируем потребителя кафки ключом целочисленного типа и значением типа ProtMessage. Затем читаем сообщения, опубликованные производителем из темы кафки.
package com.kafka.consumer; import com.kafka.message.ExchangeProtoMessage.ProtMessage; import com.kafka.model.ProtMessageDeserializer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.IntegerDeserializer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class MyKafkaConsumerWithProtobufModel { public static void main(String[] args) { Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<Integer, ProtMessage> consumer = new KafkaConsumer<>(props, new IntegerDeserializer(), new ProtMessageDeserializer()); consumer.subscribe(Arrays.asList("myFirstTopic")); while (true) { ConsumerRecords<Integer, ProtMessage> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<Integer, ProtMessage> record : records) { System.out.println("Received message: (" + record.key() + ", " + record.value().toString() + ") at offset " + record.offset()); } } } }
Ссылка на Github: https://github.com/ramizgit/kafka-programming
Оформить заказ: https://medium.com/@rramiz.rraza
Я ценю вас и время, которое вы потратили на чтение этого дня! Пожалуйста, следите (и следите) за новыми блогами о больших данных и других новейших технологиях. Здоровья!