Цель :
Внедрите HashPartition и проверьте количество редукторов, которые создаются автоматически.
Для этой цели всегда приветствуется любая помощь и любой пример кода.
Что я сделал :
Я запустил программу уменьшения карты с Hash Partition, реализованную в CSV-файле размером 250 МБ. Но я все еще вижу, что hdfs использует только 1 редюсер для агрегации. Если я правильно понял, hdfs должен автоматически создавать разделы и равномерно распределять данные. Тогда n редукторов будут работать на этих n созданных разделах. Но я не вижу, чтобы это происходило. Может ли кто-нибудь помочь мне достичь этого с помощью Hash Partitions. Я не хочу определять количество разделов.
Код картографа:
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] line = value.toString().split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)");
String airlineid = line[7];
//int tailno = Integer.parseInt(line[10].replace("\"", ""));
String tailno = line[9].replace("\"", "");
if (tailno.length() != 0 ){
//System.out.println(airlineid + " " + tailno + " " + tailno.length());
context.write(new Text(airlineid), new Text(tailno));
}
}
}
Код редуктора:
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
int count=0;
for (Text value : values) {
count ++;
}
//context.write(key, new IntWritable(maxValue));
context.write(key, new IntWritable(count));
}
Код разделителя:
public class FlightPartition extends Partitioner<Text, Text> {
public int getPartition(Text key, Text value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
Драйвер:
public class Flight
{
public static void main (String[] args) throws Exception
{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Flight");
job.setJarByClass(Flight.class);
job.setMapperClass(FlightMapper.class);
job.setReducerClass(FlightReducer.class);
job.setPartitionerClass(FlightPartition.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
Журнал:
15/11/09 06:14:14 INFO mapreduce.Job: Counters: 50
File System Counters
FILE: Number of bytes read=7008211
FILE: Number of bytes written=14438683
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=211682444
HDFS: Number of bytes written=178
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Killed map tasks=2
Launched map tasks=5
Launched reduce tasks=1
Data-local map tasks=5
Total time spent by all maps in occupied slots (ms)=2235296
Total time spent by all reduces in occupied slots (ms)=606517
Total time spent by all map tasks (ms)=2235296
Total time spent by all reduce tasks (ms)=606517
Total vcore-seconds taken by all map tasks=2235296
Total vcore-seconds taken by all reduce tasks=606517
Total megabyte-seconds taken by all map tasks=2288943104
Total megabyte-seconds taken by all reduce tasks=621073408
Map-Reduce Framework
Map input records=470068
Map output records=467281
Map output bytes=6073643
Map output materialized bytes=7008223
Input split bytes=411
Combine input records=0
Combine output records=0
Reduce input groups=15
Reduce shuffle bytes=7008223
Reduce input records=467281
Reduce output records=15
Spilled Records=934562
Shuffled Maps =3
Failed Shuffles=0
Merged Map outputs=3
GC time elapsed (ms)=3701
CPU time spent (ms)=277080
Physical memory (bytes) snapshot=590581760
Virtual memory (bytes) snapshot=3196801024
Total committed heap usage (bytes)=441397248
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=211682033
File Output Format Counters
Bytes Written=178