Я работаю с потребителем Python Kafka (пытаюсь использовать kafka.consumer.SimpleConsumer или kafka.consumer.simple.SimpleConsumer в http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html). Когда я запускаю следующий фрагмент кода, он будет выполняться все время, даже если будут использованы все сообщения. Я надеюсь, что потребитель остановится, если поглотит все сообщения. Как это сделать? Также я понятия не имею, как использовать функцию stop() (которая находится в базовом классе kafka.consumer.base.Consumer).
ОБНОВЛЕНИЕ
Я использовал обработчик сигналов для вызова Consumer.stop(). Некоторые сообщения об ошибках выводились на экран. Но программа все равно застряла в цикле for. Когда поступали новые сообщения, потребитель потреблял их и печатал. Я также пробовал client.close(). Но тот же результат.
Мне нужно несколько способов изящно остановить цикл for.
client = KafkaClient("localhost:9092")
consumer = SimpleConsumer(client, "test-group", "test")
consumer.seek(0, 2)# (0,2) and (0,0)
for message in consumer:
print "Offset:", message.offset
print "Value:", message.message.value
Любая помощь приветствуется. Спасибо.
consumer_timeout_ms
в конструкторKafkaConsumer
, таким образом, если в теме больше нет сообщений, ваш циклfor
завершится правильно. Дополнительная информация здесь: (stackoverflow.com/a/45183265/922457) - person Edenshaw   schedule 19.05.2020