Когда flink может поддерживать сопоставление шаблонов с участием полей предыдущих событий?

Было бы здорово иметь возможность сопоставлять события на основе значений их полей, выходящих за рамки текущей возможности создания шаблонов из событий, соответствующих отдельным критериям. Например, как описано на странице https://flink.apache.org/news/2016/04/06/cep-monitoring.html мы можем:

Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("First Event")
    .subtype(TemperatureEvent.class)
    .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD)
    .next("Second Event")
    .subtype(TemperatureEvent.class)
    .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD)
    .within(Time.seconds(10));

Однако было бы здорово создать Pattern из таких возможностей, как: .where(second_evt->evt.getTemperature() == first_evt->evt.getTemperature()


person Systems User    schedule 06.02.2017    source источник
comment
Попробуйте написать на адрес [email protected], так как это скорее запрос функции.   -  person Dawid Wysakowicz    schedule 06.02.2017
comment
Спасибо, Давид.. электронная почта не работает! Это ошибка, которую я получил -------------- › Привет. Это программа qmail-send на apache.org. Боюсь, я не смог доставить ваше сообщение по следующим адресам. Это постоянная ошибка; Я сдался. Жаль, что не получилось. ‹[email protected]›: должно быть отправлено с адреса @apache.org, адреса подписчика или адреса в LDAP.   -  person Systems User    schedule 09.02.2017
comment
Чтобы иметь возможность отправлять сообщения в список рассылки, вы должны подписаться, отправив электронное письмо на этот адрес: [email protected]   -  person Alex Chermenin    schedule 09.02.2017


Ответы (1)


Если вы хотите сравнить значения из полей в разных событиях, вы можете сделать это в методе flatSelect и просто использовать очень простой шаблон без каких-либо выражений where:

  1. Создайте узор:

    Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("First Event")
        .subtype(TemperatureEvent.class)
        .next("Second Event")
        .subtype(TemperatureEvent.class)
        .within(Time.seconds(10));
    
  2. Примените шаблон к потоку данных:

    PatternStream<MonitoringEvent> tempPatternStream = CEP.pattern(
        inputEventStream.keyBy("rackID"),
        warningPattern);
    
  3. Проверьте значения и сгенерируйте новое сложное событие с помощью метода flatSelect:

    DataStream<TemperatureWarning> warnings = tempPatternStream.flatSelect(
        (Map<String, MonitoringEvent> pattern, Collector<TemperatureAlert> out) -> {
            TemperatureEvent first = (TemperatureEvent) pattern.get("First Event");
            TemperatureEvent second = (TemperatureEvent) pattern.get("Second Event");
    
            if (first.getTemperature() <= second.getTemperature()) {
                out.collect(new TemperatureWarning(
                    first.getRackID(), 
                    (first.getTemperature() + second.getTemperature()) / 2));
            }
        });
    
person Alex Chermenin    schedule 07.02.2017
comment
Я хочу полагаться на движок CEP, чтобы найти (и, так сказать, поймать) «пару» событий, которые соответствуют отношению. Подход, в котором мы объединяем последовательные события по мере их поступления, хотя и требует очень интенсивной обработки (поскольку у нас есть шаблон для каждой пары входящих событий), не сможет поймать пару связанных во времени событий, разбросанных по времени. - person Systems User; 09.02.2017
comment
Я согласен с вами, но сейчас возможен только такой путь. Вы можете добавить задачу здесь: issues.apache.org/jira/browse/FLINK - person Alex Chermenin; 09.02.2017
comment
Спасибо Алексей, так и сделаю. Я понимаю применимость подхода, который вы предложили, для определенных сценариев (например, когда у нас есть данные из источников, предоставляющих последовательные обновления с относительно низкой частотой событий, и нас интересуют только определенные шаблоны). - person Systems User; 09.02.2017