Nodejs Чтение очень большого файла (~ 10 ГБ), обработка построчно, а затем запись в другой файл

У меня есть файл журнала 10 ГБ в определенном формате, я хочу обработать этот файл построчно, а затем записать вывод в другой файл после применения некоторых преобразований. Я использую узел для этой операции.

Хотя этот метод хорош, но для этого требуется чертовски много времени. Я смог сделать это в течение 30-45 минут в JAVA, но в узле на выполнение той же работы уходит более 160 минут. Ниже приведен код:

Ниже приведен код запуска, который считывает каждую строку из ввода.

var path = '../10GB_input_file.txt';
var output_file = '../output.txt';

function fileopsmain(){

    fs.exists(output_file, function(exists){
        if(exists) {
            fs.unlink(output_file, function (err) {
                if (err) throw err;
                console.log('successfully deleted ' + output_file);
            });
        }
    });

    new lazy(fs.createReadStream(path, {bufferSize: 128 * 4096}))
        .lines
        .forEach(function(line){
            var line_arr = line.toString().split(';');
            perform_line_ops(line_arr, line_arr[6], line_arr[7], line_arr[10]);
        }
    );

}

Это метод, который выполняет некоторую операцию над этой строкой и передает входные данные методу записи, чтобы записать их в выходной файл.

function perform_line_ops(line_arr, range_start, range_end, daynums){

    var _new_lines = '';
    for(var i=0; i<days; i++){
        //perform some operation to modify line pass it to print
    }

    write_line_ops(_new_lines);
}

Следующий метод используется для записи данных в новый файл.

function write_line_ops(line) {
    if(line != null && line != ''){
        fs.appendFileSync(output_file, line);
    }
}

Я хочу сократить это время до 15-20 минут. Возможно ли это сделать.

Также для протокола я пробую это на процессоре Intel i7 с 8 ГБ оперативной памяти.


person HVT7    schedule 17.07.2015    source источник
comment
Один из оперативных вопросов заключается в том, считывает ли модуль lazy весь файл в память перед его обработкой, а не передает его построчно? Вам может быть интересен модуль node-byline.   -  person jfriend00    schedule 17.07.2015
comment
Первым шагом, если бы я работал над этим, было бы время каждого шага на гораздо меньшем файле, чтобы увидеть, что именно вызывает замедление. Оттуда вы можете начать оптимизировать эту часть кода.   -  person Kevin B    schedule 17.07.2015
comment
@ jfriend00 No lazy module не загружает весь файл в память, поскольку я одновременно отслеживаю использование памяти.   -  person HVT7    schedule 17.07.2015
comment
@Kevin B. Я делаю то же самое, я работаю над файлом размером 400 МБ, который обрабатывается примерно за 2,5 минуты. Хотя я не совсем уверен, в чем здесь проблема.   -  person HVT7    schedule 17.07.2015
comment
Я бы предложил, чтобы вы сначала связали проблему здесь. Создайте простое тестовое приложение, которое просто создает поток чтения и считывает весь файл, не беспокоясь о строках и записи на диск. Посмотрите, сколько времени это займет. Если это быстро, то вы можете добавлять по одной части головоломки за раз и отслеживать свой прогресс по ходу дела. Затем добавьте его в новое имя файла и посмотрите на производительность. Если исходное чтение медленное, то проблема находится ниже в потоковой передаче nodejs, и вам придется перейти на более низкий уровень, чтобы исправить производительность.   -  person jfriend00    schedule 17.07.2015
comment
@ jfriend00 Спасибо за предложение, но я пытаюсь выяснить, использую ли я здесь правильный подход? Потому что если нет, то, по крайней мере, меня можно направить, чтобы я бежал в правильном направлении.   -  person HVT7    schedule 17.07.2015
comment
Итак, я хотел бы проверить с помощью нескольких тестовых приложений, достаточно ли производительна обычная потоковая передача nodejs, прежде чем вводить lazy в уравнение, чтобы вы знали, какая подсистема вызывает у вас проблему.   -  person jfriend00    schedule 17.07.2015
comment
И я советую вам, как решить, будет ли этот подход работать достаточно хорошо. Все зависит от профиля производительности инструментов, которые вы используете. В вашем подходе нет ничего неправильного с архитектурной точки зрения, если только используемые вами инструменты недостаточно быстры.   -  person jfriend00    schedule 17.07.2015
comment
@jfriend00 Хорошо =) . тогда какие инструменты вы предлагаете для выполнения этой задачи?   -  person HVT7    schedule 17.07.2015
comment
Я предложил вам написать простое тестовое приложение, чтобы убедиться, что простые потоки nodejs достаточно быстры для вас. Удалите из уравнения все остальные переменные (например, lazy и обработку строк). Запустите простое тестовое приложение для вашего большого файла, чтобы просто прочитать его по частям, используя потоки. Кажется, я уже описывал это несколько раз. Вы должны сделать несколько тестов, чтобы увидеть, что будет работать для вас.   -  person jfriend00    schedule 17.07.2015


