У меня есть следующая проблема с Spark Streaming API. В настоящее время я передаю входные данные через Flume в Spark Streaming, с помощью которого я планирую выполнить некоторую предварительную обработку данных. Затем я хотел бы сохранить данные в файловой системе Hadoop и запросить их с помощью Impala. Однако Spark записывает файлы данных в отдельные каталоги, и для каждого RDD создается новый каталог.
Это проблема, потому что, прежде всего, внешние таблицы в Impala не могут обнаруживать подкаталоги, а только файлы внутри каталога, на который они указывают, если они не разбиты на разделы. Во-вторых, Spark добавляет новые каталоги так быстро, что для производительности было бы очень плохо периодически создавать новый раздел в Impala для каждого сгенерированного каталога. С другой стороны, если я решу увеличить интервал записи в Spark, чтобы каталоги генерировались реже, будет дополнительная задержка, пока Impala не сможет прочитать входящие данные. Это неприемлемо, поскольку моя система должна поддерживать приложения реального времени. В Hive я мог настроить внешние таблицы так, чтобы они также обнаруживали подкаталоги без необходимости разделения, используя следующие параметры:
set hive.mapred.supports.subdirectories=true;
set mapred.input.dir.recursive=true;
Но, насколько я понимаю, у Impala нет такой функции.
В настоящее время я использую следующий код для чтения данных из Flume и записи их в HDFS:
val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8"))).saveAsTextFiles(path)
Здесь переменная path определяет префикс каталога, в который добавляются текстовые файлы (part-0000 и т. д.), а остальная часть имени каталога представляет собой отметку времени, сгенерированную Spark. Я мог бы изменить код на что-то вроде этого:
val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
val mapStream = stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8")))
mapStream.foreachRDD(rdd => rdd.saveAsTextFile(path))
В этом случае файлы будут добавлены в один и тот же каталог, определяемый путем, но, поскольку они всегда называются часть-00000, часть-00001, часть-00002 и т. д., ранее созданные файлы будут перезаписаны. Изучая исходный код Spark, я заметил, что имена файлов определяются строкой в методе open() SparkHadoopWriter:
val outputName = "part-" + numfmt.format(splitID)
И мне кажется, что нет возможности манипулировать splitID через Spark API. Подводя итог, мои вопросы следующие:
- Есть ли способ заставить внешние таблицы в Impala обнаруживать подкаталоги?
- Если нет, есть ли способ заставить Spark записывать свои выходные файлы в один каталог или иным образом в форме, которая мгновенно читается Impala?
- Если нет, ожидается ли какое-либо обновление для Spark, чтобы решить эту проблему, или мне просто нужно разветвить свою собственную версию Spark, с помощью которой я могу выбирать имена файлов, которые он записывает сам?