Я установил док-контейнер kafka с портами, сопоставленными с хост-машиной Windows.
составить файл:
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.0.1
hostname: zookeeper
container_name: zookeeper
restart: always
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:6.0.1
hostname: broker
container_name: broker
restart: always
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
control-center:
image: confluentinc/cp-enterprise-control-center:6.0.1
hostname: control-center
container_name: control-center
depends_on:
- broker
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
У меня также есть простые сценарии производителя и потребителя, написанные на python:
from kafka import KafkaProducer
topic = 'kontext-kafka'
bootstrap_servers = '192.168.1.103:9092'
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
# Generate 100 messages
for _ in range(10):
msg = f'Kontext kafka msg: {_}'
future = producer.send(topic, msg.encode('utf-8'))
print(f'Sending msg: {msg}')
result = future.get(timeout=60)
metrics = producer.metrics()
print(metrics)
потребитель:
from kafka import KafkaConsumer
topic = 'kontext-kafka'
bootstrap_servers = '192.168.1.103:9092'
consumer = KafkaConsumer(
topic, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest')
for msg in consumer:
print(msg)
Эти сценарии работают на хост-компьютере, но когда я пытаюсь запустить эти сценарии на другом компьютере, они не могут отправлять или получать сообщения, даже если брокер подключен. Примерно через 10 секунд я получаю эту ошибку:
Traceback (most recent call last):
File "//main.py", line 12, in <module>
result = future.get(timeout=60)
File "//venv/lib/python3.8/site-packages/kafka/producer/future.py", line 65, in get
raise self.exception # pylint: disable-msg=raising-bad-type
kafka.errors.KafkaTimeoutError: KafkaTimeoutError: Batch for TopicPartition(topic='kontext-kafka', partition=0) containing 1 record(s) expired: 30 seconds have passed since batch creation plus linger time
Я все еще могу подключить контейнер интерфейса контроллера через браузер, а команда netstat -ab показывает порт 9092 как ПРОСЛУШИВАЕМЫЙ.
Как мне правильно настроить мой контейнер на хост-компьютере, чтобы я мог отправлять и получать сообщения в своей сети?
версия myclient:
kafka-python==2.0.2
ОС хоста:
Win 10
рабочий стол в моей сети:
Ubuntu 20.04.2
PLAINTEXT_HOST://localhost:9092
, чтобы он не был локальным, если вы хотите подключаться с других компьютеров. Netstat не подходит для проверки правильности конфигурации прослушивателя рекламируемого Kafka, только то, чтоlisteners
открыл порт - person OneCricketeer   schedule 09.04.2021