Итеративное условие Flink CEP для одного шаблона в Scala

Проблема, с которой я столкнулся, заключается в том, что я не могу вычислить сумму для одного шаблона CEP в scala. Я хочу определить, когда сумма превышает 6100 для определенного идентификатора клиента. Я предоставляю поток с ключом для CEP.pattern (...). Ниже я привел свой код для построения шаблона.

val pattern1 =Pattern.begin[GenericRecord]("start").where((v,ctx) => {

      lazy val sum= ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum

      print((sum + v.get("amount").toString.toLong).toString)
     //print(sum+v.get("amount").toString.toLong>6100)
      //println(v.get("customer_id").toString+v.get("amount").toString+" , ")
      (sum+v.get("amount").toString.toLong)>6100 && v.get("state").toString=="FAILED"
    }).oneOrMore

Мой ввод находится в формате avro, и Flink использует его из kafka. Ввод выглядит так -:

{"trasanction_id":196,"customer_id":28,"datetime":"2017-09-01 12:35:08","amount":6094,"state":"FAILED"}
{"trasanction_id":198,"customer_id":27,"datetime":"2017-09-01 12:36:04","amount":6024,"state":"FAILED"}
{"trasanction_id":199,"customer_id":27,"datetime":"2017-09-01 12:36:05","amount":2399,"state":"FAILED"}
{"trasanction_id":197,"customer_id":28,"datetime":"2017-09-01 12:36:36","amount":547,"state":"FAILED"}```

Однако приведенный ниже код хорошо работает при использовании двух шаблонов:

val pattern1=Pattern.begin[GenericRecord]("start").followedBy("middle").where((v,ctx) => {

      lazy val sum= ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum

      print((sum + v.get("amount").toString.toLong).toString)
     //print(sum+v.get("amount").toString.toLong>6100)
      //println(v.get("customer_id").toString+v.get("amount").toString+" , ")
      (sum+v.get("amount").toString.toLong)>6100 && v.get("state").toString=="FAILED"
    }).oneOrMore

person Anish Sarangi    schedule 27.12.2019    source источник
comment
Привет, Дэвид, я пытался показать вам код, для которого я получал результат.   -  person Anish Sarangi    schedule 27.12.2019


Ответы (1)


getEventsForPattern возвращает значения, уже совпадающие с шаблоном. Разберем покупателя 27. При обработке события

{"trasanction_id":198,"customer_id":27,"datetime":"2017-09-01 12:36:04","amount":6024,"state":"FAILED"}

ваш первый фрагмент отклоняет это сообщение, поскольку это не удовлетворяет условию: sum + amount = 0 + 6094 < 6100. При обработке

{"trasanction_id":197,"customer_id":28,"datetime":"2017-09-01 12:36:36","amount":547,"state":"FAILED"}

ваше условие снова будет проверять, 0 + 547 > 6100, и поэтому вы не видите вывода.

Во втором примере вы используете оператор followedBy, что означает, что вы собираетесь обрабатывать пары элементов. Первая транзакция принимается безоговорочно (так как вы не включаете оператор where), и теперь она будет возвращена вызовом ctx.getEventsForPattern("start"). Надеюсь, вы понимаете поведение этого кода.


CEP в основном используется для выявления закономерностей в данных, а не для их агрегирования. К вашей проблеме можно подойти, выполнив оконное управление с последующей фильтрацией - здесь нет необходимости использовать CEP.

person moped    schedule 28.12.2019
comment
CEP сортирует временные потоки событий перед сопоставлением с образцом. Это может быть поводом использовать его даже в тех случаях, когда шаблон тривиален. - person David Anderson; 30.12.2019
comment
@bottaio спасибо за устранение путаницы. Однако что, если я хочу сопоставить шаблоны в соответствии с определенным агрегированным значением. Как использовать для этого итеративное условие? Я думал об использовании окон, а затем CEP, но в этом случае, возможно, будет конфликт между CEP во времени и во времени окна. Я не уверен в этом. Не могли бы вы уточнить это? Заранее спасибо. - person Anish Sarangi; 30.12.2019
comment
@DavidAnderson Я разделил значения по идентификатору customer_id, поэтому, думаю, сортировка не будет проблемой, потому что сортировка будет выполняться по разделам. - person Anish Sarangi; 30.12.2019