Потоковый асинхронный буферизованный поток с низкой задержкой C ++ (предназначенный для ведения журнала) - Boost

Вопрос:

3, а приведенные ниже циклы содержат закомментированный код. Я ищу ("TAG1", "TAG2" и "TAG3") для легкой идентификации. Я просто хочу, чтобы циклы while ждали, пока проверяемое условие станет истинным, прежде чем продолжить, минимизируя ресурсы ЦП в максимально возможной степени. Сначала я попытался использовать переменные условия Boost, но есть условие гонки. Перевод потока в спящий режим на «x» микросекунд неэффективен, поскольку нет способа точно рассчитать время пробуждения. Наконец, boost :: this_thread :: yield (), похоже, ничего не делает. Вероятно, потому что у меня только 2 активных потока в двухъядерной системе. В частности, как я могу сделать три указанные ниже области с тегами работать более эффективно, вводя как можно меньше ненужных блокировок.

ИСТОРИЯ

Задача:

У меня есть приложение, которое регистрирует много данных. После профилирования я обнаружил, что много времени уходит на операции регистрации (запись текста или двоичного файла в файл на локальном жестком диске). Моя цель - уменьшить задержку при вызовах logData, заменив непоточные вызовы прямой записи на вызовы поточного регистратора с буферизацией потоков.

Изученные варианты:

  • Обновить медленный жесткий диск эпохи 2005 года до SSD ... возможно. Стоимость не является непомерно высокой ... но требует большого объема работы ... необходимо обновить более 200 компьютеров ...
  • Увеличьте ASIO ... Мне не нужны все накладные расходы проактора / сети, я ищу что-то более простое и легкое.

Дизайн:

  • При использовании шаблона потока-производителя и потребителя приложение записывает данные в буфер, а фоновый поток затем записывает их на диск через некоторое время. Таким образом, конечная цель состоит в том, чтобы функция writeMessage, вызываемая уровнем приложения, возвращалась как можно быстрее, в то время как данные корректно / полностью записываются в файл журнала в порядке FIFO через некоторое время.
  • Только один поток приложения, только один поток писателя.
  • На основе кольцевого буфера. Причина этого решения - использовать как можно меньше блокировок и в идеале ... и, пожалуйста, поправьте меня, если я ошибаюсь ... Я не думаю, что они мне нужны.
  • Буфер - это статически распределенный массив символов, но при необходимости / желании его можно переместить в кучу из соображений производительности.
  • У буфера есть начальный указатель, который указывает на следующий символ, который должен быть записан в файл. У буфера есть конечный указатель, который указывает на индекс массива после последнего символа, который должен быть записан в файл. Конечный указатель НИКОГДА не проходит мимо начального указателя. Если приходит сообщение, размер которого превышает размер буфера, то писатель ждет, пока буфер не будет опустошен, и записывает новое сообщение в файл напрямую, не помещая сообщение слишком большого размера в буфер (как только буфер опустошается, рабочий поток не буду ничего писать, так что никаких споров).
  • Писатель (рабочий поток) обновляет только начальный указатель кольцевого буфера.
  • Основной (поток приложения) обновляет только конечный указатель кольцевого буфера, и, опять же, он только вставляет новые данные в буфер, когда есть доступное пространство ... в противном случае он либо ждет, пока пространство в буфере станет доступным, либо записывает напрямую, как описано выше.
  • Рабочий поток постоянно проверяет, есть ли данные для записи (на это указывает случай, когда указатель начала буфера! = Указатель конца буфера). Если нет данных для записи, рабочий поток в идеале должен перейти в спящий режим и проснуться после того, как поток приложения вставил что-то в буфер (и изменил конечный указатель буфера таким образом, чтобы он больше не указывал на тот же индекс, что и начальный указатель). То, что у меня ниже, включает в себя циклы while, непрерывно проверяющие это условие. Это очень плохой / неэффективный способ ожидания в буфере.

Полученные результаты:

  • На моем двухъядерном ноутбуке 2009 года с твердотельным накопителем я вижу, что общее время записи в тесте с потоковой / буферизацией по сравнению с прямой записью составляет примерно 1: 6 (0,609 с против 0,095 с), но сильно варьируется. Часто тест буферизованной записи на самом деле медленнее, чем прямая запись. Я считаю, что изменчивость связана с плохой реализацией ожидания освобождения места в буфере, ожидания опустошения буфера и ожидания рабочего потока, когда работа станет доступной. Я измерил, что некоторые из циклов while потребляют более 10000 циклов, и я подозреваю, что эти циклы фактически конкурируют за аппаратные ресурсы, которые требуются другому потоку (рабочему или приложению) для завершения ожидаемых вычислений.
  • Выход, кажется, проверяется. При включенном режиме TEST и небольшом размере буфера 10 в качестве стресс-теста я сравнил сотни мегабайт вывода и обнаружил, что он равен входному.

