Вопрос:
3, а приведенные ниже циклы содержат закомментированный код. Я ищу ("TAG1", "TAG2" и "TAG3") для легкой идентификации. Я просто хочу, чтобы циклы while ждали, пока проверяемое условие станет истинным, прежде чем продолжить, минимизируя ресурсы ЦП в максимально возможной степени. Сначала я попытался использовать переменные условия Boost, но есть условие гонки. Перевод потока в спящий режим на «x» микросекунд неэффективен, поскольку нет способа точно рассчитать время пробуждения. Наконец, boost :: this_thread :: yield (), похоже, ничего не делает. Вероятно, потому что у меня только 2 активных потока в двухъядерной системе. В частности, как я могу сделать три указанные ниже области с тегами работать более эффективно, вводя как можно меньше ненужных блокировок.
ИСТОРИЯ
Задача:
У меня есть приложение, которое регистрирует много данных. После профилирования я обнаружил, что много времени уходит на операции регистрации (запись текста или двоичного файла в файл на локальном жестком диске). Моя цель - уменьшить задержку при вызовах logData, заменив непоточные вызовы прямой записи на вызовы поточного регистратора с буферизацией потоков.
Изученные варианты:
- Обновить медленный жесткий диск эпохи 2005 года до SSD ... возможно. Стоимость не является непомерно высокой ... но требует большого объема работы ... необходимо обновить более 200 компьютеров ...
- Увеличьте ASIO ... Мне не нужны все накладные расходы проактора / сети, я ищу что-то более простое и легкое.
Дизайн:
- При использовании шаблона потока-производителя и потребителя приложение записывает данные в буфер, а фоновый поток затем записывает их на диск через некоторое время. Таким образом, конечная цель состоит в том, чтобы функция writeMessage, вызываемая уровнем приложения, возвращалась как можно быстрее, в то время как данные корректно / полностью записываются в файл журнала в порядке FIFO через некоторое время.
- Только один поток приложения, только один поток писателя.
- На основе кольцевого буфера. Причина этого решения - использовать как можно меньше блокировок и в идеале ... и, пожалуйста, поправьте меня, если я ошибаюсь ... Я не думаю, что они мне нужны.
- Буфер - это статически распределенный массив символов, но при необходимости / желании его можно переместить в кучу из соображений производительности.
- У буфера есть начальный указатель, который указывает на следующий символ, который должен быть записан в файл. У буфера есть конечный указатель, который указывает на индекс массива после последнего символа, который должен быть записан в файл. Конечный указатель НИКОГДА не проходит мимо начального указателя. Если приходит сообщение, размер которого превышает размер буфера, то писатель ждет, пока буфер не будет опустошен, и записывает новое сообщение в файл напрямую, не помещая сообщение слишком большого размера в буфер (как только буфер опустошается, рабочий поток не буду ничего писать, так что никаких споров).
- Писатель (рабочий поток) обновляет только начальный указатель кольцевого буфера.
- Основной (поток приложения) обновляет только конечный указатель кольцевого буфера, и, опять же, он только вставляет новые данные в буфер, когда есть доступное пространство ... в противном случае он либо ждет, пока пространство в буфере станет доступным, либо записывает напрямую, как описано выше.
- Рабочий поток постоянно проверяет, есть ли данные для записи (на это указывает случай, когда указатель начала буфера! = Указатель конца буфера). Если нет данных для записи, рабочий поток в идеале должен перейти в спящий режим и проснуться после того, как поток приложения вставил что-то в буфер (и изменил конечный указатель буфера таким образом, чтобы он больше не указывал на тот же индекс, что и начальный указатель). То, что у меня ниже, включает в себя циклы while, непрерывно проверяющие это условие. Это очень плохой / неэффективный способ ожидания в буфере.
Полученные результаты:
- На моем двухъядерном ноутбуке 2009 года с твердотельным накопителем я вижу, что общее время записи в тесте с потоковой / буферизацией по сравнению с прямой записью составляет примерно 1: 6 (0,609 с против 0,095 с), но сильно варьируется. Часто тест буферизованной записи на самом деле медленнее, чем прямая запись. Я считаю, что изменчивость связана с плохой реализацией ожидания освобождения места в буфере, ожидания опустошения буфера и ожидания рабочего потока, когда работа станет доступной. Я измерил, что некоторые из циклов while потребляют более 10000 циклов, и я подозреваю, что эти циклы фактически конкурируют за аппаратные ресурсы, которые требуются другому потоку (рабочему или приложению) для завершения ожидаемых вычислений.
- Выход, кажется, проверяется. При включенном режиме TEST и небольшом размере буфера 10 в качестве стресс-теста я сравнил сотни мегабайт вывода и обнаружил, что он равен входному.
Компилируется с текущей версией Boost (1.55)
Заголовок
#ifndef BufferedLogStream_h
#define BufferedLogStream_h
#include <stdio.h>
#include <iostream>
#include <iostream>
#include <cstdlib>
#include "boost\chrono\chrono.hpp"
#include "boost\thread\thread.hpp"
#include "boost\thread\locks.hpp"
#include "boost\thread\mutex.hpp"
#include "boost\thread\condition_variable.hpp"
#include <time.h>
using namespace std;
#define BENCHMARK_STR_SIZE 128
#define NUM_BENCHMARK_WRITES 524288
#define TEST 0
#define BENCHMARK 1
#define WORKER_LOOP_WAIT_MICROSEC 20
#define MAIN_LOOP_WAIT_MICROSEC 10
#if(TEST)
#define BUFFER_SIZE 10
#else
#define BUFFER_SIZE 33554432 //4 MB
#endif
class BufferedLogStream {
public:
BufferedLogStream();
void openFile(char* filename);
void flush();
void close();
inline void writeMessage(const char* message, unsigned int length);
void writeMessage(string message);
bool operator() () { return start != end; }
private:
void threadedWriter();
inline bool hasSomethingToWrite();
inline unsigned int getFreeSpaceInBuffer();
void appendStringToBuffer(const char* message, unsigned int length);
FILE* fp;
char* start;
char* end;
char* endofringbuffer;
char ringbuffer[BUFFER_SIZE];
bool workerthreadkeepalive;
boost::mutex mtx;
boost::condition_variable waitforempty;
boost::mutex workmtx;
boost::condition_variable waitforwork;
#if(TEST)
struct testbuffer {
int length;
char message[BUFFER_SIZE * 2];
};
public:
void test();
private:
void getNextRandomTest(testbuffer &tb);
FILE* datatowrite;
#endif
#if(BENCHMARK)
public:
void runBenchmark();
private:
void initBenchmarkString();
void runDirectWriteBaseline();
void runBufferedWriteBenchmark();
char benchmarkstr[BENCHMARK_STR_SIZE];
#endif
};
#if(TEST)
int main() {
BufferedLogStream* bl = new BufferedLogStream();
bl->openFile("replicated.txt");
bl->test();
bl->close();
cout << "Done" << endl;
cin.get();
return 0;
}
#endif
#if(BENCHMARK)
int main() {
BufferedLogStream* bl = new BufferedLogStream();
bl->runBenchmark();
cout << "Done" << endl;
cin.get();
return 0;
}
#endif //for benchmark
#endif
Реализация
#include "BufferedLogStream.h"
BufferedLogStream::BufferedLogStream() {
fp = NULL;
start = ringbuffer;
end = ringbuffer;
endofringbuffer = ringbuffer + BUFFER_SIZE;
workerthreadkeepalive = true;
}
void BufferedLogStream::openFile(char* filename) {
if(fp) close();
workerthreadkeepalive = true;
boost::thread t2(&BufferedLogStream::threadedWriter, this);
fp = fopen(filename, "w+b");
}
void BufferedLogStream::flush() {
fflush(fp);
}
void BufferedLogStream::close() {
workerthreadkeepalive = false;
if(!fp) return;
while(hasSomethingToWrite()) {
boost::unique_lock<boost::mutex> u(mtx);
waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
}
flush();
fclose(fp);
fp = NULL;
}
void BufferedLogStream::threadedWriter() {
while(true) {
if(start != end) {
char* currentend = end;
if(start < currentend) {
fwrite(start, 1, currentend - start, fp);
}
else if(start > currentend) {
if(start != endofringbuffer) fwrite(start, 1, endofringbuffer - start, fp);
fwrite(ringbuffer, 1, currentend - ringbuffer, fp);
}
start = currentend;
waitforempty.notify_one();
}
else { //start == end...no work to do
if(!workerthreadkeepalive) return;
boost::unique_lock<boost::mutex> u(workmtx);
waitforwork.wait_for(u, boost::chrono::microseconds(WORKER_LOOP_WAIT_MICROSEC));
}
}
}
bool BufferedLogStream::hasSomethingToWrite() {
return start != end;
}
void BufferedLogStream::writeMessage(string message) {
writeMessage(message.c_str(), message.length());
}
unsigned int BufferedLogStream::getFreeSpaceInBuffer() {
if(end > start) return (start - ringbuffer) + (endofringbuffer - end) - 1;
if(end == start) return BUFFER_SIZE-1;
return start - end - 1; //case where start > end
}
void BufferedLogStream::appendStringToBuffer(const char* message, unsigned int length) {
if(end + length <= endofringbuffer) { //most common case for appropriately-sized buffer
memcpy(end, message, length);
end += length;
}
else {
int lengthtoendofbuffer = endofringbuffer - end;
if(lengthtoendofbuffer > 0) memcpy(end, message, lengthtoendofbuffer);
int remainderlength = length - lengthtoendofbuffer;
memcpy(ringbuffer, message + lengthtoendofbuffer, remainderlength);
end = ringbuffer + remainderlength;
}
}
void BufferedLogStream::writeMessage(const char* message, unsigned int length) {
if(length > BUFFER_SIZE - 1) { //if string is too large for buffer, wait for buffer to empty and bypass buffer, write directly to file
while(hasSomethingToWrite()); {
boost::unique_lock<boost::mutex> u(mtx);
waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
}
fwrite(message, 1, length, fp);
}
else {
//wait until there is enough free space to insert new string
while(getFreeSpaceInBuffer() < length) {
boost::unique_lock<boost::mutex> u(mtx);
waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
}
appendStringToBuffer(message, length);
}
waitforwork.notify_one();
}
#if(TEST)
void BufferedLogStream::getNextRandomTest(testbuffer &tb) {
tb.length = 1 + (rand() % (int)(BUFFER_SIZE * 1.05));
for(int i = 0; i < tb.length; i++) {
tb.message[i] = rand() % 26 + 65;
}
tb.message[tb.length] = '\n';
tb.length++;
tb.message[tb.length] = '\0';
}
void BufferedLogStream::test() {
cout << "Buffer size is: " << BUFFER_SIZE << endl;
testbuffer tb;
datatowrite = fopen("orig.txt", "w+b");
for(unsigned int i = 0; i < 7000000; i++) {
if(i % 1000000 == 0) cout << i << endl;
getNextRandomTest(tb);
writeMessage(tb.message, tb.length);
fwrite(tb.message, 1, tb.length, datatowrite);
}
fflush(datatowrite);
fclose(datatowrite);
}
#endif
#if(BENCHMARK)
void BufferedLogStream::initBenchmarkString() {
for(unsigned int i = 0; i < BENCHMARK_STR_SIZE - 1; i++) {
benchmarkstr[i] = rand() % 26 + 65;
}
benchmarkstr[BENCHMARK_STR_SIZE - 1] = '\n';
}
void BufferedLogStream::runDirectWriteBaseline() {
clock_t starttime = clock();
fp = fopen("BenchMarkBaseline.txt", "w+b");
for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
fwrite(benchmarkstr, 1, BENCHMARK_STR_SIZE, fp);
}
fflush(fp);
fclose(fp);
clock_t elapsedtime = clock() - starttime;
cout << "Direct write baseline took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
}
void BufferedLogStream::runBufferedWriteBenchmark() {
clock_t starttime = clock();
openFile("BufferedBenchmark.txt");
cout << "Opend file" << endl;
for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
writeMessage(benchmarkstr, BENCHMARK_STR_SIZE);
}
cout << "Wrote" << endl;
close();
cout << "Close" << endl;
clock_t elapsedtime = clock() - starttime;
cout << "Buffered write took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
}
void BufferedLogStream::runBenchmark() {
cout << "Buffer size is: " << BUFFER_SIZE << endl;
initBenchmarkString();
runDirectWriteBaseline();
runBufferedWriteBenchmark();
}
#endif
Обновление: 25 ноября 2013 г.
Я обновил приведенный ниже код, используя boost :: condition_variables, в частности метод wait_for (), рекомендованный Евгением Панасюком. Это позволяет избежать ненужной проверки одного и того же условия снова и снова. В настоящее время я вижу, что версия с буферизацией выполняется примерно в 1/6 раза быстрее, чем версия без буферизации / с прямой записью. Это не идеальный случай, потому что оба случая ограничены жестким диском (в моем случае SSD 2010 года). Я планирую использовать приведенный ниже код в среде, где жесткий диск не будет узким местом, и в большинстве случаев, если не всегда, в буфере должно быть место для размещения запросов writeMessage. Это подводит меня к следующему вопросу. Насколько большим я должен сделать буфер? Я не против выделить 32 МБ или 64 МБ, чтобы он никогда не заполнялся. Код будет работать в системах, в которых это можно сэкономить. Интуитивно я считаю, что статически выделять массив символов размером 32 МБ - плохая идея. Это? Во всяком случае, я ожидаю, что когда я запущу приведенный ниже код для предполагаемого приложения, задержка вызовов logData () будет значительно уменьшена, что приведет к значительному сокращению общего времени обработки.
Если кто-то видит способ улучшить приведенный ниже код (быстрее, надежнее, компактнее и т. Д.), Сообщите мне. Я ценю обратную связь. Лазин, как бы ваш подход был быстрее или эффективнее, чем то, что я опубликовал ниже? Мне нравится идея иметь только один буфер и сделать его достаточно большим, чтобы он практически никогда не заполнялся. Тогда мне не нужно беспокоиться о чтении из разных буферов. Евгений Панасюк, мне нравится подход по возможности использовать существующий код, особенно если это уже существующая библиотека boost. Однако я также не вижу, насколько spcs_queue более эффективен, чем то, что показано ниже. Я бы предпочел иметь дело с одним большим буфером, чем с множеством более мелких, и мне пришлось бы беспокоиться о разделении моего входного потока на вход и объединении его обратно на выходе. Ваш подход позволит мне перенести форматирование из основного потока в рабочий поток. Это резкий подход. Но я еще не уверен, сэкономит ли это много времени, и чтобы получить полную выгоду, мне пришлось бы изменить код, которым я не владею.
// Конец обновления
boost::lockfree::spsc_queue
- это очередь с одним производителем и одним потребителем без ожидания. Его можно настроить на время компиляции (размер внутреннего кольцевого буфера). - person Evgeny Panasyuk   schedule 25.11.2013