Q: Преобразование Avro в Parquet в памяти

Я получаю записи Avro от Kafka. Я хочу преобразовать эти записи в файлы Parquet. Я слежу за этой записью в блоге: http://blog.cloudera.com/blog/2014/05/how-to-convert-existing-data-into-parquet/

Код пока выглядит примерно так:

final String fileName
SinkRecord record, 
final AvroData avroData

final Schema avroSchema = avroData.fromConnectSchema(record.valueSchema());
CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;

int blockSize = 256 * 1024 * 1024;
int pageSize = 64 * 1024;

Path path = new Path(fileName);
writer = new AvroParquetWriter<>(path, avroSchema, compressionCodecName, blockSize, pageSize);

Теперь это выполнит преобразование Avro в Parquet, но файл Parquet будет записан на диск. Мне было интересно, есть ли более простой способ просто сохранить файл в памяти, чтобы мне не приходилось управлять временными файлами на диске. Спасибо


person user1077071    schedule 22.09.2016    source источник


Ответы (2)


"but it will write the Parquet file to the disk"
"if there was an easier way to just keep the file in memory"

Из ваших запросов я понял, что вы не хотите записывать частичные файлы в паркет. Если вы хотите, чтобы весь файл был записан на диск в формате паркета, а временные файлы находились в памяти, вы можете использовать комбинацию файла с отображением памяти и формата паркета.

Запишите свои данные в файл с отображением памяти, после завершения записи преобразуйте байты в формат паркета и сохраните на диск.

Взгляните на MappedByteBuffer.

person Krishas    schedule 05.10.2016

Пожалуйста, проверьте мой блог, https://yanbin.blog/convert-apache-avro-to-parquet-format-in-java/ при необходимости переведите на английский

package yanbin.blog;
 
import org.apache.parquet.io.DelegatingPositionOutputStream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;
 
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
 
public class InMemoryOutputFile implements OutputFile {
    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
    @Override
    public PositionOutputStream create(long blockSizeHint) throws IOException { // Mode.CREATE calls this method
        return new InMemoryPositionOutputStream(baos);
    }
 
    @Override
    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
        return null;
    }
 
    @Override
    public boolean supportsBlockSize() {
        return false;
    }
 
    @Override
    public long defaultBlockSize() {
        return 0;
    }
 
    public byte[] toArray() {
        return baos.toByteArray();
    }
 
    private static class InMemoryPositionOutputStream extends DelegatingPositionOutputStream {
 
        public InMemoryPositionOutputStream(OutputStream outputStream) {
            super(outputStream);
        }
 
        @Override
        public long getPos() throws IOException {
            return ((ByteArrayOutputStream) this.getStream()).size();
        }
    }
}
    public static <T extends SpecificRecordBase> void writeToParquet(List<T> avroObjects) throws IOException {
        Schema avroSchema = avroObjects.get(0).getSchema();
        GenericData genericData = GenericData.get();
        genericData.addLogicalTypeConversion(new TimeConversions.DateConversion());
        InMemoryOutputFile outputFile = new InMemoryOutputFile();
        try (ParquetWriter<Object> writer = AvroParquetWriter.builder(outputFile)
                .withDataModel(genericData)
                .withSchema(avroSchema)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withWriteMode(ParquetFileWriter.Mode.CREATE)
                .build()) {
            avroObjects.forEach(r -> {
                try {
                    writer.write(r);
                } catch (IOException ex) {
                    throw new UncheckedIOException(ex);
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        }
 
        // dump memory data to file for testing
        Files.write(Paths.get("./users-memory.parquet"), outputFile.toArray());
    }

Тестовые данные из памяти

$ parquet-tools cat --json users-memory.parquet
$ parquet-tools schema users-memory.parquet
person Yanbin    schedule 24.02.2021