Преобразование данных в паркет в Spark

У меня есть устаревшие данные в S3, которые я хочу преобразовать в формат паркета с помощью Spark 2 с помощью Java API.

У меня есть желаемая схема Avro (файлы .avsc) и их сгенерированные классы Java с использованием компилятора Avro, и я хочу сохранить данные, используя эту схему в формате Parquet. Входные данные не имеют какого-либо стандартного формата, но у меня есть библиотека, которая может преобразовывать каждую строку из устаревших файлов в классы Avro.

Можно ли прочитать данные как JavaRDD<String>, применить преобразование к классам Avro с помощью библиотеки и, наконец, сохранить их в формате паркета.

Что-то вроде:

JavaRDD<String> rdd = javaSparkContext.textFile("s3://bucket/path_to_legacy_files");    
JavaRDD<MyAvroClass> converted = rdd.map(line -> customLib.convertToAvro(line));    
converted.saveAsParquet("s3://bucket/destination"); //how do I do this

Возможно ли что-то подобное? Позже я захочу обработать преобразованные данные паркета с помощью Hive, Presto, а также Spark.


person Swaranga Sarma    schedule 18.01.2017    source источник
comment
Найдите пресс-релиз Spark Summit. Стив Лофран (Хортон) о хранилищах объектов ...   -  person Samson Scharfrichter    schedule 18.01.2017
comment
@SamsonScharfrichter Не отвечает на мой вопрос. Единственное, что я видел отдаленно, - это то, как он преобразовал некоторые CSV-данные в Parquet. Он использует вызов sparkSession.csv () для загрузки данных, которые я не могу, поскольку мне нужно использовать специальный десериализатор.   -  person Swaranga Sarma    schedule 18.01.2017
comment
Итак, каков ваш актуальный вопрос? Речь идет о преобразовании пользовательского JavaRDD<stuff> в обычный DataFrame? О сохранении ваших нестандартных вещей в формате Parquet? О сохранении этого в хранилище объектов S3? О способе чтения ваших пользовательских материалов с помощью другого инструмента, который не знает, что такое RDD? Комбинация вышеперечисленного?   -  person Samson Scharfrichter    schedule 18.01.2017
comment
@SamsonScharfrichter Вопрос в основном в том, как преобразовать некоторые нестандартные данные в паркет. В моем распоряжении кластер Spark 2.0, определения схемы Avro и библиотека Java, которая может преобразовывать записи из устаревшего нестандартного формата в экземпляр класса Avro. Фрагмент кода был просто мыслью, спрашивающей, можно ли сделать что-то подобное.   -  person Swaranga Sarma    schedule 18.01.2017


Ответы (2)


Пока игнорируйте S3; это деталь производства. Вам нужно начать с более простой задачи «преобразовать локальный файл моего формата в стандартный». Это то, что вы можете реализовать локально с помощью модульных тестов для небольшого набора данных.

Как правило, в Spark это то же самое, что и в Hadoop Mapreduce: реализовать подкласс InputFormat<K, V> или FileInputFormat<K, V> или использовать входной формат Hadoop org.apache.hadoop.streaming.mapreduce.StreamInputFormat, реализовать собственный RecordReader, затем установить параметр spark.hadoop.stream.recordreader.class на имя класса вашего устройства чтения записей (возможно, самый простой).

По этому поводу есть много документации, а также вопросы о переполнении стека. И множество примеров в самом дереве исходных текстов.

person stevel    schedule 23.01.2017

Разобрался, в основном подход, упомянутый Стивом, за исключением того, что форматы ввода и вывода Hadoop уже существуют:

         Job job = new Job();
         ParquetOutputFormat.setWriteSupportClass(job, AvroWriteSupport.class);
         AvroParquetOutputFormat.setSchema(job, MyAvroType.SCHEMA$);
         AvroParquetOutputFormat.setBlockSize(job, 128*1024*1024);
         AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
         AvroParquetOutputFormat.setCompressOutput(job, true);

         sparkContext.textFile("s3://bucket/path_to_legacy_files")
            .map(line -> customLib.convertToAvro(line))
            .mapToPair(record -> new Tuple2<Void, MyAvroType>(null, record))
            .saveAsNewAPIHadoopFile(
                "s3://bucket/destination", 
                Void.class, 
                MyAvroType.class,
                new ParquetOutputFormat<MyAvroType>().getClass(), 
                job.getConfiguration());
person Swaranga Sarma    schedule 23.01.2017