HashPartition в MapReduce

Цель :

Внедрите HashPartition и проверьте количество редукторов, которые создаются автоматически.

Для этой цели всегда приветствуется любая помощь и любой пример кода.

Что я сделал :

Я запустил программу уменьшения карты с Hash Partition, реализованную в CSV-файле размером 250 МБ. Но я все еще вижу, что hdfs использует только 1 редюсер для агрегации. Если я правильно понял, hdfs должен автоматически создавать разделы и равномерно распределять данные. Тогда n редукторов будут работать на этих n созданных разделах. Но я не вижу, чтобы это происходило. Может ли кто-нибудь помочь мне достичь этого с помощью Hash Partitions. Я не хочу определять количество разделов.

Код картографа:

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {


        String[] line = value.toString().split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)");
        String airlineid = line[7];
        //int tailno = Integer.parseInt(line[10].replace("\"", ""));
        String tailno = line[9].replace("\"", "");

        if (tailno.length() != 0 ){
        //System.out.println(airlineid + " " + tailno + " " + tailno.length());
        context.write(new Text(airlineid), new Text(tailno));
        }


    }       

}

Код редуктора:

public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {


        int count=0;

        for (Text value : values) {
        count ++;
        }

        //context.write(key, new IntWritable(maxValue));
        context.write(key, new IntWritable(count));

    }

Код разделителя:

public class FlightPartition extends Partitioner<Text, Text> {

     public int getPartition(Text key, Text value, int numReduceTasks) {
         return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
     }

}

Драйвер:

public class Flight
{

            public static void main (String[] args) throws Exception
            {

                Configuration conf = new Configuration();
                Job job = Job.getInstance(conf, "Flight");
                job.setJarByClass(Flight.class);

                job.setMapperClass(FlightMapper.class);
                job.setReducerClass(FlightReducer.class);
                job.setPartitionerClass(FlightPartition.class);

                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                FileInputFormat.addInputPath(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job, new Path(args[1]));

                System.exit(job.waitForCompletion(true) ? 0 : 1);

             }
}

Журнал:

15/11/09 06:14:14 INFO mapreduce.Job: Counters: 50
    File System Counters
        FILE: Number of bytes read=7008211
        FILE: Number of bytes written=14438683
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=211682444
        HDFS: Number of bytes written=178
        HDFS: Number of read operations=12
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Killed map tasks=2
        Launched map tasks=5
        Launched reduce tasks=1
        Data-local map tasks=5
        Total time spent by all maps in occupied slots (ms)=2235296
        Total time spent by all reduces in occupied slots (ms)=606517
        Total time spent by all map tasks (ms)=2235296
        Total time spent by all reduce tasks (ms)=606517
        Total vcore-seconds taken by all map tasks=2235296
        Total vcore-seconds taken by all reduce tasks=606517
        Total megabyte-seconds taken by all map tasks=2288943104
        Total megabyte-seconds taken by all reduce tasks=621073408
    Map-Reduce Framework
        Map input records=470068
        Map output records=467281
        Map output bytes=6073643
        Map output materialized bytes=7008223
        Input split bytes=411
        Combine input records=0
        Combine output records=0
        Reduce input groups=15
        Reduce shuffle bytes=7008223
        Reduce input records=467281
        Reduce output records=15
        Spilled Records=934562
        Shuffled Maps =3
        Failed Shuffles=0
        Merged Map outputs=3
        GC time elapsed (ms)=3701
        CPU time spent (ms)=277080
        Physical memory (bytes) snapshot=590581760
        Virtual memory (bytes) snapshot=3196801024
        Total committed heap usage (bytes)=441397248
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=211682033
    File Output Format Counters 
        Bytes Written=178

person ritabrata saha    schedule 09.11.2015    source источник


Ответы (1)


Проверьте свой файл mapred-default.xml и найдите

mapreduce.job.reduces свойство. Измените значение на > 1, чтобы увеличить количество редукторов в вашем кластере. Это свойство будет игнорироваться, если когда mapreduce.jobtracker.address имеет значение «local».

Вы можете переопределить свойство по умолчанию в java с помощью

