Обработка больших выходных значений шага сокращения в Hadoop

На этапе сокращения моей программы MapReduce единственной операцией, которую я выполняю, является объединение каждого значения в предоставленном итераторе, как показано ниже:

public void reduce(Text key, Iterator<text> values,
                    OutputCollector<Text, Text> output, Reporter reporter) {
    Text next;
    Text outKey = new Text()
    Text outVal = new Text();
    StringBuilder sb = new StringBuilder();
    while(values.hasNext()) {
        next = values.next();
        sb.append(next.toString());
        if (values.hasNext())
            sb.append(',');
    }
    outKey.set(key.toString());
    outVal.set(sb.toSTring());
    output.collect(outKey,outVal);
}

Моя проблема в том, что некоторые выходные значения сокращения представляют собой огромные строки текста; настолько большой, что даже при очень большом начальном размере строковый буфер должен увеличить (удвоить) свой размер несколько раз, чтобы вместить весь контекст итератора, вызывая проблемы с памятью.

В традиционном Java-приложении это означало бы, что буферизованная запись в файл будет предпочтительным методом записи вывода. Как вы справляетесь с чрезвычайно большими выходными парами ключ-значение в Hadoop? Должен ли я передавать результаты непосредственно в файл на HDFS (один файл на вызов сокращения)? Есть ли способ буферизовать вывод, кроме метода output.collect?

Примечание. Я уже максимально увеличил объем памяти/кучи. Кроме того, несколько источников указали, что увеличение количества редьюсеров может помочь с проблемами памяти/кучи, но здесь проблема напрямую связана с использованием SringBuilder, когда он расширяет свою емкость.

Спасибо


person Syllepsis    schedule 13.04.2012    source источник


Ответы (2)


Не то чтобы я понимаю, почему вы хотели бы иметь огромное значение, но есть способ сделать это.

Если вы напишите свой собственный OutputFormat, вы можете исправить поведение метода RecordWriter.write(Key, Value) для обработки конкатенации значений в зависимости от того, является ли значение ключа нулевым или нет.

Таким образом, в вашем редьюсере вы можете написать свой код следующим образом (первый вывод для ключа — это фактический ключ, а все, что после него, — нулевой ключ:

public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter) {
  boolean firstKey = true;
  for (Text value : values) {
    output.collect(firstKey ? key : null, value);
    firstKey = false;
  }
}

Фактический RecordWriter.write() имеет следующую логику для обработки логики конкатенации нулевого ключа/значения:

    public synchronized void write(K key, V value) throws IOException {

        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;
        }

        if (!nullKey) {
            // if we've written data before, append a new line
            if (dataWritten) {
                out.write(newline);
            }

            // write out the key and separator
            writeObject(key);
            out.write(keyValueSeparator);
        } else if (!nullValue) {
            // write out the value delimiter
            out.write(valueDelimiter);
        }

        // write out the value
        writeObject(value);

        // track that we've written some data
        dataWritten = true;
    }

    public synchronized void close(Reporter reporter) throws IOException {
        // if we've written out any data, append a closing newline
        if (dataWritten) {
            out.write(newline);
        }

        out.close();
    }

Вы заметите, что в метод close также были внесены изменения, чтобы добавить завершающую новую строку к последней выписанной записи.

Полный листинг кода можно найти на pastebin, и вот результат теста:

key1    value1
key2    value1,value2,value3
key3    value1,value2
person Chris White    schedule 14.04.2012

Если одно выходное ключ-значение может быть больше, чем память, это означает, что стандартный механизм вывода не подходит, поскольку по дизайну интерфейса он требует передачи пары ключ-значение, а не потока.
Я думаю, что самым простым решением будет потоковая передача вывода прямо в файл HDFS.
Если у вас есть причины передавать данные через формат вывода - я бы предложил следующее решение: а) Записать в локальный временный каталог
б) Передавать имя файла в качестве значения для формата вывода.

Вероятно, наиболее эффективным, но немного сложным решением было бы использование файла с отображением памяти в качестве буфера. Он будет находиться в памяти до тех пор, пока памяти достаточно, а при необходимости ОС позаботится об эффективном сбросе на диск.

person David Gruzman    schedule 13.04.2012