Обработка сложных событий с сохранением состояния с помощью apache flink

Я хочу определить, происходят ли два события в определенный период времени на основе двух событий с одинаковым идентификатором. Например, DoorEvent выглядит так:

<doorevent>
  <door>
    <id>1</id>
    <status>open</status>
  </door>
  <timestamp>12345679</timestamp>
</doorevent> 

<doorevent>
  <door>
    <id>1</id>
    <status>close</status>
  </door>
  <timestamp>23456790</timestamp>
</doorevent>

Мой java-класс DoorEvent в приведенном ниже примере имеет ту же структуру.

Я хочу обнаружить, что дверь с идентификатором 1 закрывается в течение 5 минут после открытия. Я пытаюсь использовать для этой цели библиотеку Apache flink CEP. Входящий поток содержит все открытые и закрытые сообщения, скажем, от 20 дверей.

Pattern<String, ?> pattern = Pattern.<String>begin("door_open").where(
    new SimpleCondition<String>() {
        private static final long serialVersionUID = 1L;
        public boolean filter(String doorevent) {
            DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML);
            if (event.getDoor().getStatus().equals("open")){
                // save state of door as open
                return true;
            }
            return false;                           
        }
    }
)
.followedByAny("door_close").where(
    new SimpleCondition<String>() {
            private static final long serialVersionUID = 1L;
            public boolean filter(String doorevent) throws JsonParseException, JsonMappingException, IOException {
                DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML);
                if (event.getDoor().getStatus().equals("close")){
                    // check if close is of previously opened door
                    return true;
                }
                return false;
            }
        }
)
.within(Time.minutes(5));

Как сохранить состояние двери 1 как открытой в door_open, чтобы на шаге door_close я знал, что закрыта дверь 1, а не какая-то другая дверь?


person Gert Kommer    schedule 12.09.2017    source источник


Ответы (1)


Если у вас есть Flink 1.3.0 и выше, это действительно просто, что вы хотите сделать.

Ваш шаблон будет выглядеть примерно так:

Pattern.<DoorEvent>begin("first")
        .where(new SimpleCondition<DoorEvent>() {
          private static final long serialVersionUID = 1390448281048961616L;

          @Override
          public boolean filter(DoorEvent event) throws Exception {
            return event.getDoor().getStatus().equals("open");
          }
        })
        .followedBy("second")
        .where(new IterativeCondition<DoorEvent>() {
          private static final long serialVersionUID = -9216505110246259082L;

          @Override
          public boolean filter(DoorEvent secondEvent, Context<DoorEvent> ctx) throws Exception {

            if (!secondEvent.getDoor().getStatus().equals("close")) {
              return false;
            }

            for (DoorEvent firstEvent : ctx.getEventsForPattern("first")) {
              if (secondEvent.getDoor().getEventID().equals(firstEvent.getDoor().getEventId())) {
                return true;
              }
            }
            return false;
          }
        })
        .within(Time.minutes(5));

Таким образом, в основном вы можете использовать IterativeConditions и получать контекст для первых шаблонов, которые сопоставляются, и перебирать этот список, сравнивая тот, который вам нужен, и действовать по своему усмотрению.

IterativeConditions являются дорогостоящими и должны обрабатываться соответствующим образом.

Дополнительную информацию об условиях см. здесь: Flink — Условия

person Biplob Biswas    schedule 12.09.2017