首页 > 代码库 > Linux组件封装之五:生产者消费者问题
Linux组件封装之五:生产者消费者问题
生产者,消费者问题是有关互斥锁(MutexLock)、条件变量(Condition)、线程(Thread)的经典案例;
描述的问题可以叙述为 生产者往buffer中投放产品,而消费者则从buffer中消费产品。
生产着消费者问题的难点在于:
为了缓冲区数据的安全性,一次只允许一个线程进入缓冲区投放或者消费产品,这个buffer就是所谓的临界资源。
生产者往缓冲区中投放产品时,如果缓冲区已满,那么该线程需要等待,即进入阻塞状态,一直到消费者取走产品为止。
相应的,消费者欲取走产品,如果此时缓冲区为空,即没有产品,那么消费者则需要等待,一直到有生产者投放产品为止。
第一个问题属于互斥问题,我们需要一把互斥锁实现互斥访问(MutexLock), 以确保实现缓冲区的安全访问。
后两个问题则属于同步问题,两类线程相互协作,我们需要两个条件变量,一个用于通知消费者从缓冲区取走产品,另一个通知生产者往缓冲区投放产品。
生产者的大概流程为:
1、加锁;2、若缓冲区已满,则进入等待状态;否则执行 3;3、生产产品;4、解锁;5、通知消费者取走产品
消费者的大概流程为:
1、加锁;2、若缓冲区已空,则进入等待状态;否则执行 3;3、取走产品;4、解锁;5、通知生产者生产产品
为此,我们设计出一个缓冲区类,把互斥锁和条件变量作为其成员变量;
1 #ifndef BUFFER_H_ 2 #define BUFFER_H_ 3 4 #include "NonCopyable.h" 5 #include "MutexLock.h" 6 #include "Condition.h" 7 #include <queue> 8 9 class Buffer:NonCopyable10 {11 public:12 Buffer(size_t size);//attention13 14 void push(int val);//投放产品15 int pop();//取走产品16 17 bool isEmpty()const;18 size_t size()const;19 private:20 mutable MutexLock mutex_;//注意声明次序,不能改变21 Condition full_;22 Condition empty_;23 24 size_t size_;//缓冲区大小25 std::queue<int> q_;26 };27 28 #endif
这里注意, 我们把同步与互斥的操作都放入Buffer中,是得生产者和消费者线程不必考虑其中的细节,这符合软件设计的“高内聚,低耦合”原则;
还有一点, mutex被声明为mutable类型,意味着mutex的状态在const函数中仍然可以被改变,是符合程序逻辑的,把mutex声明为mutable,是一种标准实现。
Buffer的具体实现代码如下:
1 #include "Buffer.h" 2 #include <iostream> 3 Buffer::Buffer(size_t size) 4 :size_(size), 5 full_(mutex_), //用mutex初始化Condition的一个对象 6 empty_(mutex_)//用mutex初始化Condition的另一个对象 7 {} 8 9 void Buffer::push(int val)10 {11 { //attention 作用域问题12 MutexGuard lock(mutex_);13 while(q_.size()>= size_)14 empty_.wait();15 q_.push(val);16 }17 full_.signal();18 } 19 20 int Buffer::pop()//attention21 {22 int tmp= 0;23 {24 MutexGuard lock(mutex_);25 while(q_.empty())26 full_.wait();27 tmp = q_.front();28 q_.pop();29 }30 empty_.signal();31 return tmp;32 }33 34 35 bool Buffer::isEmpty()const36 {37 //after 38 MutexGuard lock(mutex_);//作用域仅限于花括号内39 return q_.empty();40 }41 42 size_t Buffer::size()const43 {44 MutexGuard lock(mutex_);45 return q_.size();46 }
注意:
1、条件变量的等待必须使用While, 这是一种最佳实践,原因可见Condition的封装 Linux组件封装之二:Condition;
2、可以先notify,也可以先解锁,不过推荐先解锁,原因是如果线程A先notify,唤醒一个线程B,但是A还未解锁,此时如果线程切换至刚唤醒的线程B,B马上尝试lock,但是肯定失败,然后阻塞,这增加了一次线程切换的开销。
这里还有一个问题,就是我们在main函数中,必须一个一个的声明生产者,消费者,一个一个的去start、join,那么为了防止这种麻烦,我们可以怎么做呢?
我们可以将缓冲区与多个生产者、消费者封装成一个 车间类。代码如下:
1 #ifndef WORKSHOP_H_ 2 #define WORKSHOP_H_ 3 4 #include "NonCopyable.h" 5 #include "Buffer.h" 6 #include <vector> 7 8 class ProducerThread; 9 class ConsumerThread;10 class Buffer;11 class WorkShop:NonCopyable12 {13 public:14 WorkShop(size_t bufferSize,15 size_t producerSize,16 size_t consumerSize);17 18 ~WorkShop();19 void startWorking();20 21 private:22 size_t bufferSize_;23 Buffer buffer_;24 25 size_t producerSize_;26 size_t consumerSize_;27 std::vector<ProducerThread*> producers_;28 std::vector<ConsumerThread*> consumers_;29 };30 31 #endif
实现如下(注意之处放在cpp中);
1 #include "WorkShop.h" 2 #include "ProducerThread.h" 3 #include "ConsumerThread.h" 4 5 //version 1 6 WorkShop::WorkShop(size_t buffersize, 7 size_t producerSize, 8 size_t consumerSize) 9 :bufferSize_(buffersize),10 buffer_(bufferSize_),11 producerSize_(producerSize),12 consumerSize_(consumerSize),13 producers_(producerSize_, new ProducerThread(buffer_)), 14 consumers_(consumerSize_, new ConsumerThread(buffer_)) 15 {16 17 }18 19 //version 220 /*21 WorkShop::WorkShop(size_t buffersize,22 size_t producerSize,23 size_t consumerSize)24 :bufferSize_(buffersize),25 buffer_(bufferSize_),26 producerSize_(producerSize),27 consumerSize_(consumerSize),28 producers_(producerSize_,NULL),//vector 的初始化 29 consumers_(consumerSize_,NULL) 30 {31 for(std::vector<ProducerThread*>::iterator it = producers_.begin();32 it != producers_.end();33 ++it)34 {35 *it = new ProducerThread(buffer_);36 }37 38 for(std::vector<ConsumerThread*>::iterator it = consumers_.begin();39 it != consumers_.end();40 ++it)41 {42 *it = new ConsumerThread(buffer_);43 }44 }45 */46 47 WorkShop::~WorkShop()48 {49 for(std::vector<ProducerThread*>::iterator it = producers_.begin();50 it != producers_.end();51 ++it)52 {53 delete *it ;54 }55 56 for(std::vector<ConsumerThread*>::iterator it = consumers_.begin();57 it != consumers_.end();58 ++it)59 {60 delete *it ;61 }62 }63 64 void WorkShop::startWorking()65 {66 for(std::vector<ProducerThread*>::iterator it = producers_.begin();67 it != producers_.end();68 ++it)69 {70 //注意,此循环不能同时调用start,join->发生阻塞(只能产生一个 ProducerThread)71 (*it)->start() ;72 }73 for(std::vector<ConsumerThread*>::iterator it = consumers_.begin();74 it != consumers_.end();75 ++it)76 {77 (*it)->start() ;78 }79 80 for(std::vector<ProducerThread*>::iterator it = producers_.begin();81 it != producers_.end();82 ++it)83 {84 (*it)->join() ;85 }86 for(std::vector<ConsumerThread*>::iterator it = consumers_.begin();87 it != consumers_.end();88 ++it)89 {90 (*it)->join() ;91 }92 }
这样我们就可以同时指定 buffer的大小,生产者的数目,消费者的数目。
Linux组件封装之五:生产者消费者问题