Несоответствие ввода: ожидается тип кортежа при попытке выбора в PatternStream

У меня проблемы с тестированием новых функций Flink 1.0.0. Я возился с CEP, и мне еще не удалось запустить простой демонстрационный код:

val pattern : Pattern[TrafficEvent, _] = Pattern.begin[TrafficEvent]("start")
val patternStream = CEP.pattern(stream.javaStream, pattern);

class MyPatternSelectFunction extends PatternSelectFunction[TrafficEvent, TrafficEvent] {
    override def select(pattern : java.util.Map[String, TrafficEvent]) : TrafficEvent ={
        pattern.get("start")
    }
}
val alerts = patternStream.select(new MyPatternSelectFunction())

Код компилируется хорошо, и maven не выводит предупреждений. TrafficEvent - это класс с несколькими простыми полями, а поток - это Scala DataStream этого класса. Ошибка появляется, когда код работает на Flink. Он выполняется в течение секунды, а затем код выходит с этим сообщением об ошибке:

Программа завершилась со следующим исключением:

  Input mismatch: Tuple type expected.
            org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:878)
            org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:302)
            org.apache.flink.cep.PatternStream.select(PatternStream.java:64)
            com.demo.DemoTraffic$.main(DemoTraffic.scala:311)

Я попытался перенести функциональность на Java, создав статический класс, подобный этому (возможно, есть какие-то странные проблемы с вызовом API из Scala):

public static DataStream<DemoTraffic.trafficEvent>  getStreamByPattern(DataStream<DemoTraffic.trafficEvent> stream) {
  Pattern<DemoTraffic.trafficEvent, ?> pattern = Pattern.<DemoTraffic.trafficEvent>begin("start");
  PatternStream<DemoTraffic.trafficEvent> patternStream = CEP.pattern(stream, pattern);
  DataStream<DemoTraffic.trafficEvent> rvalue = patternStream.select(new PatternSelectFunction<DemoTraffic.trafficEvent, DemoTraffic.trafficEvent>() {
    @Override
    public DemoTraffic.trafficEvent select(Map<String, DemoTraffic.trafficEvent> pattern) throws Exception {
      return pattern.get("start");
    }
  });
  return rvalue;
}

Но результат точно такой же, и он вызывает ту же ошибку в строке PatternStream.select. Есть какие-нибудь подсказки о том, что я могу попробовать или что делаю не так? Как видите, шаблон довольно глупый и предназначен только для проверки целей. Он принимает только все события и возвращает это событие в ответ. Flink - 1.0.0, с использованием версии Scala 2.10.

Спасибо


person midnight1247    schedule 06.04.2016    source источник
comment
Какое определение для TrafficEvent?   -  person Till Rohrmann    schedule 06.04.2016
comment
Не могли бы вы попробовать свой пример с последним SNAPSHOT? Возможно, это как-то связано с недавно исправленным issues.apache.org/jira/browse/ ФЛИНК-3563.   -  person twalthr    schedule 06.04.2016


Ответы (1)


Я предполагаю, что TrafficEvent - это класс случая Scala. Библиотека CEP была написана для Java API Flink и, таким образом, пока не поддерживает классы сценария Scala.

В качестве обходного пути вы можете перевести свой класс case в обычный класс Scala.

Также существует билет JIRA, который отслеживает разработку CEP Scala API.

person Till Rohrmann    schedule 06.04.2016