Компилируется с текущей версией Boost (1.55)

Заголовок

    #ifndef BufferedLogStream_h
    #define BufferedLogStream_h

    #include <stdio.h>
    #include <iostream>
    #include <iostream>
    #include <cstdlib>
    #include "boost\chrono\chrono.hpp"
    #include "boost\thread\thread.hpp"
    #include "boost\thread\locks.hpp"
    #include "boost\thread\mutex.hpp"
    #include "boost\thread\condition_variable.hpp"
    #include <time.h>

    using namespace std;

    #define BENCHMARK_STR_SIZE 128
    #define NUM_BENCHMARK_WRITES 524288
    #define TEST 0
    #define BENCHMARK 1
    #define WORKER_LOOP_WAIT_MICROSEC 20
    #define MAIN_LOOP_WAIT_MICROSEC 10

    #if(TEST)
    #define BUFFER_SIZE 10 
    #else 
    #define BUFFER_SIZE 33554432 //4 MB
    #endif

    class BufferedLogStream {
        public:
            BufferedLogStream();
            void openFile(char* filename);
            void flush();
            void close();
            inline void writeMessage(const char* message, unsigned int length);
            void writeMessage(string message);
            bool operator() () { return start != end; }

        private:
            void threadedWriter();
            inline bool hasSomethingToWrite();
            inline unsigned int getFreeSpaceInBuffer();
            void appendStringToBuffer(const char* message, unsigned int length);

            FILE* fp;
            char* start;
            char* end;
            char* endofringbuffer;
            char ringbuffer[BUFFER_SIZE];
            bool workerthreadkeepalive;
            boost::mutex mtx;
            boost::condition_variable waitforempty;
            boost::mutex workmtx;
            boost::condition_variable waitforwork;

            #if(TEST)
            struct testbuffer {
                int length;
                char message[BUFFER_SIZE * 2];
            };

            public:
                void test();

            private:
                void getNextRandomTest(testbuffer &tb);
                FILE* datatowrite;
            #endif

        #if(BENCHMARK)
            public:
                void runBenchmark();

            private:
                void initBenchmarkString();
                void runDirectWriteBaseline();
                void runBufferedWriteBenchmark();

                char benchmarkstr[BENCHMARK_STR_SIZE];
        #endif
    };

    #if(TEST)
    int main() {
        BufferedLogStream* bl = new BufferedLogStream();
        bl->openFile("replicated.txt");
        bl->test();
        bl->close();
        cout << "Done" << endl;
        cin.get();
        return 0;
    }
    #endif

    #if(BENCHMARK)
    int main() {
        BufferedLogStream* bl = new BufferedLogStream();
        bl->runBenchmark();
        cout << "Done" << endl;
        cin.get();
        return 0;
    }
    #endif //for benchmark

    #endif

