Kafka не получает сообщения при указании group_id в Python

Я использую Kafka (kafka-python) версии 3.0.0-1.3.0.0.p0.40. Мне нужно настроить потребителя для темы «моделирование» в Python. Когда я не указываю group_id, то есть group_id = None, он получает сообщения нормально. Однако, если я укажу group_id, он не получит никаких сообщений.

Вот мой код на Python:

consumer = KafkaConsumer(bootstrap_servers='XXX.XXX.XXX.XXX:9092',
                         group_id = 'myTestGroupID', enable_auto_commit = True)
consumer.subscribe(['simulation'])
# not using assign method here as auto_commit is enabled
# partitions = [TopicPartition('simulation',num) for num in range(0,9)]
# consumer.assign([TopicPartition('simulation', partitions[0])])

while not self.stop_event.is_set():
    for message in consumer:
        print(message)

Я попытался найти некоторые значения по умолчанию для group_id в файлах свойств потребителей, я нашел один cloudera_mirrormaker, но ничего не изменилось. Мне нужно будет использовать несколько потребителей, поэтому важно, чтобы у меня был group_id, и они использовали один и тот же group_id. Во многих источниках я обнаружил, что group_id может быть любой строкой ...

Когда я запускаю потребителя для этой темы в консоли, он работает и получает сообщения

./kafka-console-consumer.sh --bootstrap-server XXX.XXX.XXX.XXX:9092 --topic simulation --from-beginning --consumer-property group.id=myTestGroupID  --partition 0

когда я запускаю kafka-consumer-groups.sh, чтобы перечислить все доступные группы, он пуст.

Если у кого-то есть идея, почему он застрял в Python, он был бы очень признателен. Большое спасибо

Вот код для производителя (я уменьшил его для простоты, так как в данном случае это не меняет проблемы)

from kafka import KafkaProducer
class Producer(threading.Thread):
    ...
    def run(self):
        producer = KafkaProducer(bootstrap_servers='XXX.XXX.XXX.XXX:9092')
        while not self.stop_event.is_set():
            string = 'test %s' %time.time()
            producer.send('simulation', string.encode())
            time.sleep(0.5)
        producer.close()

person Alexander Komarov    schedule 21.09.2018    source источник
comment
можешь описать эту группу на кафке bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group   -  person Deadpool    schedule 21.09.2018
comment
Если вы использовали все смещения в консоли для одной и той же группы, тогда python не получил бы никаких сообщений, иначе что такое self.stop_event.is_set()? Покажите минимальный воспроизводимый пример   -  person OneCricketeer    schedule 21.09.2018
comment
@Deadpool он всегда выдает мне эту ошибку: выполнение команды группы потребителей не удалось из-за истечения времени ожидания команды группы потребителей при ожидании инициализации группы   -  person Alexander Komarov    schedule 21.09.2018
comment
@stovfl Я отредактировал свой вопрос, добавив часть производителя. Я упростил его, так как это не меняет проблемы. Я только что проверил, у меня есть kafka-python версии 1.4.3, он определенно должен поддерживать это   -  person Alexander Komarov    schedule 21.09.2018
comment
Ошибка от groups.sh --describe не должна происходить. Повторите попытку с другой темой и group_id. Также упростите свой потребительский тестовый сценарий, чтобы использовать его без части while not....   -  person stovfl    schedule 21.09.2018
comment
Удачи в этом? У меня такая же проблема.   -  person noname    schedule 19.03.2019
comment
@noname, да, я наконец решил это. Это была моя проблема: файл конфигурации omkafka partitions.number attr было 1 по умолчанию, мы изменили его на 100 по мере необходимости, и он начал работать! Я надеюсь это поможет тебе   -  person Alexander Komarov    schedule 20.03.2019


Ответы (2)


Вопрос: Кафка не получает сообщения при указании group_id


Попробуйте передать 'topic' при KafkaConsumer создании экземпляра, как в Документация:

# join a consumer group for dynamic partition assignment and offset commits
from kafka import KafkaConsumer
consumer = KafkaConsumer('simulation', group_id='myTestGroupID')
for msg in consumer:
    print (msg)

Документация: KafkaConsumer четко описывает тип group-id:

group_id (str или None) - имя группы потребителей, к которой нужно присоединиться для динамического назначения разделов (если включено) и использовать для выборки и фиксации смещений. Если None, автоматическое назначение разделов (через координатора группы) и фиксация смещения отключены. По умолчанию: Нет

person stovfl    schedule 21.09.2018
comment
Я тоже пробовал это, указывая тему при создании экземпляра, но это не помогло ... from kafka import KafkaConsumer consumer = KafkaConsumer ('Simulation', group_id = 'myTestGroupID', bootstrap_servers = 'XXX.XXX.XXX.XXX: 9092 ', enable_auto_commit = True) для сообщения в потребителе: print (msg) - person Alexander Komarov; 21.09.2018

У меня была такая же проблема, я частично не получал сообщения (большинство сообщений теряется) при работе со средой с высокой задержкой и большими сообщениями (›1 МБ).

Я не приложил много усилий для поиска основной причины, но я предполагаю, что перебалансировка потребителя инициируется до завершения обработки сообщения, что, кажется, вызывает проблемы, когда другой потребитель недоступен (в моем случае у меня был один потребитель или два потребителя, которые столкнулись с одной и той же проблемой).

Уловка для меня заключалась в увеличении max_poll_interval_ms и установке max_poll_records = 1

consumer = KafkaConsumer(bootstrap_servers='XXX.XXX.XXX.XXX:9092',
                     group_id = 'myTestGroupID', 
                     enable_auto_commit = True,
                     max_poll_interval_ms=5000,
                     max_poll_records=1)

дополнительную информацию можно найти по адресу: https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html в разделе Обнаружение сбоев потребителей.

person Assaf    schedule 07.07.2020