Окно, не достигающее своей длины окна

Я пробовал примеры работы с окнами flink, и чтобы проверить время окна, я добавил метку времени к событию потока. И я обнаружил, что продолжительность окна меньше, чем длина окна. Также, если бы мне пришлось использовать скользящее окно и изменить событие, я получу измененное событие в следующем окне.

Когда я указываю длину окна, не дожидается ли оно завершения? И перекрывающиеся события между скользящими окнами относятся к одному и тому же экземпляру? (Я знаю, что потоки - это неизменяемые структуры)

public class WindowDemo {

public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

    Properties prop=PropertyLoader.loadPropertiesForConsumer("WC",0);
    FlinkKafkaConsumer09<Alarm> consumer= new FlinkKafkaConsumer09<Alarm>("topic_smartEmse", new AlarmSchema(), prop);
    DataStream<Alarm> inputStream= env.addSource(consumer);

    inputStream= inputStream.flatMap(new FlatMapFunction<Alarm, Alarm>() {

        @Override
        public void flatMap(Alarm value, Collector<Alarm> out)
                throws Exception {
            System.out.println("flatMap Started at "+System.currentTimeMillis());
            value.setUserDefined10("IN TIME "+System.currentTimeMillis());
            out.collect(value);
            System.out.println("flatMap Ended at "+System.currentTimeMillis());
        }
    });

    KeyedStream<Alarm, String> keyedStream= inputStream.keyBy(new KeySelector<Alarm, String>(){

        @Override
        public String getKey(Alarm value) throws Exception {
            System.out.println("getKey Started at "+System.currentTimeMillis());
            return "XX";
        }});

    DataStream<Alarm> dataStream= keyedStream.timeWindow(Time.of(90, TimeUnit.SECONDS)).apply(new WindowFunction<Alarm, Alarm, String, TimeWindow>() {

        @Override
        public void apply(String key, TimeWindow window,
                Iterable<Alarm> input, Collector<Alarm> out)
                throws Exception {
            System.out.println("timeWindow Started at "+System.currentTimeMillis());
            int count=0;
            System.out.println("Key : "+key);
            System.out.println("Values : "+input);
            Iterator<Alarm> itr= input.iterator();
            while (itr.hasNext()){
                Alarm alarm= itr.next();
                alarm.setUserDefined1(""+count++);

                out.collect(alarm);
            }
            System.out.println("timeWindow ended at "+System.currentTimeMillis());

        }
    });

    dataStream= dataStream.flatMap(new FlatMapFunction<Alarm, Alarm>() {

        @Override
        public void flatMap(Alarm value, Collector<Alarm> out)
                throws Exception {
            value.setUserDefined11("OUT TIME "+System.currentTimeMillis());
            out.collect(value);
        }
    });
    dataStream.printToErr();
    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
}

person Ajith Nayak    schedule 17.01.2017    source источник


Ответы (1)


Если я вас правильно понял, вас беспокоит, что окно оценивается (вызывается apply) до того, как истечет указанный период времени. Я заметил такой же эффект при первой оценке окна. Кажется, что временной интервал как-то выровнен. Я начал обработку в 19:09:13, а первая оценка окна была в 19:10:30, то есть через 77 секунд. После этого первого вызова окно закрывалось не точно, а примерно каждые 90 секунд.

Для TumblingProcessingTimeWindows (который вы используете), похоже, это код:

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {

    private long size;

    private TumblingProcessingTimeWindows(long size) {
        this.size = size;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {

        final long now = context.getCurrentProcessingTime();
        // here goes the alignment 
        long start = now - (now % size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    }

Это имеет для вас смысл?

person TobiSH    schedule 19.01.2017
comment
Код, которым вы поделились, вручную исправляет выравнивание времени. Интересно, следовало ли это сделать, неявно задавая временные характеристики (что я пробовал). - person Ajith Nayak; 07.02.2017
comment
Это не мой код. Это из источников flink: github.com/apache/flink/blob/release-1.1.4-rc1/ - person TobiSH; 07.02.2017
comment
Ладно, в этом есть смысл. Я также хотел знать, будут ли изменения, внесенные одним (скользящим) окном, видны в другом окне? - person Ajith Nayak; 13.02.2017