Класс разделителя KafKa, назначьте сообщение разделу внутри темы с помощью ключа

Я новичок в kafka, поэтому извиняюсь, если я звучу глупо, но до сих пор я понял ... Поток сообщений можно определить как тему, например категорию. И каждая тема разделена на один или несколько разделов (каждый раздел может иметь несколько реплик). поэтому они действуют параллельно

Они говорят, что с главного сайта Kafka

Производитель может выбрать, какое сообщение назначить какому разделу в теме. Это можно сделать циклически просто для балансировки нагрузки или в соответствии с некоторой функцией семантического разделения (скажем, на основе некоторого ключа в сообщении).

Означает ли это, что при потреблении я смогу выбрать смещение сообщения для определенного раздела? Можно ли при запуске нескольких разделов выбрать один конкретный раздел, например раздел 0?

Говорят, что в Kafka 0.7 быстрый старт

Отправьте сообщение с ключом раздела. Сообщения с одинаковым ключом отправляются в один и тот же раздел.

И ключ может быть предоставлен при создании производителя, как показано ниже.

    ProducerData<String, String> data = new ProducerData<String, String>("test-topic", "test-key", "test-message");
    producer.send(data);

Как теперь использовать сообщение на основе этого ключа? каково реальное влияние использования этого ключа при производстве в Kafka?

При создании производителя в 0.8beta мы можем предоставить атрибут класса разделителя через файл конфигурации. Пользовательский класс разделителя, возможно, может быть создан с использованием интерфейса разделителя kafka. Но меня немного смущает, как именно это работает. 0.8 doc тоже многого не объясняет. Любой совет, или я что-то упускаю?


person Hild    schedule 13.08.2013    source источник


Ответы (3)


Каждая тема в Kafka разбита на множество разделов. Разделение позволяет параллельное потребление, увеличивая пропускную способность.

Producer публикует сообщение в теме, используя клиентскую библиотеку производителя Kafka, которая балансирует сообщения по доступным разделам с помощью Partitioner. Брокер, к которому подключается производитель, заботится об отправке сообщения брокеру, который является лидером этого раздела, используя информацию о владельце раздела в zookeeper. Потребители используют клиентскую библиотеку высокого уровня Kafka (которая обрабатывает смену лидера брокера, управляет информацией смещения в zookeeper и неявно выясняет информацию о владельце раздела и т. Д.) Для получения сообщений из разделов в потоках; каждый поток может быть сопоставлен с несколькими разделами в зависимости от того, как потребитель решает создавать потоки сообщений.

Например, если есть 10 разделов для темы и 3 экземпляра-потребителя (C1, C2, C3, запущенные в этом порядке), все принадлежащие одной группе потребителей, у нас могут быть разные модели потребления, которые допускают параллелизм чтения, как показано ниже.

Каждый потребитель использует один поток. В этой модели, когда C1 запускается, все 10 разделов темы сопоставляются с одним и тем же потоком, и C1 начинает потреблять из этого потока. Когда запускается C2, Kafka повторно балансирует разделы между двумя потоками. Таким образом, каждый поток будет назначен на 5 разделов (в зависимости от алгоритма перебалансировки он также может быть 4 против 6), и каждый потребитель потребляет из своего потока. Точно так же, когда запускается C3, разделы снова балансируются между 3 потоками. Обратите внимание, что в этой модели при потреблении из потока, назначенного более чем одному разделу, порядок сообщений между разделами будет беспорядочным.

Каждый потребитель использует более одного потока (скажем, C1 использует 3, C2 использует 3, а C3 использует 4). В этой модели при запуске C1 все 10 разделов назначаются 3 потокам, а C1 может потребляют из 3 потоков одновременно, используя несколько потоков. Когда запускается C2, разделы перебалансируются между 6 потоками, и аналогично, когда запускается C3, разделы перебалансируются между 10 потоками. Каждый потребитель может одновременно использовать несколько потоков. Обратите внимание, что количество потоков и разделов здесь одинаковое. Если количество потоков превышает количество разделов, некоторые потоки не получат никаких сообщений, так как им не будут назначены какие-либо разделы.

