Kafka-Python, производитель отправляет запись, но потребитель ее не получает

У меня проблемы с использованием kafka для моего кода на Python. Я использую python 2.7.5 и пакет kafka-python.

Я хочу отправить csv (300000 строк, 20 полей в строке) через темы кафки. Перед этим я сериализую каждую строку в файл json, и до сих пор все работает. My Producer отправляет каждую строку файлов, а затем закрывает. Но с другой стороны, мой потребитель ничего не потребляет...

Что касается кафки, то у меня одна тема с одним разделом. Мои экземпляры kafka и zookeeper содержатся в контейнерах докеров, но не мой потребитель или производитель.

Вот мой код для производителя: ...

def producer(path) :
    producer = KafkaProducer(bootstrap_servers="localhost:9092", retries = 5)

    with open(path, newline = '', encoding='utf-8-sig') as csvFile :
        reader = csv.DictReader(csvFile, fieldnames = dataElements)
        for row in reader :
            log = process_row(row)
            producer.send(topic = TOPIC, value = json.dumps(log).encode())
    producer.flush()
    producer.close()
    print('processing done')

Вот мой код для потребителя:

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
consumer.subscribe(TOPIC)
for message in consumer:
    log = json.loads(message.value.decode())
    print(log)
consumer.close()

Я получаю «обработку» после запуска моего продюсера. Я ничего не получаю, когда запускаю своего потребителя. (Сначала я запускаю своего потребителя).

Я читаю документацию, и она может исходить из конфигурации производителя. Тем не менее я не уверен, какие параметры мне следует изменить (linger_ms, batch_size...?). Мне кажется, что значения по умолчанию работают в моем случае.

Любые идеи ?


person RxxxxSxxxx    schedule 21.01.2021    source источник
comment
На вашем месте я бы использовал Python 3.x, так как Python 2.7.x больше не поддерживается. Для вашей проблемы вы можете использовать akhq, чтобы увидеть, что происходит в ваших темах.   -  person MetallimaX    schedule 21.01.2021
comment
Я бы хотел, но это не зависит от меня. Хорошо, я собираюсь попробовать это, спасибо!   -  person RxxxxSxxxx    schedule 21.01.2021
comment
Доступны сценарии миграции, и миграция должна быть приоритетной для любого проекта, поскольку прежний Python больше не поддерживается.   -  person MetallimaX    schedule 21.01.2021
comment
Я скажу об этом своему руководителю :) Итак, я проверил свои экземпляры kafka в своем контейнере докеров. Моя тема существует и там фактически 1 раздел. Я пытался потреблять через консоль-потребителя, и он не потреблял никаких записей...   -  person RxxxxSxxxx    schedule 21.01.2021
comment
Для меня это больше похоже на проблему Кафки, но не уверен на 100%.   -  person MetallimaX    schedule 21.01.2021
comment
Я нашел решение: я создал 2 контейнера (1 для моего производителя, 1 для моего потребителя), и это сработало (я просто изменил ip: порты). Так что проблема, похоже, в соединении. между моим контейнером Kafka и моей локальной машиной.   -  person RxxxxSxxxx    schedule 21.01.2021


Ответы (1)


Я понял это, используя следующее содержимое: https://www.kaaproject.org/blog/kafka-docker https://github.com/wurstmeister/kafka-docker/wiki/Connectivity

Требуется добавить некоторые переменные среды, такие как KAFKA_ADVERTISED_HOST, в файл docker-compose.yml, чтобы клиенты могли подключаться к брокеру kafka из-за пределов узла докера.

person RxxxxSxxxx    schedule 11.02.2021