Реализация

    #include "BufferedLogStream.h"

    BufferedLogStream::BufferedLogStream() {
        fp = NULL;
        start = ringbuffer;
        end = ringbuffer;
        endofringbuffer = ringbuffer + BUFFER_SIZE;
        workerthreadkeepalive = true;
    }

    void BufferedLogStream::openFile(char* filename) {
        if(fp) close();
        workerthreadkeepalive = true;
        boost::thread t2(&BufferedLogStream::threadedWriter, this);
        fp = fopen(filename, "w+b");
    }

    void BufferedLogStream::flush() {
        fflush(fp); 
    }

    void BufferedLogStream::close() {
        workerthreadkeepalive = false;
        if(!fp) return;
        while(hasSomethingToWrite()) {
            boost::unique_lock<boost::mutex> u(mtx);
            waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
        }
        flush();        
        fclose(fp);             
        fp = NULL;          
    }

    void BufferedLogStream::threadedWriter() {
        while(true) {
            if(start != end) {
                char* currentend = end;
                if(start < currentend) {
                    fwrite(start, 1, currentend - start, fp);
                }
                else if(start > currentend) {
                    if(start != endofringbuffer) fwrite(start, 1, endofringbuffer - start, fp); 
                    fwrite(ringbuffer, 1, currentend - ringbuffer, fp);
                }
                start = currentend;
                waitforempty.notify_one();
            }
            else { //start == end...no work to do
                if(!workerthreadkeepalive) return;
                boost::unique_lock<boost::mutex> u(workmtx);
                waitforwork.wait_for(u, boost::chrono::microseconds(WORKER_LOOP_WAIT_MICROSEC));
            }
        }
    }

    bool BufferedLogStream::hasSomethingToWrite() {
        return start != end;
    }

    void BufferedLogStream::writeMessage(string message) {
        writeMessage(message.c_str(), message.length());
    }

    unsigned int BufferedLogStream::getFreeSpaceInBuffer() {
        if(end > start) return (start - ringbuffer) + (endofringbuffer - end) - 1;
        if(end == start) return BUFFER_SIZE-1;
        return start - end - 1; //case where start > end
    }

    void BufferedLogStream::appendStringToBuffer(const char* message, unsigned int length) {
        if(end + length <= endofringbuffer) { //most common case for appropriately-sized buffer
            memcpy(end, message, length);
            end += length;
        }
        else {
            int lengthtoendofbuffer = endofringbuffer - end;
            if(lengthtoendofbuffer > 0) memcpy(end, message, lengthtoendofbuffer);
            int remainderlength =  length - lengthtoendofbuffer;
            memcpy(ringbuffer, message + lengthtoendofbuffer, remainderlength);
            end = ringbuffer + remainderlength;
        }
    }

    void BufferedLogStream::writeMessage(const char* message, unsigned int length) {
        if(length > BUFFER_SIZE - 1) { //if string is too large for buffer, wait for buffer to empty and bypass buffer, write directly to file
            while(hasSomethingToWrite()); {
                boost::unique_lock<boost::mutex> u(mtx);
                waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
            }
            fwrite(message, 1, length, fp);
        }
        else {
            //wait until there is enough free space to insert new string
            while(getFreeSpaceInBuffer() < length) {
                boost::unique_lock<boost::mutex> u(mtx);
                waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
            }
            appendStringToBuffer(message, length);
        }
        waitforwork.notify_one();
    }

    #if(TEST)
        void BufferedLogStream::getNextRandomTest(testbuffer &tb) {
            tb.length = 1 + (rand() % (int)(BUFFER_SIZE * 1.05));
            for(int i = 0; i < tb.length; i++) {
                tb.message[i] = rand() % 26 + 65;
            }
            tb.message[tb.length] = '\n';
            tb.length++;
            tb.message[tb.length] = '\0';
        }

        void BufferedLogStream::test() {
            cout << "Buffer size is: " << BUFFER_SIZE << endl;
            testbuffer tb;
            datatowrite = fopen("orig.txt", "w+b");
            for(unsigned int i = 0; i < 7000000; i++) {
                if(i % 1000000 == 0) cout << i << endl;
                getNextRandomTest(tb);
                writeMessage(tb.message, tb.length);
                fwrite(tb.message, 1, tb.length, datatowrite);
            }       
            fflush(datatowrite);
            fclose(datatowrite);
        }
    #endif

    #if(BENCHMARK) 
        void BufferedLogStream::initBenchmarkString() {
            for(unsigned int i = 0; i < BENCHMARK_STR_SIZE - 1; i++) {
                benchmarkstr[i] = rand() % 26 + 65;
            }
            benchmarkstr[BENCHMARK_STR_SIZE - 1] = '\n';
        }

        void BufferedLogStream::runDirectWriteBaseline() {
            clock_t starttime = clock();
            fp = fopen("BenchMarkBaseline.txt", "w+b");
            for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
                fwrite(benchmarkstr, 1, BENCHMARK_STR_SIZE, fp);
            }   
            fflush(fp);
            fclose(fp);
            clock_t elapsedtime = clock() - starttime;
            cout << "Direct write baseline took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
        }

        void BufferedLogStream::runBufferedWriteBenchmark() {
            clock_t starttime = clock();
            openFile("BufferedBenchmark.txt");
            cout << "Opend file" << endl;
            for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
                writeMessage(benchmarkstr, BENCHMARK_STR_SIZE);
            }   
            cout << "Wrote" << endl;
            close();
            cout << "Close" << endl;
            clock_t elapsedtime = clock() - starttime;
            cout << "Buffered write took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
        }

        void BufferedLogStream::runBenchmark() {
            cout << "Buffer size is: " << BUFFER_SIZE << endl;
            initBenchmarkString();
            runDirectWriteBaseline();
            runBufferedWriteBenchmark();
        }
    #endif

Обновление: 25 ноября 2013 г.

