Как подключить KSQLDB-Cluster в OpenShift к локальному керберизованному Kafka-кластеру

Чего я хочу достичь:
У нас есть локальный кластер Kafka. Я хочу настроить KSQLDB в OpenShift и подключить его к брокерам локального кластера Kafka.

Проблема:
Когда я пытаюсь запустить сервер KSQLDB с помощью команды / usr / bin / ksql-server-start /etc/ksqldb/ksql-server.properties, я получаю сообщение об ошибке :

[2020-05-14 15:47:48,519] ERROR Failed to start KSQL (io.confluent.ksql.rest.server.KsqlServerMain:60)
io.confluent.ksql.util.KsqlServerException: Could not get Kafka cluster configuration!
        at io.confluent.ksql.services.KafkaClusterUtil.getConfig(KafkaClusterUtil.java:90)
        at io.confluent.ksql.security.KsqlAuthorizationValidatorFactory.isKafkaAuthorizerEnabled(KsqlAuthorizationValidatorFactory.java:81)
        at io.confluent.ksql.security.KsqlAuthorizationValidatorFactory.create(KsqlAuthorizationValidatorFactory.java:51)
        at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:624)
        at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:544)
        at io.confluent.ksql.rest.server.KsqlServerMain.createExecutable(KsqlServerMain.java:98)
        at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:56)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1589471268517) timed out at 1589471268518 after 1 attempt(s)
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at io.confluent.ksql.services.KafkaClusterUtil.getConfig(KafkaClusterUtil.java:60)
        ... 6 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1589471268517) timed out at 1589471268518 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

Моя конфигурация
Я настроил свой Dockerfile на основе этого образа: https://hub.docker.com/r/confluentinc/ksqldb-server, порты 9092, 9093, 8080, 8082 и 443 открыты.

Мой сервис-ямл выглядит так:

kind: Service
apiVersion: v1
metadata:
  name: social-media-dev
  namespace: abc
  selfLink: xyz
  uid: xyz
  resourceVersion: '1'
  creationTimestamp: '2020-05-14T09:47:15Z'
  labels:
    app: social-media-dev
  annotations:
    openshift.io/generated-by: OpenShiftNewApp
spec:
  ports:
    - name: social-media-dev
      protocol: TCP
      port: 9092
      targetPort: 9092
      nodePort: 31364
  selector:
    app: social-media-dev
    deploymentconfig: social-media-dev
  clusterIP: XX.XX.XXX.XXX
  type: LoadBalancer
  externalIPs:
    - XXX.XX.XXX.XXX
  sessionAffinity: None
  externalTrafficPolicy: Cluster
status:
  loadBalancer:
    ingress:
      - ip: XX.XX.XXX.XXX

Мой файл ksql-server.properties содержит следующую информацию:
слушатели: http://0.0.0.0:8082
bootstrap.servers: XXXX: 9092, XXXY: 9092, XXXZ: 9092

Что я пробовал на данный момент:

Я попытался подключиться к брокеру из модуля, и это сработало:
(timeout 1 bash -c '</dev/tcp/X.X.X.X/9092 && echo PORT OPEN || echo PORT CLOSED') 2>/dev/null
результат: PORT OPEN

Я также поигрался со слушателем, но затем сообщение об ошибке стало короче только с информацией. Не удалось получить конфигурацию кластера Kafka! и без ошибки тайм-аута.

Я пытался заменить LoadBalancer на Nodeport, но тоже безуспешно.

У вас есть идеи, что я могу попробовать дальше?

ОБНОВЛЕНИЕ: с обновлением до Cloudera CDH6 кластер Cloudera Kafka теперь работает также с Kafka Streams. Следовательно, теперь я смог подключиться из моего кластера KSQLDB в Openshift к локальному кластеру Kafka.


person Ke_rstin    schedule 15.05.2020    source источник
comment
Используя kafkacat, из которого вы запускаете ksqlDB, можете ли вы успешно подключиться к брокерам и вернуть метаданные (используя флаг -L)?   -  person Robin Moffatt    schedule 15.05.2020
comment
Спасибо @RobinMoffatt. Я попробую, но получить librdkafka-devel для kafkacat, работающего на RHEL8, немного сложно.   -  person Ke_rstin    schedule 15.05.2020
comment
Если вы используете ksqlDB как контейнер Docker, вы также можете запустить kafkacat как контейнер Docker :)   -  person Robin Moffatt    schedule 15.05.2020
comment
Спасибо, я только что запустил. Таким образом, с помощью kafkacat я могу видеть все наши темы, разделы, лидеры и т. Д. Так что это работает.   -  person Ke_rstin    schedule 15.05.2020
comment
Я предполагаю @RobinMoffatt, что это, вероятно, проблема со слушателем kafka? Я не запускаю его на порту по умолчанию, потому что он не открыт и вместо этого использовал 8083. 8083 не используется другим процессом, я уже это проверял.   -  person Ke_rstin    schedule 18.05.2020
comment
Сообщение об ошибке возникает из-за того, что ksqlDB не может получить конфигурацию кластера Kafka при запуске. Извините, я не знаю openshift, поэтому не могу комментировать, какую конфигурацию нужно изменить, чтобы ksql мог общаться с Kafka.   -  person Andrew Coates    schedule 02.06.2020
comment
Спасибо, Робин и Эндрю. Я обнаружил, что версия нашего кластера Cloudera Kafka слишком старая и не поддерживает потоки Kafka, которые доступны начиная с CDK 4.0.0. Поскольку наша миграция будет в следующем месяце, я дам обновленную информацию о stackoverflow. В настоящее время я подготовлю все с помощью Confluent Operator в OpenShift и, надеюсь, смогу выполнить миграцию в следующем месяце.   -  person Ke_rstin    schedule 02.06.2020


Ответы (1)


ОБНОВЛЕНИЕ: с обновлением до Cloudera CDH6 кластер Cloudera Kafka теперь работает также с Kafka Streams. Следовательно, теперь я смог подключиться из моего кластера KSQLDB в Openshift к локальному кластеру Kafka.

Я также опишу свой последний способ подключения к керберизованному Kafka-кластеру здесь, так как я много боролся, чтобы запустить его:

  1. Получение Kerberos-билетов и установление соединений по SSL

ksql-server.properties (его часть sasl_ssl):

security.protocol=SASL_SSL
sasl.mechanism=GSSAPI

ssl.truststore.location=truststore.jks
ssl.truststore.password=password
ssl.truststore.type=JKS

ssl.ca.location=cert

sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="my.keytab" serviceName="kafka"  principal="myprincipal";
serviceName="kafka"

producer.ssl.endpoint.identification.algorithm=HTTPS
producer.security.protocol=SASL_SSL
producer.ssl.truststore.location=truststore.jks
producer.ssl.truststore.password=password
producer.sasl.mechanism=GSSAPI
producer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="my.keytab" serviceName="kafka"  principal="myprincipal";

consumer.ssl.endpoint.identification.algorithm=HTTPS
consumer.security.protocol=SASL_SSL
consumer.ssl.truststore.location=truststore.jks
consumer.ssl.truststore.password=password
consumer.sasl.mechanism=GSSAPI
consumer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="my.keytab" serviceName="kafka"  principal="myprincipal";`
  1. Поэтому настройте правила Sentry

HOST=[HOST]->CLUSTER=kafka-cluster->action=idempotentwrite

HOST=[HOST]->TRANSACTIONALID=[ID]->action=describe

HOST=[HOST]->TRANSACTIONALID=[ID]->action=write

person Ke_rstin    schedule 18.11.2020