Flink timeWindow получает время начала

Я рассчитываю количество (сумма 1) по временному окну следующим образом:

mappedUserTrackingEvent
            .keyBy("videoId", "userId")
            .timeWindow(Time.seconds(30))
            .sum("count")

Я хотел бы добавить время начала окна в качестве ключевого поля. поэтому результат будет примерно таким:

key: videoId=123,userId=234,time=2016-09-16T17:01:30
value: 50

Таким образом, совокупный счет по окну. Конечная цель — нарисовать гистограмму этих окон.

Как я могу добавить начало окна в качестве поля в ключе? и после этого выровняйте окно по 00 или 30 с в этом случае? Это возможно?


person AtharvaI    schedule 16.09.2016    source источник


Ответы (2)


Метод apply() объекта WindowFunction предоставляет объект Window, который является TimeWindow, если вы используете keyBy().timeWindow(). Объект TimeWindow имеет два метода, getStart() и getEnd(), которые возвращают метку времени начала и конца окна соответственно.

На данный момент невозможно использовать агрегацию sum() вместе с WindowFunction. Вам нужно сделать что-то вроде:

 mappedUserTrackingEvent
        .keyBy("videoId", "userId")
        .timeWindow(Time.seconds(30))
        .apply(new MySumReduceFunction(), new MyWindowFunction());`

MySumReduceFunction реализует интерфейс ReduceFunction и вычисляет сумму путем постепенного агрегирования элементов, поступающих в окно. MyWindowFunction реализует WindowFunction. Он получает агрегированное значение через параметр Iterable и дополняет значение отметкой времени, полученной из параметра TimeWindow.

person Fabian Hueske    schedule 16.09.2016

Вы можете использовать метод aggregate вместо суммы.
В aggregate установите второй параметр, реализующий WindowFunction или расширяющий ProcessWindowFunction.
Я использую Flink-1.4.0, рекомендую использовать ProcessWindowFunction, например:

mappedUserTrackingEvent
    .keyBy("videoId", "userId")
    .timeWindow(Time.seconds(30))
    .aggregate(new Count(), new MyProcessWindowFunction();

public static class MyProcessWindowFunction extends ProcessWindowFunction<Integer, Tuple2<Long,  Integer>, Tuple, TimeWindow>
{
    @Override
    public void process(Tuple tuple, Context context, Iterable<Integer> iterable, Collector<Tuple2<Long,  Integer>> collector) throws Exception
    {
        context.currentProcessingTime();
        context.window().getStart();
    }
}
person yeyunlong    schedule 24.01.2018