Hadoop Уменьшить количество входных записей = 0

Я новичок в Hadoop, мой код уменьшения карты работает, но не дает никакого результата. Вот информация о map-reduce:

16/09/20 13:11:40 INFO mapred.JobClient: Job complete: job_201609081210_0078
16/09/20 13:11:40 INFO mapred.JobClient: Counters: 28
16/09/20 13:11:40 INFO mapred.JobClient:   Map-Reduce Framework
16/09/20 13:11:40 INFO mapred.JobClient:     Spilled Records=0
16/09/20 13:11:40 INFO mapred.JobClient:     Map output materialized bytes=1362
16/09/20 13:11:40 INFO mapred.JobClient:     Reduce input records=0
16/09/20 13:11:40 INFO mapred.JobClient:     Virtual memory (bytes)   snapshot=466248720384
16/09/20 13:11:40 INFO mapred.JobClient:     Map input records=852032443
16/09/20 13:11:40 INFO mapred.JobClient:     SPLIT_RAW_BYTES=29964
16/09/20 13:11:40 INFO mapred.JobClient:     Map output bytes=0
16/09/20 13:11:40 INFO mapred.JobClient:     Reduce shuffle bytes=1362
16/09/20 13:11:40 INFO mapred.JobClient:     Physical memory (bytes) snapshot=57472311296
16/09/20 13:11:40 INFO mapred.JobClient:     Reduce input groups=0
16/09/20 13:11:40 INFO mapred.JobClient:     Combine output records=0
16/09/20 13:11:40 INFO mapred.JobClient:     Reduce output records=0
16/09/20 13:11:40 INFO mapred.JobClient:     Map output records=0
16/09/20 13:11:40 INFO mapred.JobClient:     Combine input records=0
16/09/20 13:11:40 INFO mapred.JobClient:     CPU time spent (ms)=2375210
16/09/20 13:11:40 INFO mapred.JobClient:     Total committed heap usage (bytes)=47554494464
16/09/20 13:11:40 INFO mapred.JobClient:   File Input Format Counters
16/09/20 13:11:40 INFO mapred.JobClient:     Bytes Read=15163097088
16/09/20 13:11:40 INFO mapred.JobClient:   FileSystemCounters
16/09/20 13:11:40 INFO mapred.JobClient:     HDFS_BYTES_READ=15163127052
16/09/20 13:11:40 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=13170190
16/09/20 13:11:40 INFO mapred.JobClient:     FILE_BYTES_READ=6
16/09/20 13:11:40 INFO mapred.JobClient:   Job Counters
16/09/20 13:11:40 INFO mapred.JobClient:     Launched map tasks=227
16/09/20 13:11:40 INFO mapred.JobClient:     Launched reduce tasks=1
16/09/20 13:11:40 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=759045
16/09/20 13:11:40 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
16/09/20 13:11:40 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=1613259
16/09/20 13:11:40 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
16/09/20 13:11:40 INFO mapred.JobClient:     Data-local map tasks=227
16/09/20 13:11:40 INFO mapred.JobClient:   File Output Format Counters
16/09/20 13:11:40 INFO mapred.JobClient:     Bytes Written=0

Вот код кода, запускающего задание mapreduce:

import java.io.File;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class mp{

public static void main(String[] args) throws Exception {

    Job job1 = new Job();
    job1.setJarByClass(mp.class);
    FileInputFormat.addInputPath(job1, new Path(args[0]));                  
    String oFolder = args[0] + "/output";
    FileOutputFormat.setOutputPath(job1, new Path(oFolder));
    job1.setMapperClass(TransMapper1.class);
    job1.setReducerClass(TransReducer1.class);
    job1.setMapOutputKeyClass(LongWritable.class);
    job1.setMapOutputValueClass(DnaWritable.class);
    job1.setOutputKeyClass(LongWritable.class);
    job1.setOutputValueClass(Text.class);
}
}

А вот и класс картографа (TransMapper1):

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TransMapper1 extends  Mapper<LongWritable, Text, LongWritable, DnaWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        LongWritable bamWindow = new LongWritable(Long.parseLong(tokenizer.nextToken()));
        LongWritable read = new LongWritable(Long.parseLong(tokenizer.nextToken()));
        LongWritable refWindow = new LongWritable(Long.parseLong(tokenizer.nextToken()));
        IntWritable chr = new IntWritable(Integer.parseInt(tokenizer.nextToken()));
        DoubleWritable dist = new DoubleWritable(Double.parseDouble(tokenizer.nextToken()));
        DnaWritable dnaW = new DnaWritable(bamWindow,read,refWindow,chr,dist);
        context.write(bamWindow,dnaW);
    }
}

А это класс Reducer (TransReducer1):

import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TransReducer1 extends Reducer<LongWritable, DnaWritable, LongWritable, Text> {

@Override
 public void reduce(LongWritable key, Iterable<DnaWritable> values, Context context) throws IOException, InterruptedException {

ArrayList<DnaWritable> list = new ArrayList<DnaWritable>();
double minDist = Double.MAX_VALUE;
    for (DnaWritable value : values) {
            long bamWindow = value.getBamWindow().get();
            long read = value.getRead().get();
            long refWindow = value.getRefWindow().get();
            int chr = value.getChr().get();
            double dist = value.getDist().get();
            if (dist > minDist)
                continue;
            else
            if (dist < minDist)
                 list.clear();
            list.add(new DnaWritable(bamWindow,read,refWindow,chr,dist));
            minDist = Math.min(minDist, value.getDist().get());
        }
        for(int i = 0; i < list.size(); i++){
            context.write(new LongWritable(list.get(i).getRead().get()),new Text(new DnaWritable(list.get(i).getBamWindow(),list.get(i).getRead(),list.get(i).getRefWindow(),list.get(i).getChr(),list.get(i).getDist()).toString()));
        }
    }
}