Ответы (4)


Вы можете сделать это легко без модуля. Например:

var fs = require('fs');
var inspect = require('util').inspect;

var buffer = '';
var rs = fs.createReadStream('foo.log');
rs.on('data', function(chunk) {
  var lines = (buffer + chunk).split(/\r?\n/g);
  buffer = lines.pop();
  for (var i = 0; i < lines.length; ++i) {
    // do something with `lines[i]`
    console.log('found line: ' + inspect(lines[i]));
  }
});
rs.on('end', function() {
  // optionally process `buffer` here if you want to treat leftover data without
  // a newline as a "line"
  console.log('ended on non-empty buffer: ' + inspect(buffer));
});
person mscdex    schedule 17.07.2015
comment
Да, можно написать собственный код обработки строк. Но, если вы посмотрите на потребности OP, они должны иметь доступ к нескольким строкам одновременно, поэтому теперь ваш код должен добавить буферизацию групп строк и так далее. Весь смысл подхода OP состоит в том, чтобы попытаться использовать существующие инструменты, которые решают эти проблемы за вас, а не писать свои собственные с нуля. И неясно, как это решает проблему ОП. - person jfriend00; 17.07.2015
comment
Код OP читает файл построчно, что и делает мой код. Я хочу сказать, что в этом конкретном случае сделать это самостоятельно очень просто, а также убедиться, что весь файл не буферизуется сразу перед обработкой. - person mscdex; 17.07.2015
comment
Вы сделали дикое предположение, что вызывает проблему с производительностью, и предложили часть альтернативного решения. Мне кажется, мы еще не знаем, в чем проблема с производительностью, не проведя некоторые тесты. - person jfriend00; 17.07.2015
comment
@mscdex Сейчас я тестирую этот код бок о бок, так что теперь этот вопрос можно оставить открытым. Это решение может быть полезным. Обязательно сообщим всем о результате описанного выше подхода. - person HVT7; 17.07.2015
comment
@mscdex Это все еще занимает столько же времени. Я предполагаю, что должна быть некоторая задержка при записи в файл. - person HVT7; 17.07.2015
comment
@ HVT7 Какую версию node/io.js вы используете? Кроме того, вы можете показать код Java, который вы используете для обработки журналов. - person mscdex; 17.07.2015
comment
Код @mscdex JAVA прост, используя потоки ввода-вывода, и логика, применяемая для преобразований, такая же. В нем не используются сторонние библиотеки. - person HVT7; 17.07.2015

Я не могу угадать, где в вашем коде находится возможное узкое место.

  • Можете ли вы добавить библиотеку или исходный код функции lazy?
  • Сколько операций выполняет ваш perform_line_ops? (if/else, switch/case, вызовы функций)

Я создал пример на основе вашего кода, я знаю, что это не отвечает на ваш вопрос, но, возможно, поможет вам понять, как узел обрабатывает такой случай.

const fs = require('fs')
const path = require('path')

const inputFile = path.resolve(__dirname, '../input_file.txt')
const outputFile = path.resolve(__dirname, '../output_file.txt')

function bootstrap() {
    // fs.exists is deprecated
    // check if output file exists
    // https://nodejs.org/api/fs.html#fs_fs_exists_path_callback
    fs.exists(outputFile, (exists) => {
        if (exists) {
            // output file exists, delete it
            // https://nodejs.org/api/fs.html#fs_fs_unlink_path_callback
            fs.unlink(outputFile, (err) => {
                if (err) {
                    throw err
                }

                console.info(`successfully deleted: ${outputFile}`)
                checkInputFile()
            })
        } else {
            // output file doesn't exist, move on
            checkInputFile()
        }
    })
}

