Использование Hadoop MapReduce DistributedCache

Я пытаюсь воспроизвести пример Bloom Filtering из книги Шаблон проектирования MapReduce.
Далее я покажу только интересующий код:

public static class BloomFilteringMapper extends Mapper<Object, Text, Text, NullWritable>
{
    private BloomFilter filter = new BloomFilter();

    protected void setup( Context context ) throws IOException
    {
        URI[] files = DistributedCache.getCacheFiles( context.getConfiguration() );
        String path = files[0].getPath();
        System.out.println( "Reading Bloom Filter from: " + path );
        DataInputStream strm = new DataInputStream( new FileInputStream( path ) );
        filter.readFields( strm );
        strm.close();
    }
    //...
}
public static void main( String[] args ) throws Exception
{
    Job job = new Job( new Configuration(), "description" );
    URI uri = new URI("hdfs://localhost:9000/user/draxent/comment.bloomfilter");
    DistributedCache.addCacheFile( uri, job.getConfiguration() );
    //...
}

Когда я пытаюсь выполнить его, я получаю следующую ошибку:
java.io.FileNotFoundException: /user/draxent/comment.bloomfilte
r

Но выполнение команды:

bin/hadoop fs -ls

Я вижу файл:

-rw-r--r--   1 draxent supergroup        405 2015-11-25 17:12 /user/draxent/comment.bloomfilter

Так что я совершенно уверен, что проблема в линии:

URI uri = new URI("hdfs://localhost:9000/user/draxent/comment.bloomfilter");

Но я пробовал несколько разных конфигураций, например:
"hdfs://user/draxent/comment.bloomfilter"
"/user/draxent/comment.bloomfilter "
"comment.bloomfilter"

И никто не работает.

Я попытался найти реализация cfeduke, но не смог решить свою проблему.

Ответить на комментарии:

  • ravindra: URI files[0] содержит строковый элемент, переданный в main;
  • Манджунат Баллур: да, вы правы. Но поскольку файл существует (вы можете увидеть его из bin/hadoop fs -ls), это означает, что проблема связана со строкой path, переданной в FileInputStream. Но я передаю строку этому, как всегда. Я проверил, значение пути: comment.bloomfilter... значит, оно должно быть правильным.

person Draxent    schedule 25.11.2015    source источник
comment
Можете ли вы отладить значения, которые вы получили из вызова файлов URI[]? Я подозреваю, что контекст не был правильно установлен.   -  person Ravindra babu    schedule 26.11.2015
comment
Проблема не в строке: URI uri = new URI(hdfs://localhost:9000/user/draxent/comment.bloomfilter);. Эта строка может генерировать только URISyntaxException. Кажется, проблема связана со строкой: DataInputStream strm = new DataInputStream(new FileInputStream(путь));. FileInputStream() генерирует исключение FileNotFoundException.   -  person Manjunath Ballur    schedule 26.11.2015


Ответы (2)


API распределенного кэша устарел.

Вы можете расширить те же функции с помощью нового API. Ознакомьтесь с документацией здесь: http://hadoop.apache.org/docs/stable2/api/org/apache/hadoop/mapreduce/Job.html

В коде драйвера: -

Job job = new Job(); ... job.addCacheFile(new Path(filename).toUri());

В методе настройки картографа: -

Path[] localPaths = context.getLocalCacheFiles();
person Deepan Ram    schedule 02.03.2017

Должно работать следующее: удалите строку с URI uri = new URI(... и измените следующую строку на:

DistributedCache.addCacheFile(new Path("/user/draxent/comment.bloomfilter").toUri(), job.getConfiguration());
person Denis    schedule 02.03.2017