Я обновил приведенный ниже код, используя boost :: condition_variables, в частности метод wait_for (), рекомендованный Евгением Панасюком. Это позволяет избежать ненужной проверки одного и того же условия снова и снова. В настоящее время я вижу, что версия с буферизацией выполняется примерно в 1/6 раза быстрее, чем версия без буферизации / с прямой записью. Это не идеальный случай, потому что оба случая ограничены жестким диском (в моем случае SSD 2010 года). Я планирую использовать приведенный ниже код в среде, где жесткий диск не будет узким местом, и в большинстве случаев, если не всегда, в буфере должно быть место для размещения запросов writeMessage. Это подводит меня к следующему вопросу. Насколько большим я должен сделать буфер? Я не против выделить 32 МБ или 64 МБ, чтобы он никогда не заполнялся. Код будет работать в системах, в которых это можно сэкономить. Интуитивно я считаю, что статически выделять массив символов размером 32 МБ - плохая идея. Это? Во всяком случае, я ожидаю, что когда я запущу приведенный ниже код для предполагаемого приложения, задержка вызовов logData () будет значительно уменьшена, что приведет к значительному сокращению общего времени обработки.

Если кто-то видит способ улучшить приведенный ниже код (быстрее, надежнее, компактнее и т. Д.), Сообщите мне. Я ценю обратную связь. Лазин, как бы ваш подход был быстрее или эффективнее, чем то, что я опубликовал ниже? Мне нравится идея иметь только один буфер и сделать его достаточно большим, чтобы он практически никогда не заполнялся. Тогда мне не нужно беспокоиться о чтении из разных буферов. Евгений Панасюк, мне нравится подход по возможности использовать существующий код, особенно если это уже существующая библиотека boost. Однако я также не вижу, насколько spcs_queue более эффективен, чем то, что показано ниже. Я бы предпочел иметь дело с одним большим буфером, чем с множеством более мелких, и мне пришлось бы беспокоиться о разделении моего входного потока на вход и объединении его обратно на выходе. Ваш подход позволит мне перенести форматирование из основного потока в рабочий поток. Это резкий подход. Но я еще не уверен, сэкономит ли это много времени, и чтобы получить полную выгоду, мне пришлось бы изменить код, которым я не владею.

// Конец обновления


person 486DX2-66    schedule 25.11.2013    source источник
comment
Вы писали, что После профилирования я обнаружил, что на операции записи в журнал уходит много времени. Вам нужно разбить это время на форматирование и запись.   -  person Maxim Egorushkin    schedule 25.11.2013
comment
Рассмотрим boost::lockfree::spsc_queue - это очередь с одним производителем и одним потребителем без ожидания. Его можно настроить на время компиляции (размер внутреннего кольцевого буфера).   -  person Evgeny Panasyuk    schedule 25.11.2013
comment
Я также только что нашел это: boost.org/doc/libs/1_54_0/doc/html/atomic/ Вопрос не столько в кольцевом буфере и spcs_queue. У меня есть вопрос, как заставить рабочий поток работать, как только есть работа, и спать, когда работы нет. Думаю, мне может понадобиться назначить директора для координации. Во-первых, я собираюсь сделать 2 указателя на буфер атомарными. Я предполагал, что, поскольку каждый из них будет изменен только одним потоком или другим, но никогда обоими, операция будет атомарной.   -  person 486DX2-66    schedule 25.11.2013
comment
@ Егорушкин: Я хочу максимально сократить время форматирования и записи, поэтому планирую сократить и то, и другое. Приведенный выше тест показывает, что латентность сокращается по крайней мере в 6 раз (я думаю, что это соотношение будет более чем на порядок, как только я найду способ исправить циклы while) просто из-за одной только части записи (fwrite, использованный выше, делает без форматирования, просто записывает символы прямо в файл). Кроме того, форматирование, происходящее на уровне приложения, довольно простое ... не выполняется сериализация объектов или что-то сложное. В основном просто Int2Char и Double2Char.   -  person 486DX2-66    schedule 25.11.2013
comment
Также полезно: csd.uwo.ca/~moreno/HPC -Slides / Synchronizing_without_Locks.pdf   -  person 486DX2-66    schedule 26.11.2013
comment
Я ценю обратную связь. Лазин, насколько ваш подход будет быстрее или эффективнее, чем то, что я опубликовал ниже? Я не очень тщательно анализирую ваше решение, прежде чем писать свой ответ :). Кольцевой буфер похож на очередь буферов. Я выбираю очередь буферов, потому что ее проще описать. Ключевые идеи в моем решении: алгоритм Наггла (пишите, когда вы готовы писать, буферизуйте сообщения, если вы не готовы); двухфазное обновление буфера; писать синхронно сообщения с высоким приоритетом.   -  person Evgeny Lazin    schedule 27.11.2013
comment
Самая главная идея ведения журнала - сообщение журнала должно появляться в файле как можно быстрее. Рассмотрим этот пример - ваша программа дает сбой, но незадолго до сбоя она записывает в журнал некоторые сообщения с информацией об ошибках, которые приводят к этому сбою. Но если вы все буферизируете - вы потеряете эту конфиденциальную информацию. Из-за этого - ошибки должны регистрироваться синхронно.   -  person Evgeny Lazin    schedule 27.11.2013


