У нас есть чтение топологии из входной темы (с binder: x - адрес брокера: x), и записи обрабатываются и записываются в выходную тему (с binder: y - адрес брокера: y) с использованием потоков Spring Cloud Stream Kafka. Записи в тему вывода не записываются. Но когда я устанавливаю биндеры (адреса брокеров) одинаковыми (как для x, так и для y), записи записываются в тему y. Следует ли использовать в топологии одного и того же брокера? Мне нужно использовать разные связующие и брокеры для тем ввода и вывода? Как я могу это решить?
Ошибка: 2021-06-17 12: 17: 21.483 ПРЕДУПРЕЖДЕНИЕ 20848 --- [чтение-1-производитель] oakcNetworkClient: [Производитель clientId = inputTopic-32100000000000000000015-f0bd5423-e670-43e8-ab0b-84ec5505c2fd-StreamThread- 1-производитель] Ошибка при получении метаданных с идентификатором корреляции 182: {inputTopic = UNKNOWN_TOPIC_OR_PARTITION}
Application.yml
spring:
cloud:
stream:
kafka:
streams:
bindings:
doob-output-topic-out:
applicationId: doob-output-topic-out
producer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: tr.com.havelsan.doob.cloud.framework.service.track.base.core.serde.BaseDataSerde
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
doob-input-topic-in:
consumer:
applicationId: doob-input-topic-in
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: tr.com.havelsan.doob.cloud.framework.service.track.base.core.serde.BaseDataSerde
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
binders:
outputKafka:
type: kstream
environment:
spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: ${1kafka.brokers1}
autoCreateTopics: true
autoAddPartitions: true
minPartitionCount: 8
configuration:
commit.interval.ms: 1000
inputKafka:
type: kstream
environment:
spring:
cloud:
stream:
kafka:
streams:
binder:
brokers: ${2kafka.brokers2}
autoCreateTopics: true
autoAddPartitions: true
minPartitionCount: 8
configuration:
commit.interval.ms: 1000
max:
request:
size: 20000000
bindings:
doob-output-topic-out:
destination: outputTopic
binder: outputKafka
producer:
partition-count: 8
doob-input-topic-in:
destination: inputTopic
binder: inputKafka
manage:
storeName: trackList15
Исходный код :
@StreamListener(BASE_TOPIC_INPUT)
@SendTo(BASE_TOPIC_OUTPUT)
public KStream<String, BaseData> consumeTrackFromSynchronization(KStream<String, BaseData> baseDataStream) {
return baseDataStream.filter((s, baseData) -> BaseUtil.getTrackType(baseData).equals(BaseTypeEnum.FK)).groupByKey()
.reduce((baseData, s1) -> s1, Materialized.<String, BaseData, KeyValueStore<Bytes, byte[]>>as(storeName)
.withKeySerde(Serdes.String()).
withValueSerde(baseDataSerde)).toStream()
.peek((s, baseData) -> baseServiceHelper.processBase(baseData, BaseTypeEnum.FK));
}