Как заставить Spark Streaming записывать свой вывод, чтобы Impala могла его прочитать?

У меня есть следующая проблема с Spark Streaming API. В настоящее время я передаю входные данные через Flume в Spark Streaming, с помощью которого я планирую выполнить некоторую предварительную обработку данных. Затем я хотел бы сохранить данные в файловой системе Hadoop и запросить их с помощью Impala. Однако Spark записывает файлы данных в отдельные каталоги, и для каждого RDD создается новый каталог.

Это проблема, потому что, прежде всего, внешние таблицы в Impala не могут обнаруживать подкаталоги, а только файлы внутри каталога, на который они указывают, если они не разбиты на разделы. Во-вторых, Spark добавляет новые каталоги так быстро, что для производительности было бы очень плохо периодически создавать новый раздел в Impala для каждого сгенерированного каталога. С другой стороны, если я решу увеличить интервал записи в Spark, чтобы каталоги генерировались реже, будет дополнительная задержка, пока Impala не сможет прочитать входящие данные. Это неприемлемо, поскольку моя система должна поддерживать приложения реального времени. В Hive я мог настроить внешние таблицы так, чтобы они также обнаруживали подкаталоги без необходимости разделения, используя следующие параметры:

set hive.mapred.supports.subdirectories=true;
set mapred.input.dir.recursive=true;

Но, насколько я понимаю, у Impala нет такой функции.

В настоящее время я использую следующий код для чтения данных из Flume и записи их в HDFS:

val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8"))).saveAsTextFiles(path)

Здесь переменная path определяет префикс каталога, в который добавляются текстовые файлы (part-0000 и т. д.), а остальная часть имени каталога представляет собой отметку времени, сгенерированную Spark. Я мог бы изменить код на что-то вроде этого:

val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
val mapStream = stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8")))
mapStream.foreachRDD(rdd => rdd.saveAsTextFile(path))

В этом случае файлы будут добавлены в один и тот же каталог, определяемый путем, но, поскольку они всегда называются часть-00000, часть-00001, часть-00002 и т. д., ранее созданные файлы будут перезаписаны. Изучая исходный код Spark, я заметил, что имена файлов определяются строкой в ​​методе open() SparkHadoopWriter:

val outputName = "part-"  + numfmt.format(splitID)

И мне кажется, что нет возможности манипулировать splitID через Spark API. Подводя итог, мои вопросы следующие:

  • Есть ли способ заставить внешние таблицы в Impala обнаруживать подкаталоги?
  • Если нет, есть ли способ заставить Spark записывать свои выходные файлы в один каталог или иным образом в форме, которая мгновенно читается Impala?
  • Если нет, ожидается ли какое-либо обновление для Spark, чтобы решить эту проблему, или мне просто нужно разветвить свою собственную версию Spark, с помощью которой я могу выбирать имена файлов, которые он записывает сам?

person fubar    schedule 13.06.2014    source источник
comment
Вам удалось ее решить? Если да, то как? Спасибо.   -  person bhelm    schedule 29.01.2015


Ответы (1)


Я не могу говорить за Импалу.

part-xxxxx — это соглашение Hadoop, которому следует Spark. Большинство инструментов понимают этот формат, и я предполагаю, что Spark мало что может с этим поделать. Файлы деталей должны быть уникальными, и обычно к имени файла добавляется номер раздела.

Я бы посмотрел в Impala, чтобы узнать, как читать файл детали, поскольку большинство инструментов Hadoop генерируют его таким образом.

Если кто-то хочет настроить структуру каталогов — хотя это не ваш вопрос — этого можно легко добиться, скажем, изменить формат prefix-timestamp-suffix. Spark Steaming использует RDD.saveAsTextFiles(..) под капотом Spark, который можно настроить. Вот код из DStream.scala:

  def saveAsTextFiles(prefix: String, suffix: String = "") {
    val saveFunc = (rdd: RDD[T], time: Time) => {
      val file = rddToFileName(prefix, suffix, time)
      rdd.saveAsTextFile(file)
    }
    this.foreachRDD(saveFunc)
  }
person antony    schedule 11.02.2015