определение задержки обнаружения Flink CEP для каждого кортежа

У меня есть простой шаблон, как показано ниже

 Pattern<Event,?> pattern = Pattern.<Event>begin("s1")
                    .where(new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event event) throws Exception {

                            Long time = System.nanoTime();
                            // here we are setting the time when this event is detected
                            event.setEdtl(time);

                            return event.getSensor_id() == 1 && event.getValue() > 150;
                        }
                    }).followedBy("s2")
                    .where(new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event event) throws Exception {
                            Long time = System.nanoTime();

                            // here we are setting the time when this event is detected
                            event.setEdtl(time);

                            return event.getSensor_id() == 2 && event.getValue() > 15;
                        }
                    }).followedBy("s3")
                    .where(new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event event) throws Exception {

                            Long time = System.nanoTime();
                            // here we are setting the time when this event is detected
                            event.setEdtl(time);
                            return event.getSensor_id() == 3 && event.getValue() > 35;
                        }
                    })
                    .within(Time.milliseconds(WindowLength_join__ms));

для определения задержки времени обнаружения CEP добавлено время выбора каждого события в шаблоне, как показано выше. У каждого класса событий есть параметр Edtl (локальное время обнаружения события), который сначала устанавливается на 0, а затем устанавливается на System.nanoTime();.

Я получаю следующую ошибку во время выполнения, но дело в том, что ошибка возникает после того, как программа запускается в течение некоторого времени.

    Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.cep.operator.KeyedCEPPatternOperator.emitMatchedSequences(KeyedCEPPatternOperator.java:77)
    at org.apache.flink.cep.operator.KeyedCEPPatternOperator.processEvent(KeyedCEPPatternOperator.java:58)
    at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:236)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
    ... 7 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 18 more
Caused by: java.io.IOException: Failed to send message 'patient_id=1, egtl_raw=null, edtg=null
' to socket server at localhost:6020. Connection re-tries are not enabled.
    at org.apache.flink.streaming.api.functions.sink.SocketClientSink.invoke(SocketClientSink.java:154)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
    ... 26 more
Caused by: java.net.SocketException: Broken pipe (Write failed)
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:143)
    at org.apache.flink.streaming.api.functions.sink.SocketClientSink.invoke(SocketClientSink.java:146)

Я думаю, что это шаблон, потому что я выполняю операции чтения и записи внутри шаблона. Если это так, то как мне найти среднюю задержку сложного события в Flink CEP.


person Amarjit Dhillon    schedule 03.01.2018    source источник


Ответы (1)


введите описание изображения здесь

Я нашел решение, которое может быть не лучшим, но оно работает. Обнаружив задержку сложного события [CE] на рисунке выше, нам нужно найти время, когда CE был обнаружен, и минимальное время необработанного события, а затем найти разницу между этими временами. Причина, по которой я использую минимум здесь, состоит в том, что, скажем, в комплексное событие включены 3 потока a, b & c, поэтому, кому приходится ждать больше всего, мы рассмотрим его время. На мой взгляд, это можно выбрать по выкройке.

Задача 1: присвоение метки времени первому шаблону, как показано ниже.

if(perform_cep ){

        // performing some cep on merged stream


        Pattern<Event,?> pattern = Pattern.<Event>begin("s1")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) throws Exception {

                        // here we are setting the time when this event is detected

                        if(event.getSensor_id() == 1 && event.getValue() > 150){
                            Long time = System.currentTimeMillis();
                            event.setEdtl(time);
                            return true;

                        }
                        else return false;

                    }
                }).followedBy("s2")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) throws Exception {

                        return  event.getSensor_id() == 2 && event.getValue() > 15 ;

                    }
                }).followedBy("s3")
                .where(new SimpleCondition<Event>() {
                    @Override
                    public boolean filter(Event event) throws Exception {

                       return  event.getSensor_id() == 3 && event.getValue() > 35;

                    }
                })
                .within(Time.milliseconds(WindowLength_join__ms));

Задача 2: присвоение метки времени сложному потоку событий и определение задержки. Это делается следующим образом

 PatternStream<Event> patternStream = CEP.pattern(mergedStream,pattern);



            DataStream<String> cep_stream = patternStream.select(new PatternSelectFunction<Event, String>() {


  @Override
            public String select(Map<String, List<Event>> map) throws Exception {

                Event s1 = map.get("s1").get(0);

                Integer patient_id = s1.getPatient_id();
                Integer val1 = s1.getValue();
                Long time_s1 = s1.getEdtl();



                Event s2 = map.get("s2").get(0);
                Integer val2 = s2.getValue();


                Event s3 = map.get("s3").get(0);
                Integer val3 = s3.getValue();



                System.out.println("value 1  = " + val1);
                System.out.println("value 2  = " + val2);
                System.out.println("value 3  = " + val3);



                Long current_time = System.currentTimeMillis();
                Long cep_latency = current_time - time_s1 ;
                System.out.println("cep_latency = " + cep_latency + "ms" );


                String event_data =  "patient_id=" + patient_id +
                               ", cep_latency=" + cep_latency ;


                return event_data+ "\n";


            }
        });

Надеюсь, это поможет ???? Сообщите мне, есть ли другое решение этой проблемы.

person Amarjit Dhillon    schedule 03.01.2018