Ответы (2)


Общее решение.

Я думаю, вам стоит взглянуть на алгоритм Naggle. Для одного производителя и одного потребителя это будет выглядеть так:

  • В начале буфер пуст, рабочий поток простаивает и ожидает событий.
  • Производитель записывает данные в буфер и уведомляет рабочий поток.
  • Рабочий поток проснулся и начал операцию записи.
  • Производитель пытается записать другое сообщение, но буфер используется рабочим, поэтому производитель выделяет другой буфер и записывает в него сообщение.
  • Производитель пытается записать другое сообщение, ввод-вывод все еще выполняется, поэтому производитель записывает сообщение в ранее выделенный буфер.
  • Рабочий поток завершил запись буфера в файл и видит, что есть другой буфер с данными, поэтому захватывает его и начинает запись.
  • Самый первый буфер используется производителем для записи всех последовательных сообщений, пока не будет выполняться вторая операция записи.

Эта схема поможет достичь требований к низкой задержке, одно сообщение будет записано на диск мгновенно, но большое количество событий будет записано большими пакетами для большей пропускной способности.

Если ваши сообщения журнала имеют уровни - вы можете немного улучшить эту схему. Все сообщения об ошибках имеют высокий приоритет (уровень) и должны быть немедленно сохранены на диске (потому что они редки, но очень ценны), но сообщения отладки и трассировки имеют низкий приоритет и могут быть буферизированы для экономии полосы пропускания (потому что они очень часты, но не так ценны). в виде сообщений об ошибках и информационных сообщений). Поэтому, когда вы пишете error сообщение, вы должны дождаться, пока рабочий поток закончит запись вашего сообщения (и всех сообщений, находящихся в том же буфере), а затем продолжить, но сообщения отладки и трассировки можно просто записать в буфер.

Поток.

Создание рабочего потока для каждого потока приложения является дорогостоящим. Вы должны использовать один поток записи для каждого файла журнала. Буферы записи должны быть разделены между потоками. Каждый буфер должен иметь два указателя - commit_pointer и prepare_pointer. Все буферное пространство между началом буфера и commit_pointer доступно для рабочего потока. Буферное пространство между commit_pointer и prepare_pointer в настоящее время обновляется потоками приложений. Инвариант: commit_pointer ‹= prepare_pointer.

Операции записи можно выполнить в два этапа.

  1. Prepare write. This operation reserves space in a buffer.
    • Producer calculates len(message) and atomically updates prepare_pointer;
    • Старое значение prepare_pointer и len сохраняются потребителем;
  2. Commit write.
    • Producer writes message at the beginning of the reserved buffer space (old prepare_pointer value).
    • Производитель занят-ждет, пока commit_pointer не станет равным старому значению prepare_pointer, которое он сохраняет в локальной переменной.
    • Производитель фиксирует операцию записи, выполняя commit_pointer = commit_pointer + len атомарно.

Чтобы предотвратить ложное совместное использование, len (message) может быть округлено до размера строки кеша, а все лишнее пространство может быть заполнено пробелами.

