Работа Spark, кажется, плохо распараллеливается

Использование Спарк 1.1

У меня есть работа, которая работает следующим образом:

  1. Читает список папок в заданном корне, распараллеливает список
  2. Для каждой папки прочитайте файлы в ней - это файлы, заархивированные gzip.
  3. Для каждого файла извлеките содержимое — это строки, каждая строка представляет собой одно событие с полями, разделенными табуляцией (TSV)
  4. Создайте единый RDD всех строк.
  5. Преобразуйте TSV в json.

(Теперь строки представляют определенный тип события. Всего 4 типа: Сессия, запрос, рекомендация, пользовательское событие)

  1. Отфильтровать только события сеанса. Выберите только 1:100 из них в соответствии с некоторым полем идентификатора пользователя. Преобразуйте их в пару с ключом, представляющим некоторую структуру вывода (например: тип события/дата/события), а затем запишите ее в FS.
  2. Сделайте то же самое для запросов и пользовательских событий.

(Для рекомендаций выборка не может быть выполнена в соответствии с идентификатором пользователя (поскольку его там нет), но мы знаем, что между запросом и рекомендацией существует отношение 1: 1 на основе поля идентификатора взаимного запроса. Итак:)

  1. Создайте список различных идентификаторов запросов. присоедините этот список к списку рекомендаций на основе идентификатора запроса в качестве ключа, поэтому достигните желаемой фильтрации. Затем выведите сокращенный список в ФС.

Теперь вот моя проблема. Код, который я использую для этих целей, работает в небольших масштабах. Но когда я работаю с относительно большими входными данными и использую кластер из 80 машин с 8 ядрами и 50 ГБ памяти каждая, я вижу, что многие машины не используются, то есть занято только одно ядро ​​(и также только ~ 20%). а памяти только 16гб из 40гб настроено на работу.

