Несколько производителей один потребитель

У меня проблемы с пониманием проблемы нескольких производителей и одного потребителя. Я работаю над заданием, и я не уверен, как работает создание двух производителей. Я понимаю, как работает проблема с одним производителем/потребителем, но я не могу понять, как работать с несколькими производителями, мне нужно создать два отдельных потока для каждого производителя, если это так, как заполнение очереди своими «произведенными данными» должно ли одному из производителей спать, в то время как другой производитель заполняет, скажем, один элемент данных, а затем они переключаются туда и обратно, пока буфер очереди не заполнится?

Просто ищу объяснение, так как я не совсем понимаю, как это будет работать (прежде чем кто-то сделает предложение, я ищу кого-то, кто сделает мою домашнюю работу, это не тот случай, просто ищу полезную информацию, чтобы прояснить свои мысли по этому поводу, чтобы я мог реализовать это я сам)

Я просмотрел множество других вопросов/тем по этому поводу на этом и других веб-сайтах и ​​до сих пор не смог прийти к выводу о своем ответе.

Спасибо!


person Tom    schedule 14.10.2016    source источник
comment
Быстрый пример. Это как туалет на заправке. Вы (потребитель) должны получить ключ от туалета (мьютекс/замок) от обслуживающего персонала (производителя), чтобы использовать ресурс (туалет). На заправочной станции может быть много обслуживающего персонала, проверяющего клиентов, моющего пол и т. д., и все заняты своими делами в ожидании возврата ключа. На заправочной станции может быть много потребителей, которые заправляются бензином, покупают еду и т. д. и все заняты своими делами, ожидая освобождения в туалете. Когда это так, другой потребитель может получить ключ от туалета.   -  person yano    schedule 14.10.2016
comment
Хорошо, это определенно полезно, так что я понимаю, что я могу просто определить два потока производителя, и заставить их обоих «принимать заказы», ​​и какой из них доберется до него первым, доберется до него первым? Мне не нужно было бы, чтобы один из них мог «работать» одновременно, но просто соревноваться за способность выполнять задачу?   -  person Tom    schedule 14.10.2016
comment
Единственное время, когда потоки должны конкурировать, — это когда они должны совместно использовать ресурс. Например, буфер. Потоки-производители вставляют информацию в буфер, а потребители извлекают ее. Доступ к буферу должен быть защищен, чтобы 2 потока-производителя не записывали свои данные в одно и то же место; в противном случае вы получите поврежденные данные. То же самое и с потребительскими потоками... без синхронизации потоков с общим ресурсом у вас никогда не будет четко определенного состояния для ресурса, и тогда наступит хаос. Когда потоки не обращаются к общему ресурсу, вы хотите, чтобы они не выполняли   -  person yano    schedule 16.10.2016
comment
их собственное дело параллельно, вот где вы получаете повышение скорости от потоков. Вы хотите, чтобы критический раздел (часть, где потоки обращаются к общему ресурсу) был как можно меньше. Если у вас есть 100 потоков, но они тратят все свое время на попытки доступа к общему ресурсу, вы не станете лучше, чем если бы у вас был только 1 поток, и на самом деле вы, вероятно, будете хуже b/c раскручивание нитей требует времени.   -  person yano    schedule 16.10.2016


Ответы (1)


Вот мое решение, использующее только системные вызовы pipe и select для реализации MPSCQ. На следующей диаграмме показано, как это работает:

<producer-thread-1>  {msg produced in heap}
  \
   \  /* only address of msg objects were sent to pipe[1] */
    \
     pipe[1] >>>(kernel)>>> pipe[0]  <consumer-thread>:
    /                                1. polling from pipe[0]
   /                                 2. restore msg object via address ptr
  /                                  3. process then delete the msg object
<producer-thread-2>  {msg produced in heap}

Демонстрационный код написан на C++ для инкапсуляции очереди в класс, и функции C++11/14/17 не использовались. Во-первых, это класс очереди в форме шаблона:

// mpscq.hpp
#include <sys/select.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>

#define PTR_SIZE (sizeof(void*))

template<class T> class MPSCQ { // Multi Producer Single Consumer Queue
public:
        MPSCQ() {
                int pipe_fd_set[2];
                pipe(pipe_fd_set); // err-handler omitted for this demo
                _fdProducer = pipe_fd_set[1];
                _fdConsumer = pipe_fd_set[0];
        }
        ~MPSCQ() { /* pipe close omitted for this demo */ }
        int producerPush(const T* t) {
                // will be blocked when pipe is full, should always return PTR_SIZE
                return t == NULL ? 0 : write(_fdProducer, &t, PTR_SIZE);
        }
        T* consumerPoll(int timeout = 1);
private:
        int _selectFdConsumer(int timeout);
private:
        int _fdProducer; // pipe_fd_set[1]
        int _fdConsumer; // pipe_fd_set[0]
};

