Сжатый ввод BZip2 для Apache Flink

У меня есть дамп википедии, сжатый с помощью bzip2 (загружен с http://dumps.wikimedia.org/enwiki/), но я не хочу его распаковывать: я хочу обработать его, распаковывая на лету.

Я знаю, что это можно сделать на простой Java (см., например, Java — чтение файла BZ2 и распаковка/анализ на лету), но мне интересно, как это сделать в Apache Flink? Мне, вероятно, нужно что-то вроде https://github.com/whym/wikihadoop, но для Flink, не Хадуп.


person Alexey Grigorev    schedule 03.04.2015    source источник


Ответы (1)


В Apache Flink можно читать сжатые файлы следующих форматов:

org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.DeflateCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.Lz4Codec
org.apache.hadoop.io.compress.SnappyCodec

Как видно из названий пакетов, Flink делает это с помощью Hadoop InputFormats. Это пример чтения файлов gz с использованием Flink Scala API: (Вам нужен Flink не ниже 0.8.1)

def main(args: Array[String]) {

  val env = ExecutionEnvironment.getExecutionEnvironment
  val job = new JobConf()
  val hadoopInput = new TextInputFormat()
  FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz"))
  val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job)

  lines.print

  env.execute("Read gz files")
}

Apache Flink поддерживает только файлы .deflate. Добавить поддержку дополнительных кодеков сжатия легко, но это еще не сделано.

Использование HadoopInputFormats с Flink не приводит к снижению производительности. Flink имеет встроенную поддержку сериализации для типов Writable Hadoop.

person Robert Metzger    schedule 03.04.2015