Возможно ли использование нескольких брокеров в одной топологии Kafka Streams (Spring Cloud Stream)?

У нас есть чтение топологии из входной темы (с binder: x - адрес брокера: x), и записи обрабатываются и записываются в выходную тему (с binder: y - адрес брокера: y) с использованием потоков Spring Cloud Stream Kafka. Записи в тему вывода не записываются. Но когда я устанавливаю биндеры (адреса брокеров) одинаковыми (как для x, так и для y), записи записываются в тему y. Следует ли использовать в топологии одного и того же брокера? Мне нужно использовать разные связующие и брокеры для тем ввода и вывода? Как я могу это решить?

Ошибка: 2021-06-17 12: 17: 21.483 ПРЕДУПРЕЖДЕНИЕ 20848 --- [чтение-1-производитель] oakcNetworkClient: [Производитель clientId = inputTopic-32100000000000000000015-f0bd5423-e670-43e8-ab0b-84ec5505c2fd-StreamThread- 1-производитель] Ошибка при получении метаданных с идентификатором корреляции 182: {inputTopic = UNKNOWN_TOPIC_OR_PARTITION}

Application.yml

spring:
  cloud:
    stream:
      kafka:
        streams:
          bindings:
            doob-output-topic-out:
              applicationId: doob-output-topic-out
              producer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: tr.com.havelsan.doob.cloud.framework.service.track.base.core.serde.BaseDataSerde
                topic:
                  properties:
                    retention.bytes: 300000000
                    segment.bytes: 300000000
            doob-input-topic-in:
              consumer:
                applicationId: doob-input-topic-in
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: tr.com.havelsan.doob.cloud.framework.service.track.base.core.serde.BaseDataSerde
                topic:
                  properties:
                    retention.bytes: 300000000
                    segment.bytes: 300000000
      binders:
        outputKafka:
          type: kstream
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    streams:
                      binder:
                        brokers: ${1kafka.brokers1}
                        autoCreateTopics: true
                        autoAddPartitions: true
                        minPartitionCount: 8
                        configuration:
                          commit.interval.ms: 1000
        inputKafka:
          type: kstream
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    streams:
                      binder:
                        brokers: ${2kafka.brokers2}
                        autoCreateTopics: true
                        autoAddPartitions: true
                        minPartitionCount: 8
                        configuration:
                          commit.interval.ms: 1000
                          max:
                            request:
                              size: 20000000
      bindings:
        doob-output-topic-out:
          destination: outputTopic
          binder: outputKafka
          producer:
            partition-count: 8
        doob-input-topic-in:
          destination: inputTopic
          binder: inputKafka

manage:
  storeName: trackList15

Исходный код :

    @StreamListener(BASE_TOPIC_INPUT)
    @SendTo(BASE_TOPIC_OUTPUT)
    public KStream<String, BaseData> consumeTrackFromSynchronization(KStream<String, BaseData> baseDataStream) {
        return baseDataStream.filter((s, baseData) -> BaseUtil.getTrackType(baseData).equals(BaseTypeEnum.FK)).groupByKey()
                .reduce((baseData, s1) -> s1, Materialized.<String, BaseData, KeyValueStore<Bytes, byte[]>>as(storeName)
                        .withKeySerde(Serdes.String()).
                                withValueSerde(baseDataSerde)).toStream()
                .peek((s, baseData) -> baseServiceHelper.processBase(baseData, BaseTypeEnum.FK));
    }

person omerstack    schedule 17.06.2021    source источник
comment
1) Похоже, ваши брокеры не являются частью одного кластера 2) Вы должны использовать список, разделенный запятыми, в качестве свойства вашего брокера.   -  person OneCricketeer    schedule 17.06.2021


Ответы (1)


В одном процессоре Kafka Streams невозможно читать из кластера и записывать в другой кластер. Однако в одном приложении (JVM) вы можете иметь несколько процессоров, каждый из которых взаимодействует с одним кластером Kafka.

См. Этот поток для подробнее.

Один из способов обхода проблемы с использованием Spring Cloud Stream в вашем случае заключается в следующем.

  1. Ваш процессор Kafka Streams потребляет и производит в одном кластере.
  2. Затем напишите еще один простой процессор с привязкой Kafka на основе обычного канала сообщений (не связыватель Kafka Streams). В этой модели вы можете применить шаблон с несколькими связывателями, который у вас есть выше, то есть входные данные принимаются из темы, в которую вы писали в процессоре Kafka Streams, а затем выходные данные поступают в тему в другом кластере. Этот процессор просто становится сквозным процессором, который перемещает данные из кластера 1 в кластер 2.
person sobychacko    schedule 17.06.2021