Выходной каталог Hadoop Pig не задан

Я пишу свой собственный класс Pig Store, где я не хочу хранить его в файле, я планирую отправить его в какое-то стороннее хранилище данных (за исключением вызовов API).

примечание: я запускаю его на образе Cloudera VirtualBox.

Я написал свои классы Java (перечислены ниже) и создал mystore.jar, который я использую в приведенном ниже сценарии id.pig:

store B INTO 'mylocation' USING MyStore('mynewlocation')

при запуске этого скрипта с помощью pig я вижу следующие ошибки: ОШИБКА 6000: не удалось проверить местоположение вывода для: 'file://home/cloudera/test/id.out Дополнительная информация: выходной каталог не установлен.

or.apache.pig.impl.plan.VisitorException: ERROR 6000:
at or.apache.pig.newplan.logical.rules.InputOutputFileValidator$InputOutputFileValidator.visit(InputOutputFileValidator.java:95)

Пожалуйста помоги!

-------------------- MyStore.java ----------------------

public class MyStore extends StoreFunc {
    protected RecordWriter writer = null;
    private String location = null;


    public MyStore () {
        location= null;
    }

    public MyStore (String location) {
        this.location= location;
    }

    @Override
    public OutputFormat getOutputFormat() throws IOException {
        return new MyStoreOutputFormat(location);
    }

    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    @Override
    public void putNext(Tuple tuple) throws IOException {
        //write tuple to location

        try {
            writer.write(null, tuple.toString());
        } catch (InterruptedException e) {          
            e.printStackTrace();
        }
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        if(location!= null)
            this.location= location;
    }

}

-------------------- MyStoreOutputFormat.java ----------------------

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.data.Tuple;

public class MyStoreOutputFormat extends
        TextOutputFormat<WritableComparable, Tuple> {
    private String location = null;

    public MyStoreOutputFormat(String location) {

        this.location = location;
    }

    @Override
    public RecordWriter<WritableComparable, Tuple> getRecordWriter(
            TaskAttemptContext job) throws IOException, InterruptedException {

        Configuration conf = job.getConfiguration();

        String extension = location;
        Path file = getDefaultWorkFile(job, extension);     
        FileSystem fs = file.getFileSystem(conf);

        FSDataOutputStream fileOut = fs.create(file, false);

        return new MyStoreRecordWriter(fileOut);
    }

    protected static class MyStoreRecordWriter extends
            RecordWriter<WritableComparable, Tuple> {

        DataOutputStream out = null;

        public MyStoreRecordWriter(DataOutputStream out) {
            this.out = out;
        }

        @Override
        public void close(TaskAttemptContext taskContext) throws IOException,
                InterruptedException {
            // close the location
        }

        @Override
        public void write(WritableComparable key, Tuple value)
                throws IOException, InterruptedException {

            // write the data to location
            if (out != null) {
                out.writeChars(value.toString()); // will be calling API later. let me first dump to the location!
            }
        }

    }
}

я что-то пропустил здесь?


person Java Spring Coder    schedule 06.02.2013    source источник
comment
пожалуйста помоги. Мне это нужно срочно. Спасибо!   -  person Java Spring Coder    schedule 06.02.2013


Ответы (1)


Во-первых, я думаю, вы должны использовать конфигурацию задания для хранения значения местоположения, а не переменной экземпляра.

Ваше назначение локальной переменной «местоположение» в методе setStoreLocation вызывается при планировании задания, но вызов getOutputFormat может быть сделан только на этапе выполнения, когда переменная местоположения может больше не устанавливаться (новый экземпляр вашего класса может быть создан).

Если вы посмотрите на источник для PigStorage.setStoreLocation, вы должны заметить, что они сохраняют местоположение в конфигурации задания (2-я строка):

@Override
public void setStoreLocation(String location, Job job) throws IOException {
    job.getConfiguration().set("mapred.textoutputformat.separator", "");
    FileOutputFormat.setOutputPath(job, new Path(location));

    if( "true".equals( job.getConfiguration().get( "output.compression.enabled" ) ) ) {
        FileOutputFormat.setCompressOutput( job, true );
        String codec = job.getConfiguration().get( "output.compression.codec" );
        try {
            FileOutputFormat.setOutputCompressorClass( job,  (Class<? extends CompressionCodec>) Class.forName( codec ) );
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Class not found: " + codec );
        }
    } else {
        // This makes it so that storing to a directory ending with ".gz" or ".bz2" works.
        setCompression(new Path(location), job);
    }
}

Поэтому я думаю, что вы должны сохранить местоположение в переменной задания:

@Override
public void setStoreLocation(String location, Job job) throws IOException {
    if(location!= null)
        job.getConfiguration().set("mylocation", location);
}

Какой ваш пользовательский формат вывода затем можно извлечь в методе createRecordReader:

@Override
public RecordWriter<WritableComparable, Tuple> getRecordWriter(
        TaskAttemptContext job) throws IOException, InterruptedException {

    Configuration conf = job.getConfiguration();

    String extension = conf.get("mylocation");
    Path file = getDefaultWorkFile(job, extension);     
    FileSystem fs = file.getFileSystem(conf);

    FSDataOutputStream fileOut = fs.create(file, false);

    return new MyStoreRecordWriter(fileOut);
}

Наконец (и, вероятно, фактическая причина ошибки, которую вы видите), ваш формат вывода расширяет TextOutputFormat, и вы используете метод getDefaultWorkFile в своем средстве записи - этот метод должен знать, куда вы выводите файл в HDFS, и вы не вызвал FileOutputFormat.setOutputPath(job, new Path(location)); в вашем методе setStoreLocation (см. ранее вставленный метод PigStorage.setStoreLocation). Итак, ошибка в том, что он не знает, где создать рабочий файл по умолчанию.

person Chris White    schedule 06.02.2013
comment
спасибо Крис. Мне не хватало FileOutputFormat.setOutputPath(job, new Path(location)); вызов. изменил мой код в соответствии с вашими входными данными. - person Java Spring Coder; 06.02.2013