Чего я хочу достичь:
У нас есть локальный кластер Kafka. Я хочу настроить KSQLDB в OpenShift и подключить его к брокерам локального кластера Kafka.
Проблема:
Когда я пытаюсь запустить сервер KSQLDB с помощью команды / usr / bin / ksql-server-start /etc/ksqldb/ksql-server.properties, я получаю сообщение об ошибке :
[2020-05-14 15:47:48,519] ERROR Failed to start KSQL (io.confluent.ksql.rest.server.KsqlServerMain:60)
io.confluent.ksql.util.KsqlServerException: Could not get Kafka cluster configuration!
at io.confluent.ksql.services.KafkaClusterUtil.getConfig(KafkaClusterUtil.java:90)
at io.confluent.ksql.security.KsqlAuthorizationValidatorFactory.isKafkaAuthorizerEnabled(KsqlAuthorizationValidatorFactory.java:81)
at io.confluent.ksql.security.KsqlAuthorizationValidatorFactory.create(KsqlAuthorizationValidatorFactory.java:51)
at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:624)
at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:544)
at io.confluent.ksql.rest.server.KsqlServerMain.createExecutable(KsqlServerMain.java:98)
at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:56)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1589471268517) timed out at 1589471268518 after 1 attempt(s)
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at io.confluent.ksql.services.KafkaClusterUtil.getConfig(KafkaClusterUtil.java:60)
... 6 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1589471268517) timed out at 1589471268518 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
Моя конфигурация
Я настроил свой Dockerfile на основе этого образа: https://hub.docker.com/r/confluentinc/ksqldb-server, порты 9092, 9093, 8080, 8082 и 443 открыты.
Мой сервис-ямл выглядит так:
kind: Service
apiVersion: v1
metadata:
name: social-media-dev
namespace: abc
selfLink: xyz
uid: xyz
resourceVersion: '1'
creationTimestamp: '2020-05-14T09:47:15Z'
labels:
app: social-media-dev
annotations:
openshift.io/generated-by: OpenShiftNewApp
spec:
ports:
- name: social-media-dev
protocol: TCP
port: 9092
targetPort: 9092
nodePort: 31364
selector:
app: social-media-dev
deploymentconfig: social-media-dev
clusterIP: XX.XX.XXX.XXX
type: LoadBalancer
externalIPs:
- XXX.XX.XXX.XXX
sessionAffinity: None
externalTrafficPolicy: Cluster
status:
loadBalancer:
ingress:
- ip: XX.XX.XXX.XXX
Мой файл ksql-server.properties содержит следующую информацию:
слушатели: http://0.0.0.0:8082
bootstrap.servers: XXXX: 9092, XXXY: 9092, XXXZ: 9092
Что я пробовал на данный момент:
Я попытался подключиться к брокеру из модуля, и это сработало: (timeout 1 bash -c '</dev/tcp/X.X.X.X/9092 && echo PORT OPEN || echo PORT CLOSED') 2>/dev/null
результат: PORT OPEN
Я также поигрался со слушателем, но затем сообщение об ошибке стало короче только с информацией. Не удалось получить конфигурацию кластера Kafka! и без ошибки тайм-аута.
Я пытался заменить LoadBalancer на Nodeport, но тоже безуспешно.
У вас есть идеи, что я могу попробовать дальше?
ОБНОВЛЕНИЕ: с обновлением до Cloudera CDH6 кластер Cloudera Kafka теперь работает также с Kafka Streams. Следовательно, теперь я смог подключиться из моего кластера KSQLDB в Openshift к локальному кластеру Kafka.
-L
)? - person Robin Moffatt   schedule 15.05.2020kafkacat
как контейнер Docker :) - person Robin Moffatt   schedule 15.05.2020