производитель/потребитель многопоточных программ [boost]

Я играю с библиотекой boost и С++. Я хочу создать многопоточную программу, содержащую производителя, потребителя и стек. Производитель заполняет стек, потребитель удаляет элементы (int) из стека. все работает (поп, пуш, мьютекс) Но когда я вызываю поп/пуш в потоке, я не получаю никакого эффекта

я сделал этот простой код:

#include "stdafx.h"
#include <stack>
#include <iostream>
#include <algorithm>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/date_time.hpp> 
#include <boost/signals2/mutex.hpp>
#include <ctime>

using namespace std;

/ *
* this class reprents a stack which is proteced by mutex
* Pop and push are executed by one thread each time.
*/
class ProtectedStack{
private : 
stack<int> m_Stack;
boost::signals2::mutex m;

public : 
ProtectedStack(){
}
ProtectedStack(const ProtectedStack & p){

}
void push(int x){
    m.lock();
    m_Stack.push(x);
    m.unlock();
}

void pop(){
    m.lock();
    //return m_Stack.top();
    if(!m_Stack.empty())
        m_Stack.pop();
    m.unlock(); 
}
int size(){
    return m_Stack.size();
}
bool isEmpty(){
    return m_Stack.empty();
}
int top(){
    return m_Stack.top();
}
};

/*
*The producer is the class that fills the stack. It encapsulate the thread object 
*/

class Producer{
public:
Producer(int number ){
    //create thread here but don't start here
m_Number=number;


}
void fillStack (ProtectedStack& s ) {
    int object = 3; //random value
    s.push(object);
    //cout<<"push object\n";
}

void produce (ProtectedStack & s){
    //call fill within a thread 
    m_Thread = boost::thread(&Producer::fillStack,this, s);  
}

 private :
int m_Number;
boost::thread m_Thread;

};


/* The consumer will consume the products produced by the producer */ 

class Consumer {
private : 
int m_Number;
boost::thread m_Thread;
public:
Consumer(int n){
    m_Number = n;
}

void remove(ProtectedStack &s ) {

     if(s.isEmpty()){ // if the stack is empty sleep and wait for the producer      to fill the stack
        //cout<<"stack is empty\n";
        boost::posix_time::seconds workTime(1); 
        boost::this_thread::sleep(workTime);
     }
     else{
        s.pop(); //pop it
        //cout<<"pop object\n";

     }

}

void consume (ProtectedStack & s){
    //call remove within a thread 
    m_Thread = boost::thread(&Consumer::remove, this, s);  
}

};


int main(int argc, char* argv[])  
{  



ProtectedStack s;


    Producer p(0);
    p.produce(s);

    Producer p2(1);
    p2.produce(s);

    cout<<"size after production "<<s.size()<<endl;
    Consumer c(0);
    c.consume(s);
    Consumer c2(1);
    c2.consume(s);
    cout<<"size after consumption  "<<s.size()<<endl;

getchar();
return 0;  
}  

После того, как я запустил это в VC++ 2010/win7, я получил: 0 0

Не могли бы вы помочь мне понять, почему, когда я вызываю функцию fillStack из основного, я получаю эффект, но когда я вызываю ее из потока, ничего не происходит? Спасибо


person ezzakrem    schedule 23.10.2012    source источник


Ответы (3)


Основная проблема с вашим кодом заключается в том, что ваши потоки не синхронизированы. Помните, что по умолчанию выполнение потоков не упорядочено и не упорядочено, поэтому потоки-потребители на самом деле могут быть (и в вашем конкретном случае) завершены до того, как любой поток-производитель создаст какие-либо данные.

Чтобы убедиться, что потребители будут запущены после того, как производители закончат свою работу, вам нужно использовать функцию thread::join() в потоках производителей, она остановит выполнение основного потока до выхода производителей:

// Start producers
...

p.m_Thread.join();  // Wait p to complete
p2.m_Thread.join(); // Wait p2 to complete

// Start consumers
...

Это поможет, но, вероятно, это не очень хорошо для типичного случая использования производителя-потребителя.

Чтобы получить более полезный случай, вам нужно исправить потребительскую функцию. Ваша потребительская функция на самом деле не ждет произведенных данных, она просто завершит работу, если стек пуст, и никогда не потребляет никаких данных, если данные еще не были созданы.

Это должно быть так:

void remove(ProtectedStack &s)
{
   // Place your actual exit condition here,
   // e.g. count of consumed elements or some event
   // raised by producers meaning no more data available etc.
   // For testing/educational purpose it can be just while(true)
   while(!_some_exit_condition_)
   {
      if(s.isEmpty())
      {
          // Second sleeping is too big, use milliseconds instead
          boost::posix_time::milliseconds workTime(1); 
          boost::this_thread::sleep(workTime);               
      }               
      else
      {
         s.pop();
      }
   }
} 

Другая проблема заключается в неправильном использовании конструктора thread:

m_Thread = boost::thread(&Producer::fillStack, this, s);  

Цитата из Boost.Thread документация:

Конструктор потока с аргументами

template <class F,class A1,class A2,...> thread(F f,A1 a1,A2 a2,...);

Предварительные условия: F и каждый An должны быть копируемыми или перемещаемыми.

Эффекты: Как будто thread(boost::bind(f,a1,a2,...)). Следовательно, f и каждый an копируются во внутреннюю память для доступа нового потока.

Это означает, что каждый ваш поток получает свою собственную копию s, и все изменения применяются не к s, а к локальным копиям потока. Это тот же случай, когда вы передаете объект в аргумент функции по значению. Вместо этого вам нужно передать объект s по ссылке, используя boost::ref:

