добавлен потребитель kafka для динамического обнаружения тем

Я использую KafkaConsumer для получения сообщений с сервера Kafka (темы) ..

  • Он отлично работает для тем, созданных до запуска Consumer code ...

Но проблема в том, что это не будет работать, если темы создаются динамически (я имею в виду, что после запуска кода потребителя), но API говорит, что будет поддерживать создание динамических тем .. Вот ссылка для справки ..

Используемая версия Kafka: 0.9.0.1

https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html.

Вот код JAVA ...

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    Pattern r = Pattern.compile("siddu(\\d)*");

    consumer.subscribe(r, new HandleRebalance());
    try {
         while(true) {
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             for (TopicPartition partition : records.partitions()) {
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(partition.partition()  + ": "  +record.offset() + ": " + record.value());
                 }
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();

                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
       consumer.close();
     }

ПРИМЕЧАНИЕ. Имена моих тем совпадают с регулярным выражением .. И если я перезапущу пользователя, он начнет читать сообщения, отправленные в тему ...

Любая помощь очень ценится ...


person siddu    schedule 22.03.2016    source источник


Ответы (3)


На это был ответ в почтовых архивах apache kafka. Копирую ниже:

Потребитель поддерживает параметр конфигурации «metadata.max.age.ms», который в основном определяет, как часто выбираются метаданные темы. По умолчанию установлено довольно высокое значение (5 минут), а это значит, что на обнаружение новых тем, соответствующих вашему регулярному выражению, потребуется до 5 минут. Вы можете установить это значение ниже, чтобы быстрее находить темы.

Итак, в вашем реквизите вы можете:

props.put("metadata.max.age.ms", 5000);

Это заставит вашего потребителя узнавать о новых темах каждые 5 секунд.

person bhspencer    schedule 25.08.2016
comment
это также зависит от того, как вы устанавливаете потребительское свойство auto.offset.reset. если он самый последний, они будут выбирать последние / [не использованные ранее] сообщения из известных тем (после запуска потребителя), но не динамические темы. если вы установите его на самое раннее, а также положите consumer.seekToBeginning (consumer.assignment ()); перед опросом - сделайте это только один раз, тогда он распознает динамические / новые темы, но также каждый раз будет получать все записи с самого начала - person Sasha Bond; 31.10.2018
comment
можем ли мы каким-то образом заставить запрос на выборку метаданных? например consumer.fetchMeta () или что-то в этом роде? - person andrii; 20.07.2020

Вы можете подключиться к Zookeeper. Ознакомьтесь с примером кода. По сути, вы создадите наблюдателя на узле Zookeeper /brokers/topics. Когда сюда добавляются новые дочерние элементы, это добавляется новая тема, и ваш наблюдатель запускается.

Обратите внимание, что разница между этим и другим ответом заключается в том, что это триггер, а другой - опрос - этот будет как можно ближе к реальному времени, а другой будет в пределах любого вашего интервала опроса в лучшем случае. .

person David Griffin    schedule 22.03.2016
comment
Спасибо за ваш ответ и помощь ... в основном я хотел использовать KafkaConsumer api для достижения этой цели, и я решил это сам .. - person siddu; 26.03.2016
comment
@madlad см. мой ответ ниже. - person bhspencer; 25.08.2016
comment
Ссылка 'образец кода' недействительна, также вопрос касался использования сообщений, а не только знания новой темы .... новая тема будет доступна в consumer.listTopics (). keySet () в любом случае - person Sasha Bond; 31.10.2018
comment
Ссылка исправлена ​​- также добавлена ​​строка о различии двух подходов. - person David Griffin; 01.11.2018

Вот решение, которое сработало для меня, используя KafkaConsumer api. Вот код Java для этого.

private static Consumer<Long, String> createConsumer(String topic) {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG,
            "KafkaExampleConsumer");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    // Create the consumer using props.
    final Consumer<Long, String> consumer =
            new KafkaConsumer<>(props);
    // Subscribe to the topic.
    consumer.subscribe(Collections.singletonList(topic));
    return consumer;
}

public static void runConsumer(String topic) throws InterruptedException {
    final Consumer<Long, String> consumer = createConsumer(topic);

    ConsumerRecords<Long, String> records = consumer.poll(100);
    for (ConsumerRecord<Long, String> record : records)
        System.out.printf("hiiiii offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    consumer.commitAsync();
    consumer.close();
    //System.out.println("DONE");
}

используя это, мы можем получать сообщения из динамически создаваемых тем.

person Aakash aggarwal    schedule 20.06.2019