Проблема, с которой я столкнулся, заключается в том, что я не могу вычислить сумму для одного шаблона CEP в scala. Я хочу определить, когда сумма превышает 6100 для определенного идентификатора клиента. Я предоставляю поток с ключом для CEP.pattern (...). Ниже я привел свой код для построения шаблона.
val pattern1 =Pattern.begin[GenericRecord]("start").where((v,ctx) => {
lazy val sum= ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum
print((sum + v.get("amount").toString.toLong).toString)
//print(sum+v.get("amount").toString.toLong>6100)
//println(v.get("customer_id").toString+v.get("amount").toString+" , ")
(sum+v.get("amount").toString.toLong)>6100 && v.get("state").toString=="FAILED"
}).oneOrMore
Мой ввод находится в формате avro, и Flink использует его из kafka. Ввод выглядит так -:
{"trasanction_id":196,"customer_id":28,"datetime":"2017-09-01 12:35:08","amount":6094,"state":"FAILED"}
{"trasanction_id":198,"customer_id":27,"datetime":"2017-09-01 12:36:04","amount":6024,"state":"FAILED"}
{"trasanction_id":199,"customer_id":27,"datetime":"2017-09-01 12:36:05","amount":2399,"state":"FAILED"}
{"trasanction_id":197,"customer_id":28,"datetime":"2017-09-01 12:36:36","amount":547,"state":"FAILED"}```
Однако приведенный ниже код хорошо работает при использовании двух шаблонов:
val pattern1=Pattern.begin[GenericRecord]("start").followedBy("middle").where((v,ctx) => {
lazy val sum= ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum
print((sum + v.get("amount").toString.toLong).toString)
//print(sum+v.get("amount").toString.toLong>6100)
//println(v.get("customer_id").toString+v.get("amount").toString+" , ")
(sum+v.get("amount").toString.toLong)>6100 && v.get("state").toString=="FAILED"
}).oneOrMore