MapReduce считает и находит среднее значение

Я хочу разработать программу в MapReduce, которая получает значения cust_key и balance из файла .tbl. Я объединил 2 значения в строку, а затем отправил ее в Reducer, поэтому я посчитаю cust_key и найду средний баланс на сегмент. Вот почему я добавил сегмент в качестве ключа.

Я хочу разделить строку и разделить 2 значения, чтобы подсчитать ключи cust и суммировать баланс, чтобы найти среднее значение. Но разделенный массив [0] дает мне всю строку, а не первое значение строки. Также разделен array[1] выдает исключение ArrayoutofBounds. Надеюсь, это понятно.

Код ниже

public class MapReduceTest {

        public static class TokenizerMapper extends Mapper<Object, Text, Text, Text>{

         private Text segment = new Text();

         private Text word = new Text();

         private float balance = 0;


         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
           String[] line = value.toString().split("\\|");

           balance = Float.parseFloat(line[5]);

           String cust_key = line[1];

           int nation = Integer.parseInt(line[3]);

           if((balance > 8000) && ( nation < 15) && (nation > 1)){ 

             segment.set(line[6]);

             //word.set(cust_key+","+balance);

             word.set(cust_key+","+balance);

             context.write(segment,word);
           }
         }

       }

    public static class AvgReducer extends Reducer<Text,Text,Text,Text> {


         Text val = new Text();

    public void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {

         String cust_key = "";
         float avg,sum = 0;
         int count = 0;
            for(Text v : values){
                 String[] a = v.toString().trim().split(",");

                 cust_key +=a[0];

            }

            val.set(cust_count);

            context.write(key, val);

     }

   }

Входные данные

8794|Customer#000008794|6dnUgJZGX73Kx1idr6|18|28-434-484-9934|7779.30|HOUSEHOLD|deposits detect furiously even requests. furiously ironic packages are slyly into th
8795|Customer#000008795|oA1cLUtWOAIFz5Douypbq1jHv glSE|9|19-829-732-8102|9794.80|BUILDING|totes. blithely unusual theodolites integrate carefully ironic foxes. unusual excuses cajole carefully carefully fi
8796|Customer#000008796|CzCzpV7SDojXUzi4165j,xYJuRv wZzn grYsyZ|24|34-307-411-6825|4323.03|AUTOMOBILE|s. pending, bold accounts above the sometimes express accounts 
8797|Customer#000008797|TOWDryHNNqp8bvgMW6 FAhRoLyG1ldu2bHcJCM6|2|12-517-522-5820|219.78|FURNITURE|ly bold pinto beans can nod blithely quickly regular requests. fluffily even deposits ru
8798|Customer#000008798|bIegyozQ5kzprN|15|25-472-647-6270|6832.96|AUTOMOBILE|es-- silent instructions nag blithely

Трассировки стека

java.lang.Exception: java.lang.ArrayIndexOutOfBoundsException: 1
        at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
        at MapReduceTest$AvgReducer.reduce(MapReduceTest.java:69)
        at MapReduceTest$AvgReducer.reduce(MapReduceTest.java:1)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
        at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
17/04/12 18:40:33 INFO mapreduce.Job: Job job_local806960399_0001 running in uber mode : false
17/04/12 18:40:33 INFO mapreduce.Job:  map 100% reduce 0%
17/04/12 18:40:33 INFO mapreduce.Job: Job job_local806960399_0001 failed with state FAILED due to: NA
17/04/12 18:40:33 INFO mapreduce.Job: Counters: 35

Обновлять

Редуктор

    public static class AvgReducer extends Reducer<Text,Text,Text,Text> {

    Logger log = Logger.getLogger(AvgReducer.class.getName());

    public void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {

            float sumBalance=0,avgBalance = 0;

            int cust_count = 1;

            for(Text v : values){
               String[] a = v.toString().trim().split(",");

               //c2 += " i "+i+" "+a[0]+"\n";

               sumBalance +=Float.parseFloat(a[a.length-1]);

               cust_count++;
            }

            avgBalance = sumBalance / cust_count;


            context.write(key,new Text(avgBalance+" "+cust_count));

     }

   }

Трассировки стека

java.lang.Exception: java.lang.NumberFormatException: For input string: "8991.715 289"

Заранее спасибо.


