У меня есть тема Kafka (test-topic) с 3 разделами и набор сообщений, который содержит ключ, который может принимать только 3 типа значений, я хочу, чтобы эти сообщения отправлялись в отдельные разделы на основе их ценность.
from kafka import KafkaProducer
from kafka.partitioner import DefaultPartitioner
messages = [{"partition_key":"k1", "x":1},
{"partition_key":"k2", "x":2},
{"partition_key":"k3", "x":3},
{"partition_key":"k1", "x":4},
{"partition_key":"k2", "x":5}]
partitioner = DefaultPartitioner()
all_partitions = list(range(100))
available = all_partitions
dataPartitioner = partitioner(b'partition_key', all_partitions, available)
producer = KafkaProducer(bootstrap_servers="localhost:9092", value_serializer=lambda v: json.dumps(v).encode('utf-8'), partitioner = dataPartitioner)
for m in messages:
producer.send('test-topic', m)
producer.flush()
В приведенном выше коде я хочу, чтобы сообщения с одинаковым значением partition_key направлялись в один и тот же раздел.