Как выполнить timeWindow () для String DataStream во Flink?

Я хочу создать временное окно для потоковой передачи данных в Apache Flink. Мои данные выглядят примерно так:

1> {52,"mokshda",84.85}
2> {1,"kavita",26.16}
2> {131,"nidhi",178.9}
3> {2,"poorvi",22.97}
4> {115,"saheba",110.41}

Каждые 20 секунд мне нужна сумма оценок (последний столбец, например, оценка Мокшды - 84,85) всех строк. Функция timeWindow () работает с KeyedStream, поэтому мне нужно keyBy () для этого DataStream. Я могу набрать его по номеру рулона (первый столбец, например, 52 для Мокшды).

val windowedStream = stockStream
                        .keyBy(0)
                        .timeWindow(Time.seconds(20))
                        .sum(2)

Но очевидно, что Flink не читает мои данные в виде списка. Он читает его как строку, и поэтому я получаю следующее исключение:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: String

Как я могу выполнить timeWindow для данных String или как преобразовать эти данные в Tuple?


person Piyush Shrivastava    schedule 18.04.2016    source источник


Ответы (1)


Вы можете преобразовать DataStream[String] в DataStream[(Int, String, Double)], используя MapFunction[String, (Int, String, Double)], который разбирает строку на ее компоненты, преобразует типы данных и выдает Tuple.

Вы также можете применить timeWindowAll к потоку данных без ключа. Однако семантика, конечно, другая, и AllWindow может обрабатываться только с параллелизмом 1.

person Fabian Hueske    schedule 18.04.2016