KStream to KTable Inner Join, создавая разное количество записей каждый раз, обрабатываемых с одними и теми же данными

Я хочу сделать KStream to KTable Join. используя KTable как просто справочную таблицу. ниже шаги показывают последовательность, в которой выполняется код

  1. Построить KTable

  2. ReKey KTable

  3. Создать KStream

  4. ReKey KStream

  5. Присоединяйтесь к KStream - KTable

Допустим, есть 8000 записей в KStream, 14 записей в KTable и Предположим, что для каждого ключа в KStreams есть запись в KTable. Таким образом, ожидаемый результат будет 8000 записей.

Каждый раз, когда я присоединяюсь впервые или когда запускаю приложение. Ожидаемый результат - 8000 записей, но иногда я вижу только 6200 записей, иногда 8000 полных наборов записей (дважды), иногда нет записей и т. Д.

  • Вопрос 1: почему при каждом запуске приложения в записях возникает несогласованность?

    Перед тем, как KTable будет построен (construct + Rekey), KStreams получает Constructed, и данные доступны для соединения со стороны KStream, затем соединение начинается без KTable, поэтому данные не будут отображаться в окончательном соединении, пока KTable не будет построен. после того, как KTable будет построен, мы сможем увидеть, как происходит соединение для оставшихся записей.

  • Вопрос 2: Как устранить несоответствие соединения в записях?

    Я пробовал использовать тестовый пример, используя Embedded Kafka для соединения KStream и Ktable. Было использовано 10 записей из KStreams и 3 записи из KTable, которые использовались в процессе. когда я запускал тестовый пример в первый раз, соединения не было, и я не видел никаких данных после присоединения. Когда пробежал то же самое во второй раз, он работал отлично. Если я очищу хранилище состояний, то вернусь к нулю.

  • Вопрос 3: Почему происходит такое поведение?

    Я пробовал с KSQL, и соединение работало отлично, и я получил 8000 записей, затем я перешел в исходный код KSQL и заметил, что KSQL также выполняет ту же функцию соединения.

  • Вопрос 4: Как KSQL решает проблему?

Я видел несколько примеров предлагаемых ответов

Я использую весенние облачные потоки в качестве зависимости.

Также я видел, что где-то в JIRA была открытая проблема.


person bhargav N Reddy    schedule 08.03.2020    source источник


Ответы (1)


ниже шаги показывают последовательность, в которой выполняется код

Обратите внимание, что построение топологии просто обеспечивает логическое описание программы потока данных и отсутствует «порядок выполнения» различных операторов. Программа будет переведена, и все операторы будут выполняться одновременно. Следовательно, данные по всем темам будут считываться параллельно.

Эта параллельная обработка является основной причиной вашего наблюдения, т. Е. Таблица не загружается перед началом обработки (по крайней мере, по умолчанию нет гарантии), и, таким образом, данные на стороне потока могут обрабатываться, даже если таблица загружена не полностью.

Порядок обработки между разными темами зависит от временных меток записи: записи с меньшими временными метками обрабатываются первыми. Следовательно, если вы хотите убедиться, что данные KTable обрабатываются в первую очередь, вы должны убедиться, что временные метки записи меньше, чем временные метки записи на стороне потока. Это может быть обеспечено либо при вводе входных данных во входную тему, либо с помощью специального средства извлечения временных меток.

Во-вторых, выборка данных из тем является недетерминированной, и поэтому, если возвращаются данные только для стороны потока (но не данные стороны таблицы), сравнение отметок времени не может быть выполнено, и, таким образом, данные стороны потока будут обработаны до данных стороны таблицы . Чтобы решить эту проблему, вы можете увеличить параметр конфигурации max.task.idle.ms (по умолчанию 0ms). Если вы увеличите эту конфигурацию (и я считаю, что KSQL также делает это по умолчанию), если нет данных для одного ввода, задача заблокируется и попытается получить данные для пустого ввода (только по истечении времени простоя обработка будет продолжена даже если одна сторона пуста).

Для GlobalKTable поведение другое. Эта таблица будет загружена при запуске до начала любой обработки. Следовательно, я не уверен, почему это не сработало для вас.

person Matthias J. Sax    schedule 10.03.2020