Потоковая передача flink создать файл (csv или текст) во временном окне

Я новичок в флинке, у меня есть трансформация, предположим, что это

val supportTask= customSource

  .map( line => line.split(","))
  .map( line => SupportTaskNew(line(0)toInt,line(1).toString,line(2)toString,line(3)toLong,line(4).toString,line(5)toInt,line(6)toInt))
  .filter(_ => true) //todo put sent date condition
  .map( line => Count(1))
  .keyBy(0)
  .timeWindow(Time.seconds(20)) //todo for time being 10 seconds, actuals 30 min
 .sum(0)  

теперь я хочу создать файл для каждого временного окна 20 секунд

supportTask.writeAsText(("D://myfile_"+Calendar.getInstance().get(Calendar.SECOND)),WriteMode.NO_OVERWRITE).setParallelism(1)

Я предоставил имя файла + секунды, чтобы каждый раз создавать файл с добавленными к нему секундами.

Но здесь создается только один файл, я хочу создавать новый файл для каждого окна 20 секунд, как я могу это сделать?


person Sanjay Rabari    schedule 25.05.2017    source источник
comment
Используйте API DataStream.writeUsingOutputFormat(). writeAsText записывает все выходные записи в файл, указанный в параметре. Для этого вам нужно реализовать специальный формат вывода.   -  person BrightFlow    schedule 26.05.2017


Ответы (1)


Возможно, вы могли бы сделать это с помощью Групповой приемник файлов и пользовательский Bucketer.

person David Anderson    schedule 29.05.2017