Я использую Kafka (kafka-python
) версии 3.0.0-1.3.0.0.p0.40. Мне нужно настроить потребителя для темы «моделирование» в Python. Когда я не указываю group_id, то есть group_id = None, он получает сообщения нормально. Однако, если я укажу group_id, он не получит никаких сообщений.
Вот мой код на Python:
consumer = KafkaConsumer(bootstrap_servers='XXX.XXX.XXX.XXX:9092',
group_id = 'myTestGroupID', enable_auto_commit = True)
consumer.subscribe(['simulation'])
# not using assign method here as auto_commit is enabled
# partitions = [TopicPartition('simulation',num) for num in range(0,9)]
# consumer.assign([TopicPartition('simulation', partitions[0])])
while not self.stop_event.is_set():
for message in consumer:
print(message)
Я попытался найти некоторые значения по умолчанию для group_id в файлах свойств потребителей, я нашел один cloudera_mirrormaker, но ничего не изменилось. Мне нужно будет использовать несколько потребителей, поэтому важно, чтобы у меня был group_id, и они использовали один и тот же group_id. Во многих источниках я обнаружил, что group_id может быть любой строкой ...
Когда я запускаю потребителя для этой темы в консоли, он работает и получает сообщения
./kafka-console-consumer.sh --bootstrap-server XXX.XXX.XXX.XXX:9092 --topic simulation --from-beginning --consumer-property group.id=myTestGroupID --partition 0
когда я запускаю kafka-consumer-groups.sh, чтобы перечислить все доступные группы, он пуст.
Если у кого-то есть идея, почему он застрял в Python, он был бы очень признателен. Большое спасибо
Вот код для производителя (я уменьшил его для простоты, так как в данном случае это не меняет проблемы)
from kafka import KafkaProducer
class Producer(threading.Thread):
...
def run(self):
producer = KafkaProducer(bootstrap_servers='XXX.XXX.XXX.XXX:9092')
while not self.stop_event.is_set():
string = 'test %s' %time.time()
producer.send('simulation', string.encode())
time.sleep(0.5)
producer.close()
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
- person Deadpool   schedule 21.09.2018self.stop_event.is_set()
? Покажите минимальный воспроизводимый пример - person OneCricketeer   schedule 21.09.2018groups.sh --describe
не должна происходить. Повторите попытку с другой темой и group_id. Также упростите свой потребительский тестовый сценарий, чтобы использовать его без частиwhile not...
. - person stovfl   schedule 21.09.2018