Порядок временного окна событий потоковой передачи Flink

У меня возникли проблемы с пониманием семантики оконного времени событий. Следующая программа генерирует несколько кортежей с отметками времени, которые используются в качестве времени события, и выполняет простую агрегацию окон. Я ожидаю, что вывод будет в том же порядке, что и ввод, но вывод упорядочен по-другому. Почему вывод не по порядку относительно времени события?

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

object WindowExample extends App {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.enableTimestamps()
    env.setParallelism(1)

    val start = 1449597577379L
    val tuples = (1 to 10).map(t => (start + t * 1000, t))

    env.fromCollection(tuples)
      .assignAscendingTimestamps(_._1)
      .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
      .sum(1)
      .print()

    env.execute()
}

Вход:

 (1449597578379,1)
 (1449597579379,2)
 (1449597580379,3)
 (1449597581379,4)
 (1449597582379,5)
 (1449597583379,6)
 (1449597584379,7)
 (1449597585379,8)
 (1449597586379,9)
 (1449597587379,10)

Результат:

[info] (1449597579379,2)
[info] (1449597581379,4)
[info] (1449597583379,6)
[info] (1449597585379,8)
[info] (1449597587379,10)
[info] (1449597578379,1)
[info] (1449597580379,3)
[info] (1449597582379,5)
[info] (1449597584379,7)
[info] (1449597586379,9)

person bandrews    schedule 08.12.2015    source источник


Ответы (1)


Причина такого поведения в том, что во Flink не учитывается порядок элементов (относительно отметки времени). Только правильность водяных знаков и их связь с временными метками элементов важна для операций, которые учитывают время, потому что водяные знаки обычно запускают вычисления в основанных на времени операциях.

В вашем примере оператор окна сохраняет все элементы из источника во внутренних буферах окна. Затем источник выдает водяной знак, говорящий о том, что в будущем элементы с меньшей временной меткой не появятся. Это, в свою очередь, указывает оператору окна обрабатывать все окна с конечными временными метками, которые находятся под водяными знаками (что верно для всех окон). Таким образом, он испускает все окна (в произвольном порядке), а затем сам выпускает водяной знак. Операции, расположенные ниже по потоку, сами получат элементы и смогут выполнять обработку после получения водяных знаков.

По умолчанию интервал выдачи водяных знаков из источников составляет 200 мс. При небольшом количестве элементов, которые испускает ваш источник, все они испускаются до того, как испускается первый водяной знак. В реальном случае использования, когда интервалы выпуска водяных знаков намного меньше, чем размер окна, вы получите ожидаемое поведение выпускаемых окон в порядке их отметки времени. Например, если у вас есть окна на 1 час и водяные знаки каждые 500 мс.

person aljoscha    schedule 09.12.2015
comment
Не могли бы вы привести или указать на пример последующей операции, которая может переупорядочивать элементы в зависимости от времени события после получения водяного знака? Спасибо! - person Maksim Kolchin; 04.08.2017
comment
@MaximKolchin такое переупорядочивание происходит, например, в библиотеке CEP. Вы можете посмотреть здесь: github.com/apache/flink/blob/master/flink-libraries/flink-cep/ - person Dawid Wysakowicz; 09.08.2017