// pseudocode
void write(const char* message) {
    int len = strlen(message);  // TODO: round to cache line size
    const char* old_prepare_ptr;
    // Prepare step
    while(1) 
    {
        old_prepare_ptr = prepare_ptr;
        if (
            CAS(&prepare_ptr, 
                 old_prepare_ptr, 
                 prepare_ptr + len) == old_prepare_ptr
            )
            break;
        // retry if another thread perform prepare op.
    }
    // Write message
    memcpy((void*)old_prepare_ptr, (void*)message, len);
    // Commit step
    while(1)
    {
        const char* old_commit_ptr = commit_ptr;
        if (
             CAS(&commit_ptr, 
                  old_commit_ptr, 
                  old_commit_ptr + len) == old_commit_ptr
            )
            break;
        // retry if another thread commits
    }
    notify_worker_thread();
}
person Evgeny Lazin    schedule 25.11.2013
comment
Лазин, спасибо за ваш вклад. Вышеупомянутый подход выглядит надежным. Единственным недостатком является то, что для управления двумя буферами вместо одного требуется один, и поток приложения все равно будет ждать в случае, если оба буфера заполнены, а рабочий поток все еще выполняет запись. В противном случае выделяются новые буферы, и я хочу избежать выделения памяти после создания буферного потока. Я думаю, что вы правы в части CAS ... может быть, мне не стоило упускать это в моем дизайне выше. - person 486DX2-66; 26.11.2013
comment
Вы не ограничены двумя буферами, вы можете создать ограниченную очередь из буферов и предварительно выделить их (и, конечно, использовать их повторно). Если ваш поток записи файлов не может записать все данные, ваш поток-производитель необходимо немного замедлить. На самом деле это противодавление, необходимое для предотвращения ошибки нехватки памяти. - person Evgeny Lazin; 26.11.2013

concurrent_queue<T, Size>

У меня есть вопрос, как заставить рабочий поток работать, как только есть работа, и спать, когда работы нет.

Есть boost::lockfree::spsc_queue - подождите- бесплатная очередь с одним производителем и одним потребителем. Его можно настроить на время компиляции (размер внутреннего кольцевого буфера).

Насколько я понимаю, вам нужно что-то похожее на следующую конфигурацию:

template<typename T, size_t N>
class concurrent_queue
{
    // T can be wrapped into struct with padding in order to avoid false sharing
    mutable boost::lockfree::spsc_queue<T, boost::lockfree::capacity<N>> q;
    mutable mutex m;
    mutable condition_variable c;

    void wait() const
    {
        unique_lock<mutex> u(m);
        c.wait_for(u, chrono::microseconds(1)); // Or whatever period you need.
        // Timeout is required, because modification happens not under mutex
        //     and notification can be lost.
        // Another option is just to use sleep/yield, without notifications.
    }
    void notify() const
    {
        c.notify_one();
    }
public:
    void push(const T &t)
    {
        while(!q.push(t))
            wait();
        notify();
    }
    void pop(T &result)
    {
        while(!q.pop(result))
            wait();
        notify();
    }
};

Когда в очереди есть элементы - pop не блокируется. А когда во внутреннем буфере достаточно места - push не блокируется.


concurrent<T>

Я хочу максимально сократить время форматирования и записи, поэтому планирую сократить и то, и другое.

Посмотрите доклад Херба Саттера на конференции C ++ и последующий период 2012: C ++ Concurrency. На стр. 14 он показывает пример concurrent<T> . По сути, это оболочка вокруг объекта типа T, которая запускает отдельный поток для выполнения всех операций с этим объектом. Использование:

concurrent<ostream*> x(&cout); // starts thread internally
// ...
// x acts as function object.
// It's function call operator accepts action
//   which is performed on wrapped object in separate thread.
int i = 42;
x([i](ostream *out){ *out << "i=" << i; }); // passing lambda as action

Вы можете использовать аналогичный шаблон, чтобы переложить всю работу по форматированию на потребительский поток.


Оптимизация малых объектов

В противном случае выделяются новые буферы, и я хочу избежать выделения памяти после создания буферного потока.

В приведенном выше примере concurrent_queue<T, Size> используется буфер фиксированного размера, который полностью содержится в очереди и не предполагает дополнительных выделений.

Однако в concurrent<T> примере Херба std::function используется для передачи действия рабочему потоку. Это может повлечь за собой дорогостоящие перераспределения.

std::function реализации могут использовать оптимизацию малых объектов (как и большинство реализаций) - небольшие функциональные объекты создаются копией на месте во внутреннем буфере, но нет никакой гарантии, а для функциональных объектов, превышающих пороговое значение, произойдет выделение кучи.

Есть несколько способов избежать этого распределения:

  1. Реализуйте std::function аналог с внутренним буфером, достаточно большим для хранения объектов целевой функции (например, вы можете попробовать изменить boost::function или эту версию ).

  2. Используйте свой собственный объект функции, который будет представлять все типы сообщений журнала. В основном он будет содержать только значения, необходимые для форматирования сообщения. Поскольку потенциально существуют разные типы сообщений, рассмотрите возможность использования boost::variant < / a> (который является литературным объединением в сочетании с тегом типа) для их представления.

Собирая все вместе, вот доказательство концепции (с использованием второго варианта):

Живая демонстрация

#include <boost/lockfree/spsc_queue.hpp>
#include <boost/optional.hpp>
#include <boost/variant.hpp>

