Я хочу сделать KStream to KTable Join. используя KTable как просто справочную таблицу. ниже шаги показывают последовательность, в которой выполняется код
Построить KTable
ReKey KTable
Создать KStream
ReKey KStream
Присоединяйтесь к KStream - KTable
Допустим, есть 8000 записей в KStream, 14 записей в KTable и Предположим, что для каждого ключа в KStreams есть запись в KTable. Таким образом, ожидаемый результат будет 8000 записей.
Каждый раз, когда я присоединяюсь впервые или когда запускаю приложение. Ожидаемый результат - 8000 записей, но иногда я вижу только 6200 записей, иногда 8000 полных наборов записей (дважды), иногда нет записей и т. Д.
Вопрос 1: почему при каждом запуске приложения в записях возникает несогласованность?
Перед тем, как KTable будет построен (construct + Rekey), KStreams получает Constructed, и данные доступны для соединения со стороны KStream, затем соединение начинается без KTable, поэтому данные не будут отображаться в окончательном соединении, пока KTable не будет построен. после того, как KTable будет построен, мы сможем увидеть, как происходит соединение для оставшихся записей.
Вопрос 2: Как устранить несоответствие соединения в записях?
Я пробовал использовать тестовый пример, используя Embedded Kafka для соединения KStream и Ktable. Было использовано 10 записей из KStreams и 3 записи из KTable, которые использовались в процессе. когда я запускал тестовый пример в первый раз, соединения не было, и я не видел никаких данных после присоединения. Когда пробежал то же самое во второй раз, он работал отлично. Если я очищу хранилище состояний, то вернусь к нулю.
Вопрос 3: Почему происходит такое поведение?
Я пробовал с KSQL, и соединение работало отлично, и я получил 8000 записей, затем я перешел в исходный код KSQL и заметил, что KSQL также выполняет ту же функцию соединения.
Вопрос 4: Как KSQL решает проблему?
Я видел несколько примеров предлагаемых ответов
- Используйте GlobalKTable, который не сработал. у меня такое же непоследовательное соединение.
- используйте Custom joiner https://github.com/confluentinc/kafka-streams-examples/blob/5.2.1-post/src/test/java/io/confluent/examples/streams/CustomStreamTableJoinIntegrationTest.java что не сработало
Я использую весенние облачные потоки в качестве зависимости.
Также я видел, что где-то в JIRA была открытая проблема.