Я думаю, что где-то мои преобразования плохо распараллеливаются, но я не уверен, где и почему. Вот большая часть моего кода (я опускаю некоторые вспомогательные функции, которые, как мне казалось, не имеют отношения к проблеме)

 public static void main(String[] args) {

    BasicConfigurator.configure();

    conf[0] = new Conf("local[4]");
    conf[1] = new Conf("spark://hadoop-m:7077");
    Conf configuration = conf[1];

    if (args.length != 4) {
        log.error("Error in parameters. Syntax: <input path> <output_path> <filter_factor> <locality>\nfilter_factor is what fraction of sessions to process. For example, to process 1/100 of sessions, use 100\nlocality should be set to \"local\" in case running on local environment, and to \"remote\" otherwise.");
        System.exit(-1);
    }

    final String inputPath = args[0];
    final String outputPath = args[1];
    final Integer filterFactor;

    if (args[3].equals("local")) {
        configuration = conf[0];
    }

    log.setLevel(Level.DEBUG);
    Logger.getRootLogger().removeAppender("console");
    final SparkConf conf = new SparkConf().setAppName("phase0").setMaster(configuration.getMaster());
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    conf.set("spark.kryo.registrator", "com.doit.customer.dataconverter.MyRegistrator");
    final JavaSparkContext sc = new JavaSparkContext(conf);
    if (configuration.getMaster().contains("spark:")) {
        sc.addJar("/home/hadoop/hadoop-install/phase0-1.0-SNAPSHOT-jar-with-dependencies.jar");
    }
    try {
        filterFactor = Integer.parseInt(args[2]);
        // read all folders from root
        Path inputPathObj = new Path(inputPath);
        FileSystem fs = FileSystem.get(inputPathObj.toUri(), new Configuration(true));
        FileStatus[] statusArr = fs.globStatus(inputPathObj);
        List<FileStatus> statusList = Arrays.asList(statusArr);

        List<String> pathsStr = convertFileStatusToPath(statusList);

        JavaRDD<String> paths = sc.parallelize(pathsStr);

        // read all files from each folder
        JavaRDD<String> filePaths = paths.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
            @Override
            public Iterable<String> call(Iterator<String> pathsIterator) throws Exception {
                List<String> filesPath = new ArrayList<String>();
                if (pathsIterator != null) {
                    while (pathsIterator.hasNext()) {
                        String currFolder = pathsIterator.next();
                        Path currPath = new Path(currFolder);
                        FileSystem fs = FileSystem.get(currPath.toUri(), new Configuration(true));
                        FileStatus[] files = fs.listStatus(currPath);
                        List<FileStatus> filesList = Arrays.asList(files);
                        List<String> filesPathsStr = convertFileStatusToPath(filesList);
                        filesPath.addAll(filesPathsStr);
                    }
                }
                return filesPath;
            }
        });


        // Transform list of files to list of all files' content in lines
        JavaRDD<String> typedData = filePaths.map(new Function<String, List<String>>() {
            @Override
            public List<String> call(String filePath) throws Exception {
                Tuple2<String, List<String>> tuple = null;
                try {
                    String fileType = null;
                    List<String> linesList = new ArrayList<String>();
                    Configuration conf = new Configuration();
                    CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
                    Path path = new Path(filePath);
                    fileType = getType(path.getName());

                    // filter non-trc files
                    if (!path.getName().startsWith("1")) {
                        return linesList;
                    }

                    CompressionCodec codec = compressionCodecs.getCodec(path);
                    FileSystem fs = path.getFileSystem(conf);
                    InputStream in = fs.open(path);
                    if (codec != null) {
                        in = codec.createInputStream(in);
                    } else {
                        throw new IOException();
                    }

                    BufferedReader r = new BufferedReader(new InputStreamReader(in, "UTF-8"), BUFFER_SIZE);

                    // This line will not be added to the list ,
                    // which is what we want - filter the header row
                    String line = r.readLine();

                    // Read all lines
                    while ((line = r.readLine()) != null) {
                        try {
                            String sliceKey = getSliceKey(line, fileType);
                            // Adding event type and output slice key as additional fields
                            linesList.add(fileType + "\t" + sliceKey + "\t" + line);
                        } catch(ParseException e) {
                        }
                    }

                    return linesList;
                } catch (Exception e) { // Filtering of files whose reading went wrong
                    log.error("Reading of the file " + filePath + " went wrong: " + e.getMessage());
                    return new ArrayList();
                }
            }
            // flatten to one big list with all the lines
        }).flatMap(new FlatMapFunction<List<String>, String>() {
            @Override
            public Iterable<String> call(List<String> strings) throws Exception {
                return strings;
            }
        });

        // convert tsv to json

        JavaRDD<ObjectNode> jsons = typedData.mapPartitions(new FlatMapFunction<Iterator<String>, ObjectNode>() {
            @Override
            public Iterable<ObjectNode> call(Iterator<String> stringIterator) throws Exception {
                List<ObjectNode> res = new ArrayList<>();
                while(stringIterator.hasNext()) {
                    String currLine = stringIterator.next();
                    Iterator<String> i = Splitter.on("\t").split(currLine).iterator();
                    if (i.hasNext()) {
                        String type = i.next();
                        ObjectNode json = convert(currLine, type, filterFactor);
                        if(json != null) {
                            res.add(json);
                        }
                    }
                }
                return res;
            }
        }).cache();


        createOutputType(jsons, "Session", outputPath, null);
        createOutputType(jsons, "UserEvent", outputPath, null);
        JavaRDD<ObjectNode> requests = createOutputType(jsons, "Request", outputPath, null);


        // Now leave only the set of request ids - to inner join with the recommendations
        JavaPairRDD<String,String> requestsIds = requests.mapToPair(new PairFunction<ObjectNode, String, String>() {
            @Override
            public Tuple2<String, String> call(ObjectNode jsonNodes) throws Exception {
                String id = jsonNodes.get("id").asText();
                return new Tuple2<String, String>(id,id);
            }
        }).distinct();

        createOutputType(jsons,"RecommendationList", outputPath, requestsIds);

    } catch (IOException e) {
        log.error(e);
        System.exit(1);
    } catch (NumberFormatException e) {
        log.error("filter factor is not a valid number!!");
        System.exit(-1);
    }

    sc.stop();

}

