Отправка сообщений в kafka для разделения разделов по ключу

У меня есть тема Kafka (test-topic) с 3 разделами и набор сообщений, который содержит ключ, который может принимать только 3 типа значений, я хочу, чтобы эти сообщения отправлялись в отдельные разделы на основе их ценность.

from kafka import KafkaProducer
from kafka.partitioner import DefaultPartitioner

messages = [{"partition_key":"k1", "x":1},
            {"partition_key":"k2", "x":2},
            {"partition_key":"k3", "x":3},
            {"partition_key":"k1", "x":4},
            {"partition_key":"k2", "x":5}]

partitioner = DefaultPartitioner()
all_partitions = list(range(100))
available = all_partitions
dataPartitioner = partitioner(b'partition_key', all_partitions, available)

producer = KafkaProducer(bootstrap_servers="localhost:9092", value_serializer=lambda v: json.dumps(v).encode('utf-8'), partitioner = dataPartitioner)

for m in messages:
  producer.send('test-topic', m)
producer.flush()

В приведенном выше коде я хочу, чтобы сообщения с одинаковым значением partition_key направлялись в один и тот же раздел.


person Devaraj Phukan    schedule 25.07.2018    source источник


Ответы (1)


Вам нужно написать собственную реализацию Partitioner и передайте этот класс KafkaProducer во время инициализации.

E.g.,

 private static Properties createProducerConfig(String brokers) {
    Properties props = new Properties();
    props.put("bootstrap.servers", brokers);
    //more properties
    props.put("partitioner.class","com.app.KafkaUserCustomPatitioner");
    return props;
    }
person hoodakaushal    schedule 26.07.2018