person java_geek    schedule 15.12.2014
comment
У меня вопрос. Я делаю это для потребителя высокого уровня: Consumer.createJavaConsumerConnector (config), topicCountMap.put (mytopic, 1); а затем я получаю список потоков, но поскольку я указал там 1, это означает, что я получаю только список из 1 потока - значит, когда есть тема с 10 разделами, этот поток будет получать сообщения из всех 10 разделов, верно? Итак, когда я делаю это все 10 раз, я получаю по 10 потоков, каждый из которых извлекается из одного раздела? - person stewenson; 12.07.2015
comment
Если предположить, что все потребители являются частью одной группы потребителей, да. Каждый поток будет сопоставлен с 1 разделом - person java_geek; 18.11.2015

Это то, что я нашел до сих пор.

Определите наш собственный класс разделителя, реализовав интерфейс kafka Partitioner. Реализованный метод будет иметь два аргумента: первый ключ, который мы предоставляем от производителя, а затем количество доступных разделов. Таким образом, мы можем определить нашу собственную логику, чтобы установить, какой ключ сообщения к какому разделу переходит.

Теперь при создании производителя мы можем указать наш собственный класс разделителя, используя атрибут "partitioner.class".

    props.put("partitioner.class", "path.to.custom.partitioner.class");

Если мы не упомянем об этом, Kafka будет использовать свой класс по умолчанию и попытается равномерно распределить сообщение между доступными разделами.

Также сообщите Kafka, как сериализовать ключ

    props.put("key.serializer.class", "kafka.serializer.StringEncoder");

Теперь, если мы отправим какое-то сообщение с использованием ключа в производителе, сообщение будет доставлено в конкретный раздел (на основе нашей логики, написанной в пользовательском классе разделителя), а на уровне потребителя (SimpleConsumer) мы можем указать раздел для получения конкретные сообщения.

В случае, если нам нужно передать String в качестве ключа, то же самое следует обработать в пользовательском классе разделителя (взять хеш-значение ключа, а затем взять первые две цифры и т. Д.)

person Hild    schedule 14.08.2013
comment
Не могли бы вы упомянуть, на каком языке написаны ваши фрагменты кода и какие пакеты вы импортируете? - person Andreas; 26.09.2016

Означает ли это, что при потреблении я смогу выбрать смещение сообщения для определенного раздела? Можно ли при запуске нескольких разделов выбрать один конкретный раздел, например раздел 0?

Да, вы можете выбрать сообщение из одного конкретного раздела от вашего потребителя, но если вы хотите, чтобы оно определялось динамически, это зависит от логики того, как вы реализовали класс Partitioner в своем производителе.

Как теперь использовать сообщение на основе этого ключа? каково реальное влияние использования этого ключа при производстве в Kafka?

Есть два способа использования сообщения. Один использует хост Zookeeper, а другой - статический хост. Хост Zookeper получает сообщения со всех разделов. Однако, если вы используете статический хост, вы можете предоставить брокеру номер раздела, который необходимо использовать.

Пожалуйста, проверьте ниже пример Kafka 0.8

Продюсер

KeyedMessage<String, String> data = new KeyedMessage<String, String>(<<topicName>>, <<KeyForPartition>>, <<Message>>);

Класс раздела

   public int partition(Object arg0, int arg1) {
        // arg0 is the key given while producing, arg1 is the number of
        // partition the broker has
        long organizationId = Long.parseLong((String) arg0);
        // if the given key is less than the no of partition available then send
        // it according to the key given Else send it to the last partition
        if (arg1 < organizationId) {

            return (arg1 - 1);
        }
        // return (int) (organizationId % arg1);
        return Integer.parseInt((String) arg0);
    }

Таким образом, класс partiotioner решает, куда отправить сообщение, исходя из вашей логики.

Потребитель (PN: я использовал интеграцию Storm Kafka 0.8)

        HostPort hosts = new HostPort("10.**.**.***",9092);

        GlobalPartitionInformation gpi = new GlobalPartitionInformation();
        gpi.addPartition(0, hosts);
        gpi.addPartition(2, hosts);

        StaticHosts statHost = new StaticHosts(gpi);

        SpoutConfig spoutConf = new SpoutConfig(statHost, <<topicName>>, "/kafkastorm", <<spoutConfigId>>);
person Biks    schedule 28.08.2013