private static JavaRDD<ObjectNode> createOutputType(JavaRDD jsonsList, final String type, String outputPath,JavaPairRDD<String,String> joinKeys) {

    outputPath = outputPath + "/" + type;

    JavaRDD events = jsonsList.filter(new Function<ObjectNode, Boolean>() {
        @Override
        public Boolean call(ObjectNode jsonNodes) throws Exception {
            return jsonNodes.get("type").asText().equals(type);
        }
    });


    // This is in case we need to narrow the list to match some other list of ids... Recommendation List, for example... :)
    if(joinKeys != null) {
        JavaPairRDD<String,ObjectNode> keyedEvents = events.mapToPair(new PairFunction<ObjectNode, String, ObjectNode>() {
            @Override
            public Tuple2<String, ObjectNode> call(ObjectNode jsonNodes) throws Exception {
                return new Tuple2<String, ObjectNode>(jsonNodes.get("requestId").asText(),jsonNodes);
            }
        });

        JavaRDD<ObjectNode> joinedEvents = joinKeys.join(keyedEvents).values().map(new Function<Tuple2<String, ObjectNode>, ObjectNode>() {
           @Override
           public ObjectNode call(Tuple2<String, ObjectNode> stringObjectNodeTuple2) throws Exception {
               return stringObjectNodeTuple2._2;
           }
        });
        events = joinedEvents;
    }


    JavaPairRDD<String,Iterable<ObjectNode>> groupedEvents = events.mapToPair(new PairFunction<ObjectNode, String, ObjectNode>() {
        @Override
        public Tuple2<String, ObjectNode> call(ObjectNode jsonNodes) throws Exception {
            return new Tuple2<String, ObjectNode>(jsonNodes.get("sliceKey").asText(),jsonNodes);
        }
    }).groupByKey();
    // Add convert jsons to strings and add "\n" at the end of each

    JavaPairRDD<String, String> groupedStrings = groupedEvents.mapToPair(new PairFunction<Tuple2<String, Iterable<ObjectNode>>, String, String>() {
        @Override
        public Tuple2<String, String> call(Tuple2<String, Iterable<ObjectNode>> content) throws Exception {
            String string = jsonsToString(content._2);
            log.error(string);
            return new Tuple2<>(content._1, string);
        }
    });
    groupedStrings.saveAsHadoopFile(outputPath, String.class, String.class, KeyBasedMultipleTextOutputFormat.class);
    return events;
}

// Notice the special case of if(joinKeys != null) in which I join the recommendations with request ids.

Наконец, команда, которую я использую для запуска задания Spark:

spark-submit --class com.doit.customer.dataconverter.Phase0 --driver-cores 8 --total-executor-cores 632 --driver-memory 40g --executor-memory 40G --deploy-mode cluster /home/hadoop/hadoop-install/phase0-1.0-SNAPSHOT-jar-with-dependencies.jar gs://input/2014_07_31* gs://output/2014_07_31 100 remote

person Yaniv Donenfeld    schedule 19.11.2014    source источник


Ответы (1)


Ваши исходные разделы основаны на наборе папок в вашем корне (sc.parallelize(pathsStr)). В вашем потоке есть два шага, которые могут значительно разбалансировать ваши разделы: 1) чтение списка файлов в каждой папке, если в некоторых папках файлов намного больше, чем в других папках; 2) чтение строк TSV из каждого файла, если в одних файлах строк намного больше, чем в других.

Если ваши файлы примерно одинакового размера, но в одних папках их намного больше, чем в других, вы можете перебалансировать свои разделы после того, как соберете имена файлов. После установки начального значения для filePaths попробуйте добавить эту строку:

filePaths = filePaths.repartition(sc.defaultParallelism());

Это перетасует собранные имена файлов в сбалансированные разделы.

Если у вас есть дисбаланс из-за того, что некоторые файлы значительно больше других, вы можете попробовать перебалансировать свой RDD typedData, аналогичным образом вызвав перераспределение на нем, хотя это будет намного дороже, так как оно перетасует все ваши данные TSV.

В качестве альтернативы, если вы перебалансируете пути к файлам и по-прежнему имеете некоторый дисбаланс разделов, вызванный наличием нескольких файлов несколько большего размера, которые заканчиваются в нескольких разделах, вы можете получить немного лучшую производительность, используя большее число в аргументе перераспределения, например как умножить на четыре, чтобы получить в четыре раза больше разделов, чем ядер. Это немного увеличит стоимость связи, но может быть выигрышным, если обеспечит лучшую балансировку результирующих размеров разделов в typedData.

person Jim McBeath    schedule 21.11.2014