person Marios Nikolaou    schedule 09.04.2017    source источник
comment
Не могли бы вы попробовать распечатать value? Вероятно, у него недостаточно | для разделения на массив из 7 элементов.   -  person Darshan Mehta    schedule 09.04.2017
comment
Вопрос слишком широкий. Где находится минимально воспроизводимый пример. Если вы потратили на это несколько часов, не могли бы вы потратить несколько минут, чтобы добавить больше информации? В чем проблема, какой ввод/вывод... И, конечно же, не используйте короткие ссылки для публикации чего-либо, я не буду нажимать на эти ссылки, в основном потому, что здешний брандмауэр их заблокирует...   -  person AxelH    schedule 12.04.2017
comment
Можете ли вы предоставить образец входных данных.   -  person Ambrish    schedule 12.04.2017
comment
Рассматривали ли вы для этого использование Hive, Pig или Spark?   -  person OneCricketeer    schedule 12.04.2017
comment
Я не знаю, я должен?   -  person Marios Nikolaou    schedule 12.04.2017
comment
Ваши данные не такие сложные, но 1) Укажите здесь трассировку стека. 2) если вы получаете cust_key +=a[0] за пределы, вы пробовали просто напечатать v? Что это такое?   -  person OneCricketeer    schedule 12.04.2017
comment
Когда я печатаю v, он печатает всю строку, a[0] печатает только ключ клиента, a[1] дает исключение.   -  person Marios Nikolaou    schedule 12.04.2017
comment
Имеет ли значение использование Pattern.quote("|")? value.toString().split(Pattern.quote("|"));   -  person Dan W    schedule 12.04.2017
comment
@DanW Ошибка в редукторе на split(",")   -  person OneCricketeer    schedule 12.04.2017
comment
@MariosNikolaou Вы показали здесь все данные или только образец?   -  person OneCricketeer    schedule 12.04.2017
comment
Это образец, он слишком большой.   -  person Marios Nikolaou    schedule 12.04.2017
comment
Я бы порекомендовал вам выполнить следующую команду hdfs dfs -cat /path/on/hdfs/* | awk -F'|' '{print NF}' | sort -u.   -  person Ambrish    schedule 12.04.2017


Ответы (2)


Pig запускает MapReduce (если настроено таким образом). Это также намного чище, чем возиться с MapReduce, и устанавливается в основных дистрибутивах Hadoop.

A = LOAD 'test.txt' USING PigStorage('|') AS (f1:int,customer_key:chararray,f3:chararray,nation:int,f5:chararray,balance:float,segment:chararray,f7:chararray);
filtered = FILTER A BY balance > 8000 AND (nation > 1 AND nation < 15);
X = FOREACH filtered generate segment,customer_key,balance;

И вывод

\d X
(BUILDING,Customer#000008795,9794.8)

Не уверен, что вам действительно нужно среднее значение здесь, здесь только один элемент, но вам нужно GROUP BY на segment и customer_key, тогда вы можете легко использовать AVG функция.


Если вы лучше знакомы с SQL, Hive также может быть более простым подходом.

(Также работает через MapReduce, если не настроено иначе)

CREATE EXTERNAL TABLE IF NOT EXISTS records (
    f1 INT,
    customer_key STRING, 
    f3 STRING, 
    nation INT,
    f5 STRING,
    balance FLOAT,
    f8 STRING
) ROW FORMAT DELIMETED 
FIELDS TERMINATED BY '|'
LOCATION 'hdfs://path/test.txt';

Тогда это будет что-то вроде этого

SELECT segment, customer_key, AVG(balance)
FROM records
WHERE balance > 8000 AND nation > 1 AND nation < 15
GROUP BY segment, customer_key;

Я бы перешел к примеру Apache Spark, но Spark SQL, по сути, был бы тем же запросом Hive.

person OneCricketeer    schedule 12.04.2017
comment
Я ценю всю помощь, но я должен реализовать ее только с помощью MapReduce. - person Marios Nikolaou; 12.04.2017
comment
Извините, я не понимаю, почему вы хотите написать более 50 строк кода, которые вы могли бы сделать за 10 или меньше. - person OneCricketeer; 12.04.2017
comment
Это небольшая программа для моего проекта. Профессор хочет разработать ее на MapReduce, чтобы лучше учиться :) - person Marios Nikolaou; 12.04.2017
comment
Хорошо, чувак, я проверю нулевые значения перед отправкой текста в редуктор. - person Marios Nikolaou; 12.04.2017
comment
Или если ввода вообще нет... Если вы получите null, вы увидите NullPointerException, а не за пределами. Кроме того, вы не упомянули, как вы выполняете эту работу, но я предполагаю, что hadoop jar - person OneCricketeer; 12.04.2017
comment
да, jar Hadoop. Итак, я проверю, если баланс нулевой и пустой, тогда баланс = 0. - person Marios Nikolaou; 12.04.2017
comment
Ваш картограф должен нормально работать для значений, которые вы предоставили - person OneCricketeer; 12.04.2017
comment
Ваше право, даже если я проверю нулевые значения, я должен получить исключение nullpointerexception - person Marios Nikolaou; 12.04.2017
comment
я попробовал [a.length-1] и это работает, но как я найду количество элементов - person Marios Nikolaou; 12.04.2017
comment
Наконец, используя [0], я получаю все ключи cust, но как найти количество элементов :) - person Marios Nikolaou; 12.04.2017
comment
Разве это не простой счетчик? int count = 0; for (...) { count++; } print(count); ? - person OneCricketeer; 12.04.2017
comment
Да, но когда я вывожу из редуктора, я получаю исключение java.lang.Exception: java.lang.NumberFormatException: для входной строки: 8991.715 i 289 из-за context.write(key, avgBalance+ i +i). В конце я хочу напечатать ключ, средний баланс и количество клиентов. Спасибо - person Marios Nikolaou; 12.04.2017
comment
Вы знаете, что можете тестировать свой код независимо от Hadoop? Я бы порекомендовал это. - person OneCricketeer; 12.04.2017
comment
Как? я пытался поставить пути хаупа, но это не работает - person Marios Nikolaou; 12.04.2017
comment
Нет нет. Я сказал за пределами hadoop. Это означает, что запустите преобразователь, но замените context.write на System.out.println, а затем скопируйте этот вывод и запустите его непосредственно через код редуктора. Это намного проще с Python (или любым другим быстрым языком сценариев). michael-noll.com/tutorials/< /а> - person OneCricketeer; 13.04.2017
comment
Хорошо, чувак, я должен запустить банку Hadoop, и она выведет результат в cmd? - person Marios Nikolaou; 13.04.2017
comment
Нет... Когда я сказал тестировать независимо от хаупа, как вы думаете, зачем вам нужен hadoop jar? - person OneCricketeer; 13.04.2017
comment
потому что я новичок в программах mapreduce, поэтому я просто запущу его, но для ввода? - person Marios Nikolaou; 13.04.2017
comment
Все, что я пытаюсь сказать, это то, что вы должны всегда тестировать репрезентативный небольшой набор данных без Hadoop, прежде чем писать его в MapReduce, чтобы вы могли исправить любые логические ошибки. - person OneCricketeer; 13.04.2017
comment
без обид :) Это будет работать с маппером и редуктором? потому что я должен указать ввод и вывод, какой-нибудь учебник? - person Marios Nikolaou; 13.04.2017
comment
Затем дайте разные входные и выходные данные! Не беги hadoop jar. Создайте полностью отдельный проект и перепишите код без классов Hadoop, только чистый Java. Не требует учебника - person OneCricketeer; 13.04.2017
comment
Я попытался разбить строку в java, и она отлично работает. Я решил это, теперь я столкнулся с другой проблемой, когда я пытаюсь отобразить вывод редуктора, он выдает исключение. Проверьте обновленный - person Marios Nikolaou; 13.04.2017
comment
Редуктор — это не просто шаг. Несколько редукторов запускаются один за другим. avgBalance+" "+cust_count, кажется, производит 8991.715 289, который вы не можете разобрать как число с плавающей запятой - person OneCricketeer; 13.04.2017

Если вы действительно хотите попробовать это в Java MapReduce, попробуйте стандартизировать свои входные данные и быстро отловить ошибки.

Вернуться, чтобы удалить проблемные записи

     public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
       float balance = 0.0;
       String custKey = "";
       String segment = "";
       int nation = 0;

       String[] line = value.toString().split("\\|");
       if (line.length < 7) { 
           System.err.println("map: Not enough records");
           return;
       }
       cust_key = line[1];
       try {
           nation = Integer.parseInt(line[3]);
           balance = Float.parseFloat(line[5]);
       } catch (NumberFormatException e) {
           e.printStackTrace();
           return;
       }

       if(balance > 8000 && (nation < 15 && nation > 1)){ 
         segment.set(line[6]);
         word.set(cust_key + "\t" + balance);
         context.write(segment,word);
       }
  }

Затем редюсер в идеале должен генерировать тот же формат, если вы пытаетесь записать аналогичные выходные записи.

public void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {

        float sumBalance=0
        int count = 0;

        for(Text v : values){
           String[] a = v.toString().trim().split("\t");
           if (a.length < 2) {
               System.err.println("reduce: Not enough records");
               continue;
           }

           sumBalance += Float.parseFloat(a[1]);
           count++;
        }

        float avgBalance = count <= 1 ? sumBalance : sumBalance / count;

        context.write(key,new Text(avgBalance + "\t" + count));

 }

(код не проверен)

person OneCricketeer    schedule 13.04.2017