Как сообщить о стоимости в реальном времени в Fink?

Я хочу три значения, это aggValueInLastHour aggValueInLastDay aggValueInLastThreeDay.

Я пробовал, как показано ниже.

введите здесь описание изображения

Но я не хочу ждать, это означает, что я не предпочитаю использовать скользящее окно для агрегирования. (3-дневное окно должно ждать данных за 3 дня, это невыносимо для нашей системы).

Как я могу получить совокупную стоимость за последние 3 дня, когда наступит первое событие?

Заранее благодарим за любой совет!


person Brutal_JL    schedule 03.05.2018    source источник
comment
Вы имеете в виду как скользящее окно? Как агрегирование OVER в SQL: ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/?   -  person Dawid Wysakowicz    schedule 03.05.2018


Ответы (3)


Если вы хотите получать более частые обновления, вы можете использовать QueryableState, опрашивая состояние со скоростью, которая соответствует вашему варианту использования.

person Alex    schedule 03.05.2018

Вы можете использовать ContinuousEventTimeTrigger, что приведет к срабатыванию вашего окна в течение более короткого периода времени, чем полное окно, что позволит вам увидеть промежуточное состояние. Вы можете при желании обернуть это в PurgingTrigger, если нижестоящие потребители вашего приемника ожидают, что каждый вывод будет частичной агрегацией (а не полным текущим состоянием), и суммирует их.

person Joshua DeWald    schedule 03.05.2018

Я пробовал CEP.

введите здесь описание изображения

код:

AfterMatchSkipStrategy strategy = AfterMatchSkipStrategy.skipShortOnes();
    Pattern<RiskEvent, ?> loginPattern = Pattern.<RiskEvent>begin("start", strategy)
            .where(eventTypeCondition)
            .timesOrMore(1)
            .greedy()
            .within(Time.hours(1));


    KeyedStream<RiskEvent, String> keyedStream = dataStream.keyBy(new KeySelector<RiskEvent, String>() {
        @Override
        public String getKey(RiskEvent riskEvent) throws Exception {
            // key by user for aggregation
            return riskEvent.getEventType() + riskEvent.getDeviceFp();
        }
    });
    PatternStream<RiskEvent> eventPatternStream = CEP.pattern(keyedStream, loginPattern);

    eventPatternStream.select(new PatternSelectFunction<RiskEvent, RiskResult>() {
        @Override
        public RiskResult select(Map<String, List<RiskEvent>> map) throws Exception {
            List<RiskEvent> list = map.get("start");

            ArrayList<Long> times = new ArrayList<>();
            for (RiskEvent riskEvent : list) {
                times.add(riskEvent.getEventTime());
            }
            Long min = Collections.min(times);
            Long max = Collections.max(times);

            Set<String> accountList = list.stream().map(RiskEvent::getUserName).collect(Collectors.toSet());
            logger.info("时间范围:" + new Date(min) + " --- " + new Date(max) + " 事件:" + list.get(0).getEventType() + ", 设备指纹:" + list.get(0).getDeviceFp() + ", 关联账户:" + accountList.toString());
            return null;
        }
    });

возможно, вы заметили, что стратегия пропуска skipShortOnes является индивидуализированной стратегией.

Покажу вам мою модификацию в CEP lib.

  1. добавить стратегию в Enum.

    общедоступное перечисление SkipStrategy {NO_SKIP, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST, SKIP_SHORT_ONES}

  2. добавить метод доступа в AfterMatchSkipStrategy.java

    общедоступная статическая AfterMatchSkipStrategy skipShortOnes () {вернуть новую AfterMatchSkipStrategy (SkipStrategy.SKIP_SHORT_ONES); }

  3. добавить стратегические действия в discardComputationStatesAccordingToStrategy метод в NFA.java.

    case SKIP_SHORT_ONES: int i = 0; Список >> tempResult = новый ArrayList ‹> (matchedResult); для (Карта> resultMap: tempResult) {if (i ++ == 0) {продолжить; } matchedResult.remove (resultMap); } перерыв;

person Brutal_JL    schedule 04.05.2018