function checkInputFile() {
    // check if input file can be read
    // https://nodejs.org/api/fs.html#fs_fs_access_path_mode_callback
    fs.access(inputFile, fs.constants.R_OK, (err) => {
        if (err) {
            // file can't be read, throw error
            throw err
        }

        // file can be read, move on
        loadInputFile()
    })
}

function saveToOutput() {
    // create write stream
    // https://nodejs.org/api/fs.html#fs_fs_createwritestream_path_options
    const stream = fs.createWriteStream(outputFile, {
        flags: 'w'
    })

    // return wrapper function which simply writes data into the stream
    return (data) => {
        // check if the stream is writable
        if (stream.writable) {
            if (data === null) {
                stream.end()
            } else if (data instanceof Array) {
                stream.write(data.join('\n'))
            } else {
                stream.write(data)
            }
        }
    }
}

function parseLine(line, respond) {
    respond([line])
}

function loadInputFile() {
    // create write stream
    const saveOutput = saveToOutput()
    // create read stream
    // https://nodejs.org/api/fs.html#fs_fs_createreadstream_path_options
    const stream = fs.createReadStream(inputFile, {
        autoClose: true,
        encoding: 'utf8',
        flags: 'r'
    })

    let buffer = null

    stream.on('data', (chunk) => {
        // append the buffer to the current chunk
        const lines = (buffer !== null)
            ? (buffer + chunk).split('\n')
            : chunk.split('\n')

        const lineLength = lines.length
        let lineIndex = -1

        // save last line for later (last line can be incomplete)
        buffer = lines[lineLength - 1]

        // loop trough all lines
        // but don't include the last line
        while (++lineIndex < lineLength - 1) {
            parseLine(lines[lineIndex], saveOutput)
        }
    })

    stream.on('end', () => {
        if (buffer !== null && buffer.length > 0) {
            // parse the last line
            parseLine(buffer, saveOutput)
        }

        // Passing null signals the end of the stream (EOF)
        saveOutput(null)
    })
}

// kick off the parsing process
bootstrap()
person Siggy    schedule 28.04.2017

Я знаю, что это старо, но...

При предположении appendFileSync() _write()_s в файловой системе и ждет ответа. Много небольших операций записи, как правило, обходятся дорого. Если вы используете BufferedWriter в Java, вы можете получить более быстрые результаты, пропустив некоторые функции write().

Используйте одну из асинхронных операций записи и посмотрите, разумно ли буферизирует узел, или записывайте строки в большой буфер узла, пока он не заполнится, и всегда записывайте полный (или почти полный) буфер. Настроив размер буфера, вы можете проверить, влияет ли количество операций записи на производительность. Я подозреваю, что да.

person teknopaul    schedule 17.11.2017

Выполнение выполняется медленно, потому что вы не используете асинхронные операции узла. По сути, вы выполняете такой код:

> read some lines
> transform
> write some lines
> repeat

В то время как вы могли бы делать все сразу или, по крайней мере, читать и писать. Некоторые примеры в ответах здесь делают это, но синтаксис, по крайней мере, сложен. Используя scramjet, вы можете сделать это в пару простых строк:

const {StringStream} = require('scramjet');

fs.createReadStream(path, {bufferSize: 128 * 4096})
    .pipe(new StringStream({maxParallel: 128})    // I assume this is an utf-8 file
    .split("\n")                                  // split per line
    .parse((line) => line.split(';'))             // parse line
    .map([line_arr, range_start, range_end, daynums] => {
        return simplyReturnYourResultForTheOtherFileHere(
            line_arr, range_start, range_end, daynums
        );                                         // run your code, return promise if you're doing some async work
    })
    .stringify((result) => result.toString())
    .pipe(fs.createWriteStream)
    .on("finish", () => console.log("done"))
    .on("error", (e) => console.log("error"))

Это, вероятно, будет работать намного быстрее.

person Michał Karpacki    schedule 19.11.2017