У меня проблемы с тестированием новых функций Flink 1.0.0. Я возился с CEP, и мне еще не удалось запустить простой демонстрационный код:
val pattern : Pattern[TrafficEvent, _] = Pattern.begin[TrafficEvent]("start")
val patternStream = CEP.pattern(stream.javaStream, pattern);
class MyPatternSelectFunction extends PatternSelectFunction[TrafficEvent, TrafficEvent] {
override def select(pattern : java.util.Map[String, TrafficEvent]) : TrafficEvent ={
pattern.get("start")
}
}
val alerts = patternStream.select(new MyPatternSelectFunction())
Код компилируется хорошо, и maven не выводит предупреждений. TrafficEvent - это класс с несколькими простыми полями, а поток - это Scala DataStream этого класса. Ошибка появляется, когда код работает на Flink. Он выполняется в течение секунды, а затем код выходит с этим сообщением об ошибке:
Программа завершилась со следующим исключением:
Input mismatch: Tuple type expected.
org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:878)
org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:302)
org.apache.flink.cep.PatternStream.select(PatternStream.java:64)
com.demo.DemoTraffic$.main(DemoTraffic.scala:311)
Я попытался перенести функциональность на Java, создав статический класс, подобный этому (возможно, есть какие-то странные проблемы с вызовом API из Scala):
public static DataStream<DemoTraffic.trafficEvent> getStreamByPattern(DataStream<DemoTraffic.trafficEvent> stream) {
Pattern<DemoTraffic.trafficEvent, ?> pattern = Pattern.<DemoTraffic.trafficEvent>begin("start");
PatternStream<DemoTraffic.trafficEvent> patternStream = CEP.pattern(stream, pattern);
DataStream<DemoTraffic.trafficEvent> rvalue = patternStream.select(new PatternSelectFunction<DemoTraffic.trafficEvent, DemoTraffic.trafficEvent>() {
@Override
public DemoTraffic.trafficEvent select(Map<String, DemoTraffic.trafficEvent> pattern) throws Exception {
return pattern.get("start");
}
});
return rvalue;
}
Но результат точно такой же, и он вызывает ту же ошибку в строке PatternStream.select. Есть какие-нибудь подсказки о том, что я могу попробовать или что делаю не так? Как видите, шаблон довольно глупый и предназначен только для проверки целей. Он принимает только все события и возвращает это событие в ответ. Flink - 1.0.0, с использованием версии Scala 2.10.
Спасибо
TrafficEvent
? - person Till Rohrmann   schedule 06.04.2016