Apache flink - Характеристики времени

Как я могу использовать характеристики времени приема в Apache flink. Я знаю, что нам нужно задать временные характеристики среды. Но как я могу собрать данные с отметками времени, которые можно назвать временем приема? В настоящее время, когда я использую его, он обрабатывает окно на основе системного времени. Я хочу выполнять обработку в зависимости от времени, когда данные попадают в среду flink.

Небольшой отрывок из кода, который может помочь в его понимании:

Временные характеристики для среды:

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

Время окна:

keyedEvents.timeWindow(Time.minutes(5))

Сборник в источнике:

ctx.collect(monSourceData);

Если сбор данных начинается, скажем, в 11:03, я хочу закончить его в 11:08, то есть на 5 минут. Но он останавливается в 11:05 (что-то вроде времени обработки).

Заранее спасибо за помощь.


person Kspace    schedule 28.08.2018    source источник


Ответы (1)


Всплывающие и скользящие окна в Flink всегда выровнены по часам (либо часы времени событий, определяемые событиями и водяными знаками, либо системные часы); временные окна не совпадают с первым событием. Таким образом, если у вас есть окна продолжительностью 5 минут, будет окно для интервала, например, с 11:00 до 11:05, независимо от TimeCharacteristic.

Однако переворачивающиеся окна принимают необязательный параметр offset, который можно использовать для смещения этого выравнивания. Таким образом, вы можете указать TumblingEventTimeWindows.of(Time.minutes(5), Time.minutes(3)), например, чтобы сдвинуть интервалы на 3 минуты.

person David Anderson    schedule 28.08.2018
comment
Большое вам спасибо за ваш ответ. Есть ли возможность запускать окна на основе первого события, если я получаю первый объект события / данных в 11:03, окно должно работать в течение n минут, как я упоминал в приведенном примере. - person Kspace; 28.08.2018
comment
Я не могу придумать, как добиться этого с помощью Window API, но вы можете создать его самостоятельно, используя ProcessFunction. - person David Anderson; 28.08.2018
comment
Я не понимаю, согласно моим знаниям, функция процесса выполняется, как только окно заканчивается, поэтому нам нужно как-то настроить время окна. Также я думаю, что триггер может быть полезен в этом сценарии. Но поскольку я только начал изучать flink, если вы можете помочь мне с любым фрагментом кода или логикой, это было бы здорово. Спасибо - person Kspace; 28.08.2018
comment
Я хочу сказать, что вы можете использовать функцию процесса вместо окна. - person David Anderson; 28.08.2018
comment
Большое спасибо Дэвиду за вашу помощь. :) - person Kspace; 28.08.2018
comment
Я прочитал документацию о функции процесса, но не смог реализовать ее так, как я хочу ... как я сказал в примере выше. Не могли бы вы объяснить это в терминах непрофессионала, это было бы большим подспорьем. (если возможно, тоже есть псевдокод). Спасибо - person Kspace; 29.08.2018
comment
Ты прав; Я попытался написать код, и это оказалось не так просто, как я себе представлял. Пока что единственное решение, которое я вижу, довольно сложное - вы уверены, что вам нужно решить эту проблему? :) Что усложняет работу, так это то, что вы хотите работать с ключевыми окнами параллельно, и чтобы все согласовали начальную временную метку. - person David Anderson; 29.08.2018
comment
Что вы могли бы сделать, так это разделить поток до того, как он будет введен. Отправьте одну сторону разделения в RIchFlatMap, который захватывает временную метку первого события и транслирует ее в KeyedBroadcastProcessFunction, которая выполняет оконную обработку, используя таймеры, которые срабатывают каждые пять минут, начиная с этой временной метки широковещательной передачи. - person David Anderson; 29.08.2018
comment
Да, мне обязательно нужно решить эту проблему. Но проблема в том, что я не очень хорошо знаком с flink. Так что любая помощь / подсказка с вашей стороны будет отличной. - person Kspace; 29.08.2018
comment
См. training.data-artisans.com для учебных материалов, которые показывают, как работать с функциями процессов, транслировать состояние, таймеры и т. д. - person David Anderson; 29.08.2018
comment
Спасибо, Дэвид .. Я их проверю, попробую реализовать и скажу, добьюсь ли я успеха или снова буду вам мешать. :) - person Kspace; 29.08.2018