首页 > 代码库 > 生产者/消费者模式(一)
生产者/消费者模式(一)
生产者消费者问题是一个多线程同步问题的经典案例,大多数多线程编程问题都是以生产者-消费者模式为基础,扩展衍生来的。在生产者消费者模式中,缓冲区起到了连接两个模块的作用:生产者把数据放入缓冲区,而消费者从缓冲区取出数据,如下图所示:
可以看出Buffer缓冲区作为一个中介,将生产者和消费者分开,使得两部分相对独立,生产者消费者不需要知道对方的实现逻辑,对其中一个的修改,不会影响另一个,从设计模式的角度看,降低了耦合度。而对于图中处在多线程环境中Buffer,需要共享给多个多个生产者和消费者,为了保证读写数据和操作的正确性与时序性,程序需要对Buffer结构进行同步处理。通常情况下,生产者-消费者模式中的广泛使用的Buffer缓冲区结构是阻塞队列。
阻塞队列的特点为:“当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻 塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从 队列中移除一个或者多个元素,或者完全清空队列。” 阻塞队列的实现通常是对普通队列进行同步控制,封装起来。一个linux下的通用队列定义如下,代码所示的队列没有为满的情况,不设定队列元素总数的上限:
1 template<class T> 2 class BlockQueue 3 { 4 public: 5 BlockQueue(); 6 ~BlockQueue(); 7 8 void Add(T *pData); 9 T *Get(int timeout=0); 10 int Count(); 11 void SetBlockFlag(); 12 private: 13 typedef struct _Node 14 { 15 T * m_pData; 16 struct _Node * m_pNext; 17 } Node; 18 Node * m_pFirst; 19 Node * m_pLast; 20 int m_nCount; 21 22 bool m_bBlockFlag; //是否阻塞 23 CCond m_condQueue; 24 }; 25 26 template<class T> 27 BlockQueue<T>::BlockQueue(int nMetuxFlag) 28 { 29 m_pFirst = NULL; //首元素 30 m_pLast = NULL; //末元素 31 m_nCount = 0; //元素格式 32 33 m_bBlockFlag = false; 34 } 35 36 template<class T> 37 void BlockQueue<T>::SetBlockFlag() 38 { 39 m_bBlockFlag = true; 40 } 41 42 template<class T> 43 BlockQueue<T>::~BlockQueue() 44 { 45 Node *pTmp = NULL; 46 while (m_pFirst != NULL) 47 { 48 pTmp = m_pFirst; 49 m_pFirst = m_pFirst->m_pNext; 50 delete pTmp; 51 pTmp = NULL; 52 } 53 m_pLast = NULL; 54 } 55 56 template<class T> 57 void BlockQueue<T>::Add(T *pData) 58 { 59 (m_condQueue.GetMutex()).EnterMutex(); 60 61 Node *pTmp = new Node; 62 if (pTmp == NULL) 63 { 64 (m_condQueue.GetMutex()).LeaveMutex(); 65 return; 66 } 67 pTmp->m_pData =http://www.mamicode.com/ pData; 68 pTmp->m_pNext = NULL; 69 if (m_nCount == 0) 70 { 71 m_pFirst = pTmp; 72 m_pLast = pTmp; 73 } 74 else 75 { 76 m_pLast->m_pNext = pTmp; 77 m_pLast = pTmp; 78 } 79 m_nCount++; 80 81 if (m_bBlockFlag) 82 { 83 m_condQueue.Signal(); 84 } 85 86 (m_condQueue.GetMutex()).LeaveMutex(); 87 } 88 89 template<class T> 90 T *BlockQueue<T>::Get(int timeout) 91 { 92 (m_condQueue.GetMutex()).EnterMutex(); 93 94 while (m_nCount == 0) 95 { 96 if (!m_bBlockFlag) 97 { 98 (m_condQueue.GetMutex()).LeaveMutex(); 99 return NULL;100 }101 else102 {103 if (m_condQueue.WaitNoLock(timeout) == 1)104 {105 (m_condQueue.GetMutex()).LeaveMutex();106 return NULL;107 } 108 }109 }110 111 T * pTmpData = http://www.mamicode.com/m_pFirst->m_pData;112 Node *pTmp = m_pFirst;113 114 m_pFirst = m_pFirst->m_pNext;115 delete pTmp;116 pTmp = NULL;117 m_nCount --;118 if (m_nCount == 0)119 {120 m_pLast = NULL;121 }122 123 (m_condQueue.GetMutex()).LeaveMutex();124 125 return pTmpData;126 }127 128 template<class T>129 int BlockQueue<T>::Count()130 {131 (m_condQueue.GetMutex()).EnterMutex();132 int nCount = m_nCount;133 (m_condQueue.GetMutex()).LeaveMutex();134 135 return nCount;136 }
队列中使用了Cond条件变量,实现多线程环境中队列的同步与阻塞:
- 条件变量自己持有mutex,在向队列中Add/Get元素时,mutex将队列锁住,同一时刻只允许一个线程对队列进行操作;
- 消费者从队列中获取数据时,如果设置了阻塞模式,那么当队列元素为空时,需要阻塞在条件变量上进行等待,其他线程调用Add函数,向队列中添加元素后,唤醒阻塞在条件变量上的线程。注意这里用到的等待条件是while(count == 0)而不是if(count == 0),因为在多核处理器环境中,Signal唤醒操作可能会激活多于一个线程(阻塞在条件变量上的线程),使得多个调用等待的线程返回。所以用while循环对condition多次判断,可以避免这种假唤醒。
- 队列元素为包含T类型指针的Node类型指针,析构时只负责释放队列中的Node元素,而真正指向T类型数据的指针,需要调用者进行分配和释放,一般由生产者调用new分配,消费者使用后调用delete释放。这里需要特别注意,因为如果忘记释放,会造成内存泄露。
这段代码是我在多线程环境下,较为简单的生产者-消费者模式中,通用的一个队列模型,目前队列在服务器中稳定运行,还未出现太大的性能问题,当然这与应用模块逻辑较为简单、交易量较小有很大关。仔细分析这段代码,几乎对每个函数的操作都进行了加锁保护,临界区从始到末,粒度较大;再者,队列中持有的T对象指针,均是由调用者动态分配和释放的,在交易量很大的情况下,频繁调用Add和Get函数,不仅会导致加锁解锁的性能消耗,也会使系统产生大量内存碎片。
未完待续……
生产者/消费者模式(一)