У меня проблемы с использованием kafka для моего кода на Python. Я использую python 2.7.5 и пакет kafka-python.
Я хочу отправить csv (300000 строк, 20 полей в строке) через темы кафки. Перед этим я сериализую каждую строку в файл json, и до сих пор все работает. My Producer отправляет каждую строку файлов, а затем закрывает. Но с другой стороны, мой потребитель ничего не потребляет...
Что касается кафки, то у меня одна тема с одним разделом. Мои экземпляры kafka и zookeeper содержатся в контейнерах докеров, но не мой потребитель или производитель.
Вот мой код для производителя: ...
def producer(path) :
producer = KafkaProducer(bootstrap_servers="localhost:9092", retries = 5)
with open(path, newline = '', encoding='utf-8-sig') as csvFile :
reader = csv.DictReader(csvFile, fieldnames = dataElements)
for row in reader :
log = process_row(row)
producer.send(topic = TOPIC, value = json.dumps(log).encode())
producer.flush()
producer.close()
print('processing done')
Вот мой код для потребителя:
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
consumer.subscribe(TOPIC)
for message in consumer:
log = json.loads(message.value.decode())
print(log)
consumer.close()
Я получаю «обработку» после запуска моего продюсера. Я ничего не получаю, когда запускаю своего потребителя. (Сначала я запускаю своего потребителя).
Я читаю документацию, и она может исходить из конфигурации производителя. Тем не менее я не уверен, какие параметры мне следует изменить (linger_ms, batch_size...?). Мне кажется, что значения по умолчанию работают в моем случае.
Любые идеи ?