void produce(ProtectedStack& s)
{
   m_Thread = boost::thread(&Producer::fillStack, this, boost::ref(s));
}

void consume(ProtectedStack& s)
{
   m_Thread = boost::thread(&Consumer::remove, this, boost::ref(s));
}  

Еще одна проблема связана с использованием мьютекса. Это не лучшее из возможного.

  1. Почему вы используете мьютекс из библиотеки Signals2? Просто используйте boost::mutex из Boost.Thread и удалите ненужную зависимость от библиотеки Signals2.

  2. Используйте оболочку RAII boost::lock_guard< /a> вместо прямых вызовов lock/unlock.

  3. Как упоминали другие люди, вы должны защитить замком всех членов ProtectedStack.

Образец:

boost::mutex m;

void push(int x)
{ 
   boost::lock_guard<boost::mutex> lock(m);
   m_Stack.push(x);
} 

void pop()
{
   boost::lock_guard<boost::mutex> lock(m);
   if(!m_Stack.empty()) m_Stack.pop();
}              

int size()
{
   boost::lock_guard<boost::mutex> lock(m);
   return m_Stack.size();
}

bool isEmpty()
{
   boost::lock_guard<boost::mutex> lock(m);
   return m_Stack.empty();
}

int top()
{
   boost::lock_guard<boost::mutex> lock(m);
   return m_Stack.top();
}
person Rost    schedule 23.10.2012
comment
Спасибо за ваш ответ. Я изменил код, как вы упомянули, но результат все тот же. На самом деле проблема, похоже, вызвана вызовом потока. Когда я пишу p.fillStack, я заполняю стек. когда я вызываю p.fillStack из boost::thread, ничего не происходит. Мне это кажется странным, так как я новичок в C++/boost. - person ezzakrem; 23.10.2012
comment
@ezzakrem Похоже, я нашел основную причину вашей проблемы. Взгляните на обновленный ответ. - person Rost; 24.10.2012
comment
1 Большое спасибо!! он работал отлично. Я уже использовал указатели вместо ссылок, но с boost:: ref он работал отлично, особенно при использовании boost:: mutex с lock_gard. Ваш совет мне очень помог, спасибо еще раз. - person ezzakrem; 24.10.2012

В вашем примере кода есть несколько проблем с синхронизацией, как отмечают другие:

  • Отсутствуют блокировки при вызовах некоторых членов ProtectedStack.
  • Основной поток мог выйти, не позволив рабочим потокам присоединиться.
  • Производитель и потребитель не зацикливаются, как можно было бы ожидать. Производители должны всегда (когда они могут) производить, а потребители должны продолжать потреблять по мере того, как новые элементы помещаются в стек.
  • cout в основном потоке вполне может быть выполнен до того, как производители или потребители еще не успели поработать.

Я бы рекомендовал использовать переменную условия для синхронизации между вашими производителями и потребителями. Взгляните на пример производителя/потребителя здесь: http://en.cppreference.com/w/cpp/thread/condition_variable Это довольно новая функция в стандартной библиотеке C++11, которая поддерживается начиная с VS2012. До VS2012 вам нужно было либо повысить скорость, либо использовать вызовы Win32.

Использование условной переменной для решения проблемы производитель/потребитель хорошо, потому что оно почти навязывает использование мьютекса для блокировки общих данных и предоставляет механизм сигнализации, позволяющий потребителям знать, что что-то готово к использованию, чтобы они не имели такой спины. (что всегда является компромиссом между скоростью отклика потребителя и использованием ЦП при опросе очереди). Он также сам является атомарным, что предотвращает возможность того, что потоки пропустят сигнал о том, что есть что-то для потребления, как описано здесь: https://en.wikipedia.org/wiki/Sleeping_barber_problem

Чтобы дать краткое изложение того, как условная переменная заботится об этом...

  • Производитель выполняет все трудоемкие действия в своем потоке, не владея мьютексом.
  • Производитель блокирует мьютекс, добавляет созданный им элемент в глобальную структуру данных (вероятно, в какую-то очередь), отпускает мьютекс и сигнализирует об этом одному потребителю — в указанном порядке.
  • Потребитель, ожидающий переменную условия, автоматически повторно захватывает мьютекс, удаляет элемент из очереди и выполняет над ним некоторую обработку. В это время производитель уже работает над созданием нового элемента, но должен ждать, пока потребитель закончит, прежде чем он сможет поставить элемент в очередь.

Это окажет следующее влияние на ваш код:

  • Больше нет необходимости в ProtectedStack, подойдет обычная структура данных стека/очереди.
  • Нет необходимости в ускорении, если вы используете достаточно новый компилятор - удаление зависимостей сборки всегда приятно.

У меня такое ощущение, что многопоточность для вас довольно нова, поэтому я могу только посоветовать посмотреть, как другие решили проблемы синхронизации, поскольку очень трудно понять это. Путаница в отношении того, что происходит в среде с несколькими потоками и общими данными, обычно приводит к таким проблемам, как взаимоблокировки в будущем.

person Sean Cline    schedule 23.10.2012

Вы не проверяете, что производящий поток был выполнен, прежде чем пытаться потреблять. Вы также не блокируете размер/пусто/сверху... это небезопасно, если контейнер обновляется.

person Tony Delroy    schedule 23.10.2012
comment
спасибо за ответ. На самом деле, я добавил sleep(1), если потребитель найдет пустой стек. Да, мне не пришла в голову идея защитить размер/пусто/верх. - person ezzakrem; 23.10.2012