Apache Flink - временные окна событий

Я хочу создать окна с ключами в Apache flink, чтобы окна для каждого ключа выполнялись через n минут после прибытия первого события для ключа. Можно ли сделать это с использованием временных характеристик событий (поскольку время обработки зависит от системных часов и неизвестно, когда появится первое событие). Если возможно, объясните назначение времени события и водяного знака также событиям, а также объясните, как вызвать функцию окна процесса через n минут.

Ниже приведена часть кода, которая может дать вам представление о том, что я делаю в настоящее время:

            //Make keyed events so as to start a window for a key
            KeyedStream<SourceData, Tuple> keyedEvents = 
                    env.addSource(new MySource(configData),"JSON Source")
                    .assignTimestampsAndWatermarks(new MyTimeStamps())
                    .setParallelism(1)
                    .keyBy("service");


            //Start a window for windowTime time
            DataStream<ResultData> resultData=
                    keyedEvents
                    .timeWindow(Time.minutes(winTime))
                    .process(new ProcessEventWindow(configData))
                    .name("Event Collection Window")
                    .setParallelism(25);

Итак, как бы мне назначить время события и водную отметку, чтобы окно следовало за временем первого события в качестве начальной точки и выполнялось через 10 минут (время начала первого события может быть разным для разных клавиш). Любая помощь могла бы быть полезна.

        /------------ ( window of 10 minutes )
Streams          |------------ ( window of 10 minutes )
            \------------ ( window of 10 minutes )

Изменить: класс, который я использовал для назначения отметки времени и водяных знаков

public class MyTimeStamps implements AssignerWithPeriodicWatermarks<SourceData> {

    @Override
    public long extractTimestamp(SourceData element, long previousElementTimestamp) {

          //Will return epoch of currentTime
        return GlobalUtilities.getCurrentEpoch();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // TODO Auto-generated method stub
        //Will return epoch of currentTime + 10 minutes
        return new Watermark(GlobalUtilities.getTimeShiftNMinutesEpoch(10));
    }

}

person Kspace    schedule 29.08.2018    source источник


Ответы (2)


Я думаю, что для вашего варианта использования лучше всего использовать ProcessFunction. Что вы можете сделать, так это зарегистрировать EventTimeTimer при наступлении первого события. Затем в методе onTimer выдайте результаты.

Что-то вроде:

public class ProcessFunctionImpl extends ProcessFunction<SourceData, ResultData> {

    @Override
    public void processElement(SourceData value, Context ctx, Collector<ResultData> out)
        throws Exception {

        // retrieve the current aggregate
        ResultData current = state.value();
        if (current == null) {
            // first event arrived
            current = new ResultData();
            // register end of window
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 60 * 1000 /* 10 minutes */);
        }

        // update the state's aggregate
        current += value;

        // write the state back
        state.update(current);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultData> out)
        throws Exception {

        // get the state for the key that scheduled the timer
        ResultData result = state.value();

        out.collect(result);

        // reset the window state
        state.clear();
    }
}
person Dawid Wysakowicz    schedule 30.08.2018
comment
Спасибо, Давид, за ваш ответ. Я попробовал ваше решение, но коллекция продолжала бесконечный цикл или что-то в этом роде (когда я распечатываю поток данных через 10 минут, он ничего не печатал). Я предполагаю, что есть некоторая проблема с назначением времени события для событий, не могли бы вы объяснить мне это тоже ... как я могу назначить время события другому событию, чтобы достичь желаемого результата. - person Kspace; 30.08.2018
comment
Вы прочитали эти документы: ci.apache.org/projects/flink/flink-docs-release-1.6/dev/? Они подробно описывают, как назначать временные метки и водяные знаки. - person Dawid Wysakowicz; 30.08.2018
comment
Я прочитал документ и назначаю временные метки аналогично тому, как указано в документации, но все еще сталкиваюсь с некоторой проблемой. Если бы вы могли дать псевдокод, это действительно помогло бы. Я обновлю вопрос своей реализацией. - person Kspace; 30.08.2018
comment
@DawidWysakowicz У меня такая же проблема, когда я хочу собрать все события пользователя в течение 30 минут с первой временной метки события пользователя и выполнить некоторую агрегацию этих событий, но у меня также есть второе условие: если в течение 30 минут запускается какой-либо тип события прибыл, чем также запустить и запустить новое окно. Не могли бы вы помочь, как решить этот тип использования. - person Anuj jain; 21.05.2020

Некоторое время назад у меня был аналогичный вопрос относительно временных окон событий. Вот как выглядит мой стрим

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//Consumer Setup

val stream = env.addSource(consumer)
  .assignTimestampsAndWatermarks(new WMAssigner)

// Additional Setup here

stream
  .keyBy { data => data.findValue("service") }
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  .process { new WindowProcessor }

  //Sinks go here

Мой класс WMAssigner выглядел следующим образом (примечание: это позволяло произойти неупорядоченным событиям в течение 1 минуты, вы можете расширить другой экстрактор меток времени, если не хотите допускать опоздания):

class WMAssigner extends BoundedOutOfOrdernessTimestampExtractor[ObjectNode] (Time.seconds(60)) {
  override def extractTimestamp(element: ObjectNode): Long = {
    val tsStr = element.findValue("data").findValue("ts").toString replaceAll("\"", "")
    tsStr.toLong
  }
}

Моей меткой времени, которую я хотел использовать для водяных знаков, было поле data.ts.

Мой WindowProcessor:

class WindowProcessor extends ProcessWindowFunction[ObjectNode,String,String,TimeWindow] {
  override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[String]): Unit = {
    val out = ""
    elements.foreach( value => {
      out = value.findValue("data").findValue("outData")
    }
    out.collect(out)
  }
}

Сообщите мне, если что-то неясно

person Eumcoz    schedule 29.08.2018
comment
Спасибо за ваш ответ. Пожалуйста, проверьте обновленный вопрос. Я хочу, чтобы окно работало в течение 10 минут, начиная с момента первого события. Итак, как мне указать начало и конец окна для разных ключей. - person Kspace; 30.08.2018