job.setNumReduceTasks(3)

Взгляните на этот статья для полного списка mapred-default.xml от Apache.

Сколько сокращений? (из Apache)

Правильное количество сокращений равно 0,95 или 1,75, умноженное на ( * ).

В версии 0.95 все сокращения могут запускаться немедленно и начинать передачу выходных данных карты по мере ее завершения. В версии 1.75 более быстрые узлы завершат свой первый раунд сокращений и запустят вторую волну сокращений, выполняя гораздо лучшую работу по балансировке нагрузки.

Увеличение числа редукций увеличивает накладные расходы платформы, но повышает балансировку нагрузки и снижает стоимость сбоев.

Сколько карт?

Количество карт обычно определяется общим размером входных данных, то есть общим количеством блоков входных файлов.

Правильный уровень параллелизма для карт, по-видимому, составляет около 10-100 карт на узел, хотя он был установлен до 300 карт для задач с очень низкой нагрузкой на процессор. Настройка задачи занимает некоторое время, поэтому лучше всего, если на выполнение карт потребуется хотя бы минута.

Таким образом, если вы ожидаете 10 ТБ входных данных и имеете размер блока 128 МБ, вы получите 82 000 карт, если только Configuration.set(MRJobConfig.NUM_MAPS, int) (который только дает подсказку для платформы) используется для установки это даже выше.

Взгляните на Apache Map Reduce Учебник

person Ravindra babu    schedule 09.11.2015
comment
Привет Равиндра, я нашел свойства, как вы упомянули. Но он говорит: количество задач сокращения по умолчанию на задание. Обычно устанавливается на 99 % от емкости сокращения кластера, так что в случае сбоя узла сокращения все равно могут выполняться за одну волну. Игнорируется, если адрес mapreduce.jobtracker.address является локальным. . Поскольку я использую псевдорежим, я вижу: ‹свойство› ‹имя›mapreduce.jobtracker.address‹/имя› ‹значение›local‹/значение› ‹описание›хост и порт, на которых работает трекер заданий MapReduce. Если локально, то задания запускаются внутри процесса как единая задача карты и редукции. ‹/описание› ‹/свойство› - person ritabrata saha; 10.11.2015
comment
Как я могу изменить это в своей среде? - person ritabrata saha; 10.11.2015
comment
Я изменил mapreduce.job.reduces=3 и запустил свою работу. На этот раз он показывает, что запущено сокращение задач = 2. Следовательно, могу ли я сделать вывод, что любое значение, которое вы установили в mapred-default.xml, является максимально возможным значением для этого кластера? Пожалуйста, подтвердите.. - person ritabrata saha; 10.11.2015
comment
По знаниям ДА. Вы можете проверить, используя job.setNumReduceTasks - person Ravindra babu; 10.11.2015
comment
Другой вопрос: теперь мои карты mapreduce.job.reduces=3 и mapreduce.job.map=2. И вывод показывает (набор данных 500 МБ): Запущенные задачи карты = 7 и Запущенные задачи уменьшения = 2. Можете ли вы объяснить, как количество задач карты увеличивается, чем то, что указано как максимальное значение по умолчанию в файле конфигурации, для меня, пожалуйста? - person ritabrata saha; 10.11.2015
comment
wiki.apache.org/hadoop/HowManyMapsAndReduces. Это зависит от размера блока файла dfs .of входного файла - person Ravindra babu; 10.11.2015
comment
Ссылка говорит о hadoop 1.x. вы можете найти новые параметры для 2.x на веб-сайте apache. - person Ravindra babu; 10.11.2015
comment
Спасибо за реф. Я сделал уменьшение = 10. Но все же дальше 2-х редукторов дело не идет. Я думаю, это потому, что в наборе данных меньше разных ключей. Я добавлю еще несколько разных ключей, а затем попробую. На данный момент я в порядке с выходом. Нужно больше экспериментов :) - person ritabrata saha; 10.11.2015
comment
Если вы напишете собственный разделитель, вы сможете этого добиться. Но давайте оставим решение самому фреймворку hadoop. - person Ravindra babu; 10.11.2015