template<class T> T* MPSCQ<T>::consumerPoll(int timeout) {
        if (_selectFdConsumer(timeout) <= 0) {  // timeout or error
                return NULL;
        }
        char ptr_buff[PTR_SIZE];
        ssize_t r = read(_fdConsumer, ptr_buff, PTR_SIZE);
        if (r <= 0) {
                fprintf(stderr, "consumer read EOF or error, r=%d, errno=%d\n", r, errno);
                return NULL;
        }
        T* t;
        memcpy(&t, ptr_buff, PTR_SIZE); // cast received bytes to T*
        return t;
}

template<class T> int MPSCQ<T>::_selectFdConsumer(int timeout) {
        int nfds = _fdConsumer + 1;
        fd_set readfds;
        struct timeval tv;
        while (true) {
                tv.tv_sec = timeout;
                tv.tv_usec = 0;
                FD_ZERO(&readfds);
                FD_SET(_fdConsumer, &readfds);
                int r = select(nfds, &readfds, NULL, NULL, &tv);
                if (r < 0 && errno == EINTR) {
                        continue;
                }
                return r;
        }
}

Затем идет тестовый случай: 4 потока-производителя выдают 1..100000, а один поток-потребитель суммирует их.

// g++ -o mpscq mpscq.cpp -lpthread
#include "mpscq.hpp"
#include <sys/types.h>
#include <pthread.h>

#define PER_THREAD_LOOPS        25000
#define SAMPLE_INTERVAL         10000
#define PRODUCER_THREAD_NUM     4

struct TestMsg {
        int _msgId;     // a dummy demo member
        int64_t _val;   // _val < 0 is an end flag
        TestMsg(int msg_id, int64_t val) :
                _msgId(msg_id),
                _val(val) { };
};

static MPSCQ<TestMsg> TEST_QUEUE;

void* functor_producer(void* arg) {
        int* task_seg = (int*) arg;
        TestMsg* msg;
        for (int i = 0; i <= PER_THREAD_LOOPS; ++ i) {
                int64_t id = PER_THREAD_LOOPS * (*task_seg) + i;
                msg = new TestMsg(id, i >= PER_THREAD_LOOPS ? -1 : id + 1);
                TEST_QUEUE.producerPush(msg);
        }
        delete task_seg;
        return NULL;
}

void* functor_consumer(void* arg) {
        int64_t* sum = (int64_t*)arg;
        int msg_cnt = 0;
        int stop_cnt = 0; // for shutdown gracefully
        TestMsg* msg;
        while (true) {
                if ((msg = TEST_QUEUE.consumerPoll()) == NULL) {
                        continue;
                }
                int64_t val = msg->_val;
                delete msg; // this delete is essential to prevent memory leak
                if (val <= 0) {
                        if ((++ stop_cnt) >= PRODUCER_THREAD_NUM) {
                                printf("all done, sum=%ld\n", *sum);
                                break;
                        }
                } else {
                        *sum += val;
                        if ((++ msg_cnt) % SAMPLE_INTERVAL == 0) {
                                printf("msg_cnt=%d, sum=%ld\n", msg_cnt, *sum);
                        }
                }
        }
        return NULL;
}

int main(int argc, char* const* argv) {
        int64_t sum = 0;
        printf("PTR_SIZE: %d, target: sum(1..%d)\n", PTR_SIZE, PRODUCER_THREAD_NUM * PER_THREAD_LOOPS);
        pthread_t consumer;
        pthread_create(&consumer, NULL, functor_consumer, &sum);
        pthread_t producers[PRODUCER_THREAD_NUM];
        for (int i = 0; i < PRODUCER_THREAD_NUM; ++ i) {
                pthread_create(&producers[i], NULL, functor_producer, new int(i));
        }
        for (int i = 0; i < PRODUCER_THREAD_NUM; ++ i) {
                pthread_join(producers[i], NULL);
        }
        pthread_join(consumer, NULL);
        return 0;
}

Пример результата теста:

$ ./mpscq 
PTR_SIZE: 8, target: sum(1..100000)
msg_cnt=10000, sum=490096931
msg_cnt=20000, sum=888646187
msg_cnt=30000, sum=1282852073
msg_cnt=40000, sum=1606611602
msg_cnt=50000, sum=2088863858
msg_cnt=60000, sum=2573791058
msg_cnt=70000, sum=3180398370
msg_cnt=80000, sum=3768718659
msg_cnt=90000, sum=4336431164
msg_cnt=100000, sum=5000050000
all done, sum=5000050000

Реализованный здесь MPSCQ представляет собой шаблон передачи сообщений и позволяет ядру справиться со сложностью операций с внутренней очередью. Побочным эффектом трюка является то, что при большой рабочей нагрузке на стороне потребителя будет слишком много вызовов select, что значительно повлияет на производительность. (В этой демонстрации каждый раз, когда потребитель просто извлекает 8 байтов. Чтобы облегчить это, потребитель должен поддерживать дополнительный буфер приема.)

person peihan    schedule 14.12.2020