Как остановить Python Kafka Consumer в программе?

Я работаю с потребителем Python Kafka (пытаюсь использовать kafka.consumer.SimpleConsumer или kafka.consumer.simple.SimpleConsumer в http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html). Когда я запускаю следующий фрагмент кода, он будет выполняться все время, даже если будут использованы все сообщения. Я надеюсь, что потребитель остановится, если поглотит все сообщения. Как это сделать? Также я понятия не имею, как использовать функцию stop() (которая находится в базовом классе kafka.consumer.base.Consumer).

ОБНОВЛЕНИЕ

Я использовал обработчик сигналов для вызова Consumer.stop(). Некоторые сообщения об ошибках выводились на экран. Но программа все равно застряла в цикле for. Когда поступали новые сообщения, потребитель потреблял их и печатал. Я также пробовал client.close(). Но тот же результат.

Мне нужно несколько способов изящно остановить цикл for.

        client = KafkaClient("localhost:9092")
        consumer = SimpleConsumer(client, "test-group", "test")

        consumer.seek(0, 2)# (0,2) and (0,0)

        for message in consumer:
            print "Offset:", message.offset
            print "Value:", message.message.value

Любая помощь приветствуется. Спасибо.


person BAE    schedule 05.08.2015    source источник
comment
Если все сообщения были использованы, добавьте свойство consumer_timeout_ms в конструктор KafkaConsumer, таким образом, если в теме больше нет сообщений, ваш цикл for завершится правильно. Дополнительная информация здесь: (stackoverflow.com/a/45183265/922457)   -  person Edenshaw    schedule 19.05.2020


Ответы (3)


Мы можем сначала проверить смещение последнего сообщения в теме. Затем остановите цикл, когда мы достигли этого смещения.

    client = "localhost:9092"
    consumer = KafkaConsumer(client)
    topic = 'test'
    tp = TopicPartition(topic,0)
    #register to the topic
    consumer.assign([tp])

    # obtain the last offset value
    consumer.seek_to_end(tp)
    lastOffset = consumer.position(tp)

    consumer.seek_to_beginning(tp)        

    for message in consumer:
        print "Offset:", message.offset
        print "Value:", message.message.value
        if message.offset == lastOffset - 1:
            break
person Mohit    schedule 01.08.2017

Используйте параметр iter_timeout для установки времени ожидания. Если установлено значение 10, как и в следующем фрагменте кода, он завершится, если в течение 10 секунд не придет новое сообщение. Значение по умолчанию — None, что означает, что потребитель будет блокироваться здесь, даже если не будут поступать новые сообщения.

        self.consumer = SimpleConsumer(self.client, "test-group", "test",
                iter_timeout=10)

Обновить

Вышеупомянутый метод не является хорошим. Когда приходит много сообщений, трудно установить достаточно маленькое значение iter_timeout, чтобы гарантировать остановку. Итак, теперь я использую функцию get_message(), которая пытается использовать одно сообщение и останавливается. None возвращается, когда нет новых сообщений.

person BAE    schedule 06.08.2015

Аналогичное решение для ответа Мохита, но с использованием функции end_offsets потребителя.

from kafka import KafkaConsumer, TopicPartition

# settings
client = "localhost:9092"
topic = 'test'

# prepare consumer
tp = TopicPartition(topic,0)
consumer = KafkaConsumer(client)
consumer.assign([tp])
consumer.seek_to_beginning(tp)  

# obtain the last offset value
lastOffset = consumer.end_offsets([tp])[tp]

for message in consumer:
    print "Offset:", message.offset
    print "Value:", message.message.value
    if message.offset == lastOffset - 1:
        break
person mjspier    schedule 10.01.2019