Не удалось идентифицировать ошибку в моем коде соединения Reducer

У меня есть два набора данных:
пользователи:

Bobby 06 Amsterdam
Sunny 07 Rotterdam
Steven 08 Liverpool
Jamie 23 Liverpool
Macca 91 Liverpool
Messi 10 Barcelona
Pique 04 Barcelona
Suarez 09 Barcelona
Neymar 11 brazil
Klopp 12 Liverpool

журналы пользователей:

Sunny NewPlayer 12.23.14.421
Klopp Crazy 88.33.44.555
Bobby NewPlayer 99.12.11.222
Steven Captain 99.55.66.777
Jamie Local 88.99.33.232
Suarez Spain 77.55.66.444

Я хочу присоединиться к этим двум наборам данных, используя редукторное соединение. Я написал свои классы таким образом:

MapperClass:

Public class MapperClass {
    public static class UserMap extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line     = value.toString();
            String[] tokens = line.split(" ");
            String name     = tokens[0];
            String city     = tokens[2];
            context.write(new Text(name), new Text("UserFile" + "\t" + city));
        }
    }

    public static class UserLogs extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line     = value.toString();
            String[] tokens = line.split(" ");
            String name     = tokens[0];
            String ip       = tokens[2];
            context.write(new Text(name), new Text("UserLogs" + "\t" + ip));
        }
    }
}

Класс редуктора:

public class ReducerClass extends Reducer<Text, Text, Text, Text>{
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String city = null;
        String ip   = null;
        for(Text t: values) {
            String[] parts = t.toString().split("\t");
            if(parts[0].equals("UserFile")) {
                city = parts[1];
            }
            if(parts[0].equals("UserLogs")) {
                ip = parts[1];
            } else {
                ip = "IP Address not found";
            }
        }
        context.write(key, new Text(city + "\t" + ip));
    }
}

Класс водителя:

public class MainClass {
    public static void main(String[] args)throws IOException, InterruptedException, ClassNotFoundException {
        Job job = new Job();
        job.setJarByClass(MainClass.class);
        job.setOutputKeyClass(Text.class);
        job.setReducerClass(ReducerClass.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, UserMap.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, UserLogs.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        System.exit(job.waitForCompletion(true)?0:1);
    }
}

Вывод должен быть таким:

Bobby   Amsterdam 99.12.11.222
Sunny   Rotterdam  12.23.14.421
Klopp    Liverpool  88.33.44.555
Steven  Liverpool   99.55.66.777
Jamie    Liverpool    88.99.33.232
Suarez  Barcelona   77.55.66.444

Вместо этого я получаю вывод следующим образом:

Bobby   Amsterdam       IP Address not found
Jamie   Liverpool       88.99.33.232
Klopp   Liverpool       IP Address not found
Macca   Liverpool       IP Address not found
Messi   Barcelona       IP Address not found
Neymar  brazil IP Address not found
Pique   Barcelona       IP Address not found
Steven  Liverpool       99.55.66.777
Suarez  Barcelona       IP Address not found
Sunny   Rotterdam       12.23.14.421

Я не мог понять, какую ошибку я сделал здесь. Может ли кто-нибудь помочь мне в решении этой проблемы. Любая помощь очень ценится.


person Metadata    schedule 05.11.2016    source источник


Ответы (1)


В вашем редукторе ошибка, он переопределяет IP-адрес в зависимости от values порядка. Попробуй это:

public class ReducerClass extends Reducer<Text, Text, Text, Text>{
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String city = null;
        String ip   = null;
        for(Text t: values) {
            String[] parts = t.toString().split("\t");
            if(parts[0].equals("UserFile")) {
                city = parts[1];
            } else if(parts[0].equals("UserLogs")) {
                ip = parts[1];
            }
        }
        if (ip != null && city != null) {
            context.write(key, new Text(city + "\t" + ip));
        }
    }
}
person Mariusz    schedule 05.11.2016
comment
Исправил это и работал как шарм. Спасибо большое Мариуш. - person Metadata; 06.11.2016