как сделать подтип () для кортежа в flink cep?

Pattern< Tuple3< String, String, String >, ? > pattern = Pattern.<Tuple3< String, String, String > > begin( "start" )            
            .next( "3" ).where( new FilterFunction< Tuple3< String, String, String > >() {
                @Override
                public boolean filter ( Tuple3< String, String, String > value ) throws Exception {
                    return value.f2.equals( "3" );
                }
            } )
            .next( "4" ).subtype(Tuple.getTupleClass( 2 )).where( new FilterFunction< Tuple2< String, String> >() {
                @Override
                public boolean filter ( Tuple2< String, String > value ) throws Exception {
                    return value.f1.equals( "3" );
                }
            } )

subtype(Tuple.getTupleClass( 2 )) и возникла ошибка Inferred type 'capture<? extends org.apapche.flink.api.java.tuple.Tuple>' for type parameter 'S' is not within its bound;should extend 'org.apapche.flink.api.java.tuple.Tuple3<java.lang.String,java.lang.String,java.lang.String>'

я должен изменить это?но как?Pattern< Tuple3< String, String, String >, ? > pattern


обновление до 2017 012

JoinedStreams< Tuple2< String, String >, Tuple3< String, String, String > >.Where< String >.EqualTo
        joinedStreams = someStream
        .join( otherStream )
        .where( value -> value.f1 )
        .equalTo( value -> value.f1 );

Pattern< Tuple, ? > pattern = Pattern.< Tuple > begin( "start" )
        .subtype( Tuple3.class )
        .where( evt -> evt.f2.equals( "3" ) )
        .next( "4" )
        .subtype( Tuple2.class )
        .where( evt -> evt.f1.equals( "3" ) )
        .within( Time.seconds( 10 ) );

PatternStream< ...> patternStream = CEP.pattern( joinedStreams, pattern );

Я попробовал это, и не знаю, что я должен заполнить PatternStream< ...>. Спасибо всем, кто может предложить помощь.


person 陶加涛    schedule 10.01.2017    source источник


Ответы (2)


Попробуйте этот код:

Pattern<Tuple, ?> pattern =
    Pattern.<Tuple>begin("start")
    .next("3")
        .subtype(Tuple3.class)
        .where(new FilterFunction<Tuple3>() {

            @Override
            public boolean filter(Tuple3 value) throws Exception {
                return value.f2.equals("3");
            }
        })
    .next("4")
        .subtype(Tuple2.class)
        .where(new FilterFunction<Tuple2>() {

            @Override
            public boolean filter(Tuple2 value) throws Exception {
                return value.f1.equals("3");
            }
        });

Начните с общего типа Tuple и используйте конкретные типы Tuple2 и Tuple3 для подсобытий. И поток данных для этого паттерна должен иметь тип Tuple.

person Alex Chermenin    schedule 10.01.2017
comment
Большое спасибо, я попробую, и последний даст вам ответ. Другой вопрос, могу ли я присоединиться к другому потоку в Flink CEP? Потому что CEP.pattern( partitionedEventA, pattern ); вы можете передать ему только один поток данных. Tuple2 в моем questino из другого потока данных .Но я не знаю, как это сделать. Еще раз спасибо. - person 陶加涛; 11.01.2017

что насчет этого:

    Pattern<Tuple, ?> pattern = Pattern.<Tuple>begin("start")
            .subtype(Tuple3.class)
            .where(evt -> evt.f2.equals("3"))
            .next("4")
            .subtype(Tuple2.class)
            .where(evt -> evt.f1.equals("3"))
            .within(Time.seconds(10));
  1. вам не нужно добавлять следующий после начала
  2. обратите внимание на буквальное значение подтипа, tuple3 и tuple2 должны расширять tuple.

Если вы хотите соединить два разных потока данных.

DataStream<Tuple2> someStream = //...
DataStream<Tuple3> otherStream = //...

ConnectedStreams<Tuple2, Tuple3> connectedStreams = someStream.connect(otherStream);

Затем вы можете использовать CoMap, CoFlatMap, чтобы получить тот же тип, например преобразовать Tuple2, Tuple3 в String: ConnectedStreams → DataStream

connectedStreams.flatMap(new CoFlatMapFunction<Tuple2, Tuple3, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(Tuple2.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(Tuple3.toString);
       }
   }
});

Вот несколько полезных ссылок, представляющих хороший вариант использования:

  1. Введение в обработку сложных событий (CEP) с помощью Apache Flink
  2. китайская версия, которую я перевожу
person William_Sang    schedule 10.01.2017
comment
большое спасибо, так как я могу выразить это: Кортеж2, за которым следует Кортеж3? - person 陶加涛; 10.01.2017
comment
Большое спасибо. Другой вопрос, могу ли я присоединиться к другому потоку в Flink CEP? Потому что в CEP.pattern( partitionedEventA, pattern ); вы можете передать ему только один поток данных. Tuple2 в моем вопросе - из другого потока данных. Но я не знаю, как это сделать. Спасибо опять таки. - person 陶加涛; 11.01.2017
comment
Да, но это связано с DataStream. Вы можете подключить два DataStream. Я обновил ответ. - person William_Sang; 11.01.2017
comment
Спасибо. Но если я хочу объединить поля в someStream и otherStream .like someStream.FieldB==otherStream.FieldE . Должен ли я использовать joinStream ? Но я не знаю, как написать предложение where. Еще раз большое спасибо. Могу ли я добавить ваш qq? Это мой номер: 245915794 - person 陶加涛; 12.01.2017
comment
Конечно, можешь. Просто используйте select API в PatternStream. См. ссылку на китайскую версию: segmentfault.com/a/1190000008074156 . Если ты прочитаешь это, ты сможешь понять. Нам нельзя использовать QQ в рабочее время, возможно, после работы. - person William_Sang; 12.01.2017
comment
Присоединение основано на окне, вам нужно понять это. почему бы вам не использовать метод подключения? - person William_Sang; 16.01.2017