Вот мое решение, использующее только системные вызовы pipe
и select
для реализации MPSCQ. На следующей диаграмме показано, как это работает:
<producer-thread-1> {msg produced in heap}
\
\ /* only address of msg objects were sent to pipe[1] */
\
pipe[1] >>>(kernel)>>> pipe[0] <consumer-thread>:
/ 1. polling from pipe[0]
/ 2. restore msg object via address ptr
/ 3. process then delete the msg object
<producer-thread-2> {msg produced in heap}
Демонстрационный код написан на C++ для инкапсуляции очереди в класс, и функции C++11/14/17 не использовались. Во-первых, это класс очереди в форме шаблона:
// mpscq.hpp
#include <sys/select.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#define PTR_SIZE (sizeof(void*))
template<class T> class MPSCQ { // Multi Producer Single Consumer Queue
public:
MPSCQ() {
int pipe_fd_set[2];
pipe(pipe_fd_set); // err-handler omitted for this demo
_fdProducer = pipe_fd_set[1];
_fdConsumer = pipe_fd_set[0];
}
~MPSCQ() { /* pipe close omitted for this demo */ }
int producerPush(const T* t) {
// will be blocked when pipe is full, should always return PTR_SIZE
return t == NULL ? 0 : write(_fdProducer, &t, PTR_SIZE);
}
T* consumerPoll(int timeout = 1);
private:
int _selectFdConsumer(int timeout);
private:
int _fdProducer; // pipe_fd_set[1]
int _fdConsumer; // pipe_fd_set[0]
};
template<class T> T* MPSCQ<T>::consumerPoll(int timeout) {
if (_selectFdConsumer(timeout) <= 0) { // timeout or error
return NULL;
}
char ptr_buff[PTR_SIZE];
ssize_t r = read(_fdConsumer, ptr_buff, PTR_SIZE);
if (r <= 0) {
fprintf(stderr, "consumer read EOF or error, r=%d, errno=%d\n", r, errno);
return NULL;
}
T* t;
memcpy(&t, ptr_buff, PTR_SIZE); // cast received bytes to T*
return t;
}
template<class T> int MPSCQ<T>::_selectFdConsumer(int timeout) {
int nfds = _fdConsumer + 1;
fd_set readfds;
struct timeval tv;
while (true) {
tv.tv_sec = timeout;
tv.tv_usec = 0;
FD_ZERO(&readfds);
FD_SET(_fdConsumer, &readfds);
int r = select(nfds, &readfds, NULL, NULL, &tv);
if (r < 0 && errno == EINTR) {
continue;
}
return r;
}
}
Затем идет тестовый случай: 4 потока-производителя выдают 1..100000, а один поток-потребитель суммирует их.
// g++ -o mpscq mpscq.cpp -lpthread
#include "mpscq.hpp"
#include <sys/types.h>
#include <pthread.h>
#define PER_THREAD_LOOPS 25000
#define SAMPLE_INTERVAL 10000
#define PRODUCER_THREAD_NUM 4
struct TestMsg {
int _msgId; // a dummy demo member
int64_t _val; // _val < 0 is an end flag
TestMsg(int msg_id, int64_t val) :
_msgId(msg_id),
_val(val) { };
};
static MPSCQ<TestMsg> TEST_QUEUE;
void* functor_producer(void* arg) {
int* task_seg = (int*) arg;
TestMsg* msg;
for (int i = 0; i <= PER_THREAD_LOOPS; ++ i) {
int64_t id = PER_THREAD_LOOPS * (*task_seg) + i;
msg = new TestMsg(id, i >= PER_THREAD_LOOPS ? -1 : id + 1);
TEST_QUEUE.producerPush(msg);
}
delete task_seg;
return NULL;
}
void* functor_consumer(void* arg) {
int64_t* sum = (int64_t*)arg;
int msg_cnt = 0;
int stop_cnt = 0; // for shutdown gracefully
TestMsg* msg;
while (true) {
if ((msg = TEST_QUEUE.consumerPoll()) == NULL) {
continue;
}
int64_t val = msg->_val;
delete msg; // this delete is essential to prevent memory leak
if (val <= 0) {
if ((++ stop_cnt) >= PRODUCER_THREAD_NUM) {
printf("all done, sum=%ld\n", *sum);
break;
}
} else {
*sum += val;
if ((++ msg_cnt) % SAMPLE_INTERVAL == 0) {
printf("msg_cnt=%d, sum=%ld\n", msg_cnt, *sum);
}
}
}
return NULL;
}
int main(int argc, char* const* argv) {
int64_t sum = 0;
printf("PTR_SIZE: %d, target: sum(1..%d)\n", PTR_SIZE, PRODUCER_THREAD_NUM * PER_THREAD_LOOPS);
pthread_t consumer;
pthread_create(&consumer, NULL, functor_consumer, &sum);
pthread_t producers[PRODUCER_THREAD_NUM];
for (int i = 0; i < PRODUCER_THREAD_NUM; ++ i) {
pthread_create(&producers[i], NULL, functor_producer, new int(i));
}
for (int i = 0; i < PRODUCER_THREAD_NUM; ++ i) {
pthread_join(producers[i], NULL);
}
pthread_join(consumer, NULL);
return 0;
}
Пример результата теста:
$ ./mpscq
PTR_SIZE: 8, target: sum(1..100000)
msg_cnt=10000, sum=490096931
msg_cnt=20000, sum=888646187
msg_cnt=30000, sum=1282852073
msg_cnt=40000, sum=1606611602
msg_cnt=50000, sum=2088863858
msg_cnt=60000, sum=2573791058
msg_cnt=70000, sum=3180398370
msg_cnt=80000, sum=3768718659
msg_cnt=90000, sum=4336431164
msg_cnt=100000, sum=5000050000
all done, sum=5000050000
Реализованный здесь MPSCQ представляет собой шаблон передачи сообщений и позволяет ядру справиться со сложностью операций с внутренней очередью. Побочным эффектом трюка является то, что при большой рабочей нагрузке на стороне потребителя будет слишком много вызовов select
, что значительно повлияет на производительность. (В этой демонстрации каждый раз, когда потребитель просто извлекает 8 байтов. Чтобы облегчить это, потребитель должен поддерживать дополнительный буфер приема.)
person
peihan
schedule
14.12.2020