Лучший способ итерации / потоковой передачи Spark Dataframe

У меня есть задание Spark, которое читает файл паркета с примерно 150 000 000 записей ключ/значение.

    SparkConf conf = new SparkConf();
    conf.setAppName("Job");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    SQLContext sql = new SQLContext(jsc);
    DataFrame df = sql.read().parquet(path);

Моя цель - записать пары ключ/значение в HBase, но у меня возникают проблемы с памятью, и я подозреваю, что это не лучший способ сделать это. Я хотел бы перенести вычисления в кластер, но не могу понять, как пропустить часть сбора. Прямо сейчас мой код выглядит так:

HBaseClient client = HbaseWrapper.initClient();
   df.collectAsList().stream().forEach(row -> {
            try {
                HbaseWrapper.putRows(client, row);
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
   jsc.stop();

И я попытался сначала собрать в виде списка без потоковой передачи, а затем записать это, но это также занимает вечность.

Любые идеи приветствуются.


person C.A    schedule 14.03.2016    source источник


Ответы (1)


Вы получаете ошибку OOM, потому что collectAsList отправляет все данные драйверу.

Чтобы решить, вы можете использовать foreachPartitions, поэтому вы будете параллельно передавать Hbase.

    df.toJavaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {

        @Override
        public void call(Iterator<Row> t) throws Exception {
            try {
                HBaseClient client = HbaseWrapper.initClient();
                while(t.hasNext()){
                    Row row = t.next();
                    HbaseWrapper.putRows(client, row);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    });
person Igor Berman    schedule 14.03.2016