И это класс DnaWritable (я не стал немного укорачивать раздел импорта):

public class DnaWritable implements Writable {
    LongWritable bamWindow;
    LongWritable read;
    LongWritable refWindow;
    IntWritable chr;
    DoubleWritable dist;

    public DnaWritable(LongWritable bamWindow, LongWritable read, LongWritable refWindow, IntWritable chr, DoubleWritable dist){

    this.bamWindow = bamWindow;
    this.read = read;
    this.refWindow = refWindow;
    this.chr = chr;
    this.dist = dist;
}

public DnaWritable(long bamWindow, long read, long refWindow, int chr, double dist){
    this.bamWindow = new LongWritable(bamWindow);
    this.read = new LongWritable(read);
    this.refWindow = new LongWritable(refWindow);
    this.chr = new IntWritable(chr);
    this.dist = new DoubleWritable(dist);
}

@Override
public void write(DataOutput dataOutput) throws IOException {
    bamWindow.write(dataOutput);
    read.write(dataOutput);
    refWindow.write(dataOutput);
    chr.write(dataOutput);
    dist.write(dataOutput);
}

@Override
public void readFields(DataInput dataInput) throws IOException {
        bamWindow.readFields(dataInput);
        read.readFields(dataInput);
        refWindow.readFields(dataInput);
        chr.readFields(dataInput);
        dist.readFields(dataInput);
    }
}

Любая помощь будет очень признательна .. Спасибо


person Hamid_UMB    schedule 20.09.2016    source источник
comment
Можете ли вы предоставить некоторые образцы данных?   -  person cody123    schedule 20.09.2016
comment
Да, пример входных данных будет здесь, чтобы получить ответ здесь, предоставьте две или три строки ввода   -  person Koitoer    schedule 20.09.2016
comment
Спасибо, ребята... Я только что обнаружил ошибку в одном из входных файлов... Надеюсь, это решит мою проблему. Спасибо   -  person Hamid_UMB    schedule 21.09.2016


Ответы (4)


Можете ли вы изменить свой класс DnaWritable и протестировать его (обработка NPE)

package com.hadoop.intellipaat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class DnaWritable implements Writable {

    private Long bamWindow;
    private Long read;
    private Long refWindow;
    private Integer chr;
    private Double dist;

    public DnaWritable(Long bamWindow, Long read, Long refWindow, Integer chr, Double dist) {
        super();
        this.bamWindow = bamWindow;
        this.read = read;
        this.refWindow = refWindow;
        this.chr = chr;
        this.dist = dist;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(bamWindow);
        out.writeLong(read);
        out.writeLong(refWindow);
        out.writeInt(chr);
        out.writeDouble(dist);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.bamWindow = in.readLong();
        this.read = in.readLong();
        this.refWindow = in.readLong();
        this.chr = in.readInt();
        this.dist = in.readDouble();
    }

}
person cody123    schedule 20.09.2016
comment
Лучше избегать коробочных примитивов. Я бы рекомендовал изменить поля на их примитивные типы. Я говорю это только потому, что не похоже, что нули являются возможными значениями для полей OP. - person Pradeep Gollakota; 20.09.2016
comment
Я изменил реализацию, но пока не повезло - person Hamid_UMB; 20.09.2016
comment
@Hamid_UMB, пожалуйста, предоставьте несколько примеров данных ?? Чтобы я мог проверить. - person cody123; 21.09.2016

Я не думаю, что вы вообще отправили свою работу в кластер. в вашем основном классе нет job1.submit() или job1.waitForCompletion(true).

////submit the job to hadoop 
if (!job1.waitForCompletion(true))
return;

также в вашем основном методе требуется коррекция.

Job job1 = new Job();  //new Job() constructor is deprecated now.

ниже правильный для создания объекта задания

Configuration conf = new Configuration();
Job job1 = Job.getInstance(conf, "Your Program name");
person Meeran0823    schedule 21.09.2016

Я думаю, что вы неправильно реализовали методы write(DataOutput out) и readFields(DataInput in) в своем классе DnaWritable.

person Ashish Doneriya    schedule 20.09.2016
comment
Я также поставил код DnaWritable. Пожалуйста, взгляните на это. Спасибо - person Hamid_UMB; 20.09.2016

Рассмотрите также реализацию ComparableWritable следующим образом, а также добавьте конструктор без аргументов.

public class DnaWritable implements Writable WritableComparable<DnaWritable>  {

 //Consider add a non-args constructor
 public DnaWritable(){
 }

    //Add this static method as well
 public static DnaWritable read(DataInput in) throws IOException {
        DnaWritable dnaWritable = new DnaWritable();
        dnaWritable.readFields(in);
        return dnaWritable;
 }

 @Override
 public int compareTo(DnaWritable dnaWritable) {
      //Put your comparison logic there.
 }

}

Если это все еще не удается, я log4.properties, чтобы вы могли видеть, есть ли какая-либо основная ошибка, которую вы не видите.

источник/основной/ресурсы

hadoop.root.logger=DEBUG, console
log4j.rootLogger=INFO, stdout

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
person Koitoer    schedule 20.09.2016
comment
Когда я пытался реализовать интерфейс WritableComparable, я получил эту ошибку: ошибка: на нестатический метод readFields (DataInput) нельзя ссылаться из статического контекста DnaWritable.readFields (in); .. .как я могу это решить?? - person Hamid_UMB; 20.09.2016
comment
это должно быть dnaWritable , в нижнем регистре мой плохой, должен быть недавно созданный экземпляр. - person Koitoer; 20.09.2016