Программа потоковой передачи Flink работает правильно со временем обработки, но не дает результатов со временем события

Обновление добавлено env.getConfig().setAutoWatermarkInterval(1000L);

не устранил проблему.

Думаю, проблема в другой части моего кода. Итак, во-первых, немного больше предыстории.

Программа использует поток JSON смешанных типов сообщений из одной очереди kafka. программа сначала конвертирует в поток типа ObjectNode. затем этот поток разделяется с использованием .split() примерно на 10 отдельных потоков. эти потоки отображаются в потоки POJO.

затем этим потокам POJO присваиваются временные метки, прежде чем они будут добавлены в окно (1 окно на поток типа POJO), запрограммированы, а затем суммированы и усреднены в рамках настраиваемой функции перед отправкой обратно в другую очередь kafka.

Пример расширенного кода

public class flinkkafka {

public static void main(String[] args) throws Exception {
    //create object mapper to allow object to JSON transform
    final ObjectMapper mapper = new ObjectMapper();
    final String OUTPUT_QUEUE = "test";
    //setup streaming environment
    StreamExecutionEnvironment env =    
         StreamExecutionEnvironment
              .getExecutionEnvironment();

    //set streaming environment variables from command line
    ParameterTool parameterTool = ParameterTool.fromArgs(args);

    //set time characteristic to EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    //set watermark polling interval
    env.getConfig().setAutoWatermarkInterval(1000L);

    //Enable checkpoints to allow for graceful recovery
    env.enableCheckpointing(1000);

    //set parallelism
    env.setParallelism(1);

    //create an initial data stream of mixed messages
    DataStream<ObjectNode> messageStream = env.addSource
            (new FlinkKafkaConsumer09<>(
                    parameterTool.getRequired("topic"), 
                    new JSONDeserializationSchema(),
                    parameterTool.getProperties())) 
                      .assignTimestampsAndWatermarks(new
                      BoundedOutOfOrdernessTimestampExtractor<ObjectNode>
                      (Time.seconds(10)){
                        private static final long serialVersionUID = 1L;

                        @Override
                        public long extractTimestamp(ObjectNode value) {
                            DateFormat format = new SimpleDateFormat("yyyy-
                             MM-dd HH:mm:ss", Locale.ENGLISH);
                            long tmp = 0L;
                            try {
                                tmp = 
                               format.parse(value.get("EventReceivedTime")
                                    .asText()).getTime();
                            } catch (ParseException e) {
                                e.printStackTrace();
                            }
                            System.out.println("Assigning timestamp " + 
                               tmp);
                            return tmp;
                        }

                    });

    //split stream by message type
    SplitStream<ObjectNode> split = messageStream.split(new  
               OutputSelector<ObjectNode>(){
        private static final long serialVersionUID = 1L;

        @Override
        public Iterable<String> select(ObjectNode value){
            List<String> output = new ArrayList<String>();
            switch (value.get("name").asText()){
            case "one":
                switch (value.get("info").asText()){
                case "two":
                    output.add("info");
                    System.out.println("Sending message to two
                          stream");
                    break;
                case "three":
                    output.add("three");
                    System.out.println("Sending message to three stream");
                    break;
                case "four":
                    output.add("four");
                    System.out.println("Sending message to four stream");
                    break;
                case "five":
                    output.add("five");
                    System.out.println("Sending message to five stream");
                    break;
                case "six":
                    output.add("six");
                    System.out.println("Sending message to six stream");
                    break;
                default:
                    break;
                }
                break;
            case "seven":
                output.add("seven");
                System.out.println("Sending message to seven stream");
                break;
            case "eight":
                output.add("eight");
                System.out.println("Sending message to eight stream");
                break;
            case "nine":
                output.add("nine");
                System.out.println("Sending message to nine stream");
                break;
            case "ten":
                switch (value.get("info").asText()){
                case "eleven":
                    output.add("eleven");
                    System.out.println("Sending message to eleven stream");
                    break;
                case "twelve":
                    output.add("twelve");
                    System.out.println("Sending message to twelve stream");
                    break;
                default:
                    break;
                }
                break;
            default:
                output.add("failed");
                break;
            }
            return output;
        }
    });

    //assign splits to new data streams
    DataStream<ObjectNode> two = split.select("two");
    //assigning more splits to streams

    //convert ObjectNodes to POJO 

    DataStream<Two> twoStream = two.map(new MapFunction<ObjectNode, Two>(){
        private static final long serialVersionUID = 1L;

        @Override
        public Twomap(ObjectNode value) throws Exception {
            Two stream = new Two();
            stream.Time = value.get("Time").asText();
            stream.value = value.get("value").asLong();
            return front;
        }
    });

    DataStream<String> keyedTwo = twoStream
            .keyBy("name")
            .timeWindow(Time.minutes(5))
            .apply(new twoSum())
            .map(new MapFunction<Two, String>(){
                private static final long serialVersionUID = 1L;
                @Override
                public String map(Two value) throws Exception {
                    return mapper.writeValueAsString(value);
                }
            });
    keyedTwo.addSink(new FlinkKafkaProducer09<String>
         (parameterTool.getRequired("bootstrap.servers"),
                 OUTPUT_QUEUE, new SimpleStringSchema()));

    env.execute();

Я пытаюсь использовать Flink для агрегирования очереди Kafka и отправки потока данных обратно в Kafka. Агрегирование будет использовать 5-минутное временное окно события, программа компилируется и запускается, но собранные данные никогда не покидают окно для передачи в функцию агрегации и поэтому никогда не доставляют сообщения в Kafka. Однако, если я закомментирую характеристику eventTime, программа запускается и дает результаты. Я понятия не имею, где я ошибаюсь.

Код EventTime

StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();

ParameterTool parameterTool = ParameterTool.fromArgs(args);

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.enableCheckpointing(1000);

DataStream<FrontEnd> frontEndStream = frontEnd.map(new
    MapFunction<ObjectNode, FrontEnd>(){

        private static final long serialVersionUID = 1L;

        @Override
        public FrontEnd map(ObjectNode value) throws Exception {
        FrontEnd front = new FrontEnd();
        front.eventTime = value.get("EventReceivedTime").asText();
        return front;
        }
    }).assignTimestampsAndWatermarks(new
        BoundedOutOfOrdernessTimestampExtractor<FrontEnd>(Time.seconds(10)){
            private static final long serialVersionUID = 1L;
            @Override
            public long extractTimestamp(FrontEnd value) {
                DateFormat format = new SimpleDateFormat("yyyy-MM-
                    ddHH:mm:ss",Locale.ENGLISH);
                long tmp = 0L;
                try {
                tmp = format.parse(value.eventTime).getTime();
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return tmp;
        }

    });

    DataStream<String> keyedFrontEnd = frontEndStream
        .keyBy("name")
        .timeWindow(Time.minutes(5))
        .apply(new FrontEndSum())
        .map(new MapFunction<FrontEnd, String>(){
                private static final long serialVersionUID = 1L;
                @Override
                public String map(FrontEnd value) throws Exception {
                    return mapper.writeValueAsString(value);
                }
            });
   .map(new MapFunction<FrontEnd, String>(){
                private static final long serialVersionUID = 1L;
                @Override
                public String map(FrontEnd value) throws Exception {
                    return mapper.writeValueAsString(value);
                }
            });
    keyedFrontEnd.addSink(new FlinkKafkaProducer09<String>
    (parameterTool.getRequired("bootstrap.servers"), OUTPUT_QUEUE, new 
    SimpleStringSchema()));  

    env.execute();
    }
}

Я пробовал с экстрактором отметок времени, прикрепленным к входящему потоку, и с одним, прикрепленным к каждому из потоков POJO. Опять же, этот код работает со временем события и выдает ожидаемый результат в виде потока строк JSON с ожидаемыми агрегатами. Однако после включения времени события окна никогда не производят результат.


person andrew.kane    schedule 06.12.2016    source источник
comment
Удалось ли вам решить эту проблему? Я столкнулся с той же проблемой, когда я думаю, что некоторые события не обрабатываются, потому что водяной знак не перемещается вперед   -  person karthiks3000    schedule 14.09.2020


Ответы (2)


BoundedOutOfOrdernessTimestampExtractor реализует интерфейс AssignerWithPeriodicWatermarks, что означает, что Flink периодически запрашивает текущий водяной знак.

Вы должны настроить интервал опроса с помощью ExecutionConfig:

env.getConfig.setAutoWatermarkInterval(1000L); // poll watermark every second
person Fabian Hueske    schedule 07.12.2016
comment
спасибо за помощь, я попробовал исправить, но у меня все еще та же проблема - person andrew.kane; 08.12.2016
comment
Привет, Фабиан, setAutoWatermarkInterval работают ли должным образом? Потому что согласно code, теперь он не учитывает значение конфигурации пользователя, вместо этого он заменяет значение либо на ноль, если это ProcessingTime, иначе 200 мс. Тем не менее в документации говорится: можно настроить AutoWatermarkInterval. Не могли бы вы прояснить это? - person Jaya Ananthram; 09.05.2020
comment
На самом деле setAutoWatermarkInterval работает должным образом, если я устанавливаю значение после вызова метода setStreamTimeCharacteristic. Это ошибка? Не могли бы вы подтвердить это? Если да, я создам JIRA и попытаюсь исправить это в исходном коде Flink. - person Jaya Ananthram; 09.05.2020
comment
Привет, @JayaAnanthram, спасибо за ваш комментарий. Комментарии Stack Overflow - не лучшее место для обсуждения потенциальных ошибок. Создание задачи Jira - хорошая идея. Вы также можете сначала обратиться к списку рассылки пользователей, чтобы проверить проблему. Спасибо, Фабиан - person Fabian Hueske; 11.05.2020

Моя первая склонность - всегда предполагать проблему с часовым поясом.

Каков часовой пояс поля "EventReceivedTime" в полезной нагрузке kafka?

SimpleDateFormat будет анализировать в ЛОКАЛЬНОМ часовом поясе JVM:

DateFormat format = new SimpleDateFormat("yyyy-MM-ddHH:mm:ss",Locale.ENGLISH);

можете добавить

format.setTimeZone(TimeZone.getTimeZone("GMT"));

для анализа вашей строки как, например, GMT, если это то, что представляет текст. Вы должны убедиться, что часовой пояс / смещение всех ваших дат, водяных знаков и т. Д. Совпадают и все сравниваются во времени UTC / эпохи (это то, что вы получаете, когда извлекаете из него Long).

person Dave Torok    schedule 08.12.2016