#include <condition_variable>
#include <iostream>
#include <cstddef>
#include <thread>
#include <chrono>
#include <mutex>

using namespace std;

/*********************************************/
template<typename T, size_t N>
class concurrent_queue
{
    mutable boost::lockfree::spsc_queue<T, boost::lockfree::capacity<N>> q;
    mutable mutex m;
    mutable condition_variable c;

    void wait() const
    {
        unique_lock<mutex> u(m);
        c.wait_for(u, chrono::microseconds(1));
    }
    void notify() const
    {
        c.notify_one();
    }
public:
    void push(const T &t)
    {
        while(!q.push(t))
            wait();
        notify();
    }
    void pop(T &result)
    {
        while(!q.pop(result))
            wait();
        notify();
    }
};

/*********************************************/
template<typename T, typename F>
class concurrent
{
    typedef boost::optional<F> Job;

    mutable concurrent_queue<Job, 16> q; // use custom size
    mutable T x;
    thread worker;

public:
    concurrent(T x)
        : x{x}, worker{[this]
        {
            Job j;
            while(true)
            {
                q.pop(j);
                if(!j) break;
                (*j)(this->x); // you may need to handle exceptions in some way
            }
        }}
    {}
    void operator()(const F &f)
    {
        q.push(Job{f});
    }
    ~concurrent()
    {
        q.push(Job{});
        worker.join();
    }
};

/*********************************************/
struct LogEntry
{
    struct Formatter
    {
        typedef void result_type;
        ostream *out;

        void operator()(double x) const
        {
            *out << "floating point: " << x << endl;
        }
        void operator()(int x) const
        {
            *out << "integer: " << x << endl;
        }
    };
    boost::variant<int, double> data;

    void operator()(ostream *out)
    {
        boost::apply_visitor(Formatter{out}, data);
    }
};

/*********************************************/
int main()
{
    concurrent<ostream*, LogEntry> log{&cout};

    for(int i=0; i!=1024; ++i)
    {
        log({i});
        log({i/10.});
    }
}
person Evgeny Panasyuk    schedule 25.11.2013
comment
Спасибо за предложение. Мне очень нравится часть о переносе форматирования в рабочий поток. Я посмотрю, есть ли способ включить это. 2 вещи, в которых я не уверен в приведенном выше примере. 1. Мне не нравится c.wait_for (u, chrono :: microseconds (1)). Там тратится ненужное время. 2. Их буфер может быть исправлен, но каково имя шаблона? Если я храню отдельные символы, то ввод и вывод из буфера становится менее эффективным. Если вместо этого я сохраняю строки, буфер должен быть достаточно большим, и я хочу избежать выделения / перераспределения после создания. - person 486DX2-66; 26.11.2013
comment
Настоящая проблема, которую я здесь вижу, заключается не столько в очереди или кольцевом буфере ... она связана с координацией рабочих потоков и потоков приложения - 1. Заставить поток приложения ждать, когда пространство недоступно или когда он должен ждать буфер должен быть очищен, чтобы он мог выполнять прямую запись. или 2. Заставить рабочий поток ждать, когда нет работы, но немедленно просыпаться, когда есть работа (когда buffer_start! = buffer_end). Это сложно. В идеале должна быть такая конструкция wait_on_function (), что блокировка разблокируется, когда funct ptr == true, и блокируется, когда funct_ptr == false в реальном времени. - person 486DX2-66; 26.11.2013
comment
а) Что касается wait_for - обратите внимание, он просыпается, если истекает период времени ИЛИ уведомление получено. Требуется тайм-аут, потому что модификация происходит не под мьютексом, и уведомление может быть потеряно. б) Если вам нужно хранить строки в LogEntry, вы можете использовать фиксированный буфер, например std::array<char, 32> - просто добавьте его в вариант. Если ваша строка журнала превышает пороговое значение, вы можете использовать динамически выделяемую строку (добавить std :: string к варианту: boost::variant<int, double, std::array<char , 32>, std::string> или объединить массив и строку в класс SSO_String) или разделить строку на несколько сообщений. - person Evgeny Panasyuk; 26.11.2013
comment
c) Относительно ожидающего потока приложения в полной очереди: эта ситуация должна быть обработана в любом случае. Использование неограниченных очередей просто замаскирует проблему - очередь по-прежнему не может расти до бесконечности. Если производитель создает значения намного быстрее, чем потребители их обрабатывают, он в конечном итоге потребляет полную память. Существуют разные варианты, например: замедлить производителя (т.е. заблокировать его, как в нашем примере), удалить старые необработанные элементы, удалить текущий элемент. г) Что касается пробуждения рабочего потока - обратите внимание, он может получать уведомление, когда элементы доступны, и просыпается. - person Evgeny Panasyuk; 26.11.2013
comment
Вы измеряете производительность? Форматирование не настолько сложное, чтобы перенести его на другой поток. Это вызовет чрезмерное копирование данных. Кроме того, ostream может быть узким местом, он медленный, с ошибками (дополнительное копирование), и запись в него сообщения за сообщением может быть неэффективной. - person Evgeny Lazin; 26.11.2013
comment
@Lazin О форматировании - это вопрос к OP, я не знаю, какое форматирование он делает - он просто хотел разгрузить все операции. Это вызовет чрезмерное копирование данных. - Ну, это зависит от обстоятельств. Если мы просто отправим 16 байтов значений, которые должны быть отформатированы в строку из 100 символов, то выделение или копирование такой строки в потоке приложения будет медленнее. Что касается ostream - очевидно, что это было просто для целей автономной и рабочей демонстрации (и да, iostreams медленные), конечно, OP не входит в std::cout, и с описанным подходом легко использовать другие типы приемников. - person Evgeny Panasyuk; 26.11.2013
comment
Если он использует форматирование в стиле C, строки формата должны копироваться вместе с параметрами. - person Evgeny Lazin; 26.11.2013
comment
Только в подмножестве случаев. Строка формата может быть связана с соответствующим типом сообщения (см. Код LogEntry в моем примере, я использовал разное форматирование для разных типов). Или, если используется sso_function, тогда [value](Out &o) { o << "format string " << value; }. - person Evgeny Panasyuk; 26.11.2013
comment
Большинство операций форматирования строк в моем случае очень простые, целые числа, числа с плавающей запятой и двойные. Типичный шаблон использования: result = ‹7-символьный токен› _ ‹label› ‹newline› ‹5-символьный токен› ‹двойной / целочисленный результат› ‹newline› ‹5-символьный токен›. Этот шаблон широко используется. Также есть несколько случаев, когда ostrstream используется для выгрузки данных в поток, а затем для него вызывается .str (), чтобы получить массив символов для регистрации. Также есть несколько случаев, когда мы объединяем данные в строку. Это то, что я хочу устранить при первом проходе, затем снова протестировать и посмотреть, что осталось. - person 486DX2-66; 26.11.2013
comment
Кроме того, мне не принадлежит весь код на уровне приложения, который отвечает за обработку / печать / форматирование данных. Поэтому, если бы я использовал новый метод обработки форматирования в другом потоке, мне пришлось бы работать с текущими владельцами. Если рентабельности инвестиций достаточно, то это оправдывает ее. Но пока я знаю, что могу сэкономить много времени, просто заменив текущий класс ведения журнала (который на самом деле просто fwrite ()) приведенным ниже кодом. Вторым этапом будет устранение конкатенации потока и строки, как описано выше. В этом была вся мотивация. - person 486DX2-66; 26.11.2013
comment
В настоящее время мы объединяем в строковый буфер даже тысячи символов, потому что дешевле сделать это и записать один раз, чем сотни раз. Однако, если новый поток регистрации возвращается почти сразу (скорость memcpy для нормального случая, когда буфер не заполнен), тогда я могу писать в поток чаще и избегать конкатенации потоков и строк, а также выделения памяти, которое происходит с этим. И затем я также избегаю шага вызова .str () в потоке или строке C ++ и выделения нового массива символов ... поэтому я думаю, что это сэкономит большую часть времени. - person 486DX2-66; 26.11.2013
comment
@EvgenyPanasyuk, я играл с твоим образцом кода. Я обнаружил, что если я не использую функцию ожидания, время записи в журнал сокращается в 10 раз. Также для простого примера вывод также вообще не изменился даже без использования ожидания. Обязательно ли использовать ожидание? - person Naveen Sharma; 03.01.2015
comment
@NaveenSharma Без wait он занят ожиданием, потребляя процессорное время. Да, это еще должно работать. Вам следует использовать политику, которая лучше соответствует вашим потребностям. Например, возможно, вам нужно что-то на основе condition_variable (например, ). - person Evgeny Panasyuk; 17.01.2015
comment
@EvgenyPanasyuk, спасибо! Все еще изучаю многопоточность и различные способы повышения эффективности. Если возможно, можете ли вы подробнее рассказать о том, какие условия не оправдывают ожидания с занятостью? еще раз спасибо. - person Naveen Sharma; 22.01.2015