首页 > 代码库 > 生产者/消费者模式(一)

生产者/消费者模式(一)

  生产者消费者问题是一个多线程同步问题的经典案例,大多数多线程编程问题都是以生产者-消费者模式为基础,扩展衍生来的。在生产者消费者模式中,缓冲区起到了连接两个模块的作用:生产者把数据放入缓冲区,而消费者从缓冲区取出数据,如下图所示:

  

  可以看出Buffer缓冲区作为一个中介,将生产者和消费者分开,使得两部分相对独立,生产者消费者不需要知道对方的实现逻辑,对其中一个的修改,不会影响另一个,从设计模式的角度看,降低了耦合度。而对于图中处在多线程环境中Buffer,需要共享给多个多个生产者和消费者,为了保证读写数据和操作的正确性与时序性,程序需要对Buffer结构进行同步处理。通常情况下,生产者-消费者模式中的广泛使用的Buffer缓冲区结构是阻塞队列。

  阻塞队列的特点为:“当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程将会被阻 塞,直到其他的线程往空的队列插入新的元素。同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从 队列中移除一个或者多个元素,或者完全清空队列。” 阻塞队列的实现通常是对普通队列进行同步控制,封装起来。一个linux下的通用队列定义如下,代码所示的队列没有为满的情况,不设定队列元素总数的上限:

  1 template<class T>  2 class BlockQueue  3 {  4 public:  5     BlockQueue();  6     ~BlockQueue();  7   8     void Add(T *pData);  9     T *Get(int timeout=0); 10     int Count(); 11     void SetBlockFlag(); 12 private: 13     typedef struct _Node 14     { 15         T * m_pData; 16         struct _Node * m_pNext; 17     } Node; 18     Node * m_pFirst; 19     Node * m_pLast; 20     int m_nCount; 21  22     bool m_bBlockFlag;           //是否阻塞 23     CCond m_condQueue; 24 }; 25  26 template<class T> 27 BlockQueue<T>::BlockQueue(int nMetuxFlag) 28 { 29     m_pFirst = NULL;    //首元素 30     m_pLast  = NULL;    //末元素 31     m_nCount = 0;    //元素格式 32  33     m_bBlockFlag = false; 34 } 35  36 template<class T> 37 void BlockQueue<T>::SetBlockFlag() 38 { 39     m_bBlockFlag = true; 40 } 41  42 template<class T> 43 BlockQueue<T>::~BlockQueue() 44 { 45     Node *pTmp = NULL; 46     while (m_pFirst != NULL) 47     { 48         pTmp = m_pFirst; 49         m_pFirst = m_pFirst->m_pNext; 50         delete pTmp; 51         pTmp = NULL; 52     } 53     m_pLast = NULL; 54 } 55  56 template<class T> 57 void BlockQueue<T>::Add(T *pData) 58 { 59     (m_condQueue.GetMutex()).EnterMutex(); 60  61     Node *pTmp = new Node; 62     if (pTmp == NULL) 63     { 64         (m_condQueue.GetMutex()).LeaveMutex(); 65         return; 66     } 67     pTmp->m_pData =http://www.mamicode.com/ pData; 68     pTmp->m_pNext = NULL; 69     if (m_nCount == 0) 70     { 71         m_pFirst = pTmp; 72         m_pLast  = pTmp; 73     } 74     else 75     { 76         m_pLast->m_pNext = pTmp; 77         m_pLast = pTmp; 78     } 79     m_nCount++; 80  81     if (m_bBlockFlag) 82     { 83         m_condQueue.Signal(); 84     } 85       86     (m_condQueue.GetMutex()).LeaveMutex(); 87 } 88  89 template<class T> 90 T *BlockQueue<T>::Get(int timeout) 91 { 92     (m_condQueue.GetMutex()).EnterMutex(); 93  94     while (m_nCount == 0) 95     { 96         if (!m_bBlockFlag) 97         { 98             (m_condQueue.GetMutex()).LeaveMutex(); 99             return NULL;100         }101         else102         {103             if (m_condQueue.WaitNoLock(timeout) == 1)104             {105                 (m_condQueue.GetMutex()).LeaveMutex();106                 return NULL;107             }            108         }109     }110 111     T * pTmpData = http://www.mamicode.com/m_pFirst->m_pData;112     Node *pTmp = m_pFirst;113 114     m_pFirst = m_pFirst->m_pNext;115     delete pTmp;116     pTmp = NULL;117     m_nCount --;118     if (m_nCount == 0)119     {120         m_pLast = NULL;121     }122     123     (m_condQueue.GetMutex()).LeaveMutex();124 125     return pTmpData;126 }127 128 template<class T>129 int BlockQueue<T>::Count()130 {131     (m_condQueue.GetMutex()).EnterMutex();132     int nCount = m_nCount;133     (m_condQueue.GetMutex()).LeaveMutex();134 135     return nCount;136 }

  队列中使用了Cond条件变量,实现多线程环境中队列的同步与阻塞:

  • 条件变量自己持有mutex,在向队列中Add/Get元素时,mutex将队列锁住,同一时刻只允许一个线程对队列进行操作;
  • 消费者从队列中获取数据时,如果设置了阻塞模式,那么当队列元素为空时,需要阻塞在条件变量上进行等待,其他线程调用Add函数,向队列中添加元素后,唤醒阻塞在条件变量上的线程。注意这里用到的等待条件是while(count == 0)而不是if(count == 0),因为在多核处理器环境中,Signal唤醒操作可能会激活多于一个线程(阻塞在条件变量上的线程),使得多个调用等待的线程返回。所以用while循环对condition多次判断,可以避免这种假唤醒。
  • 队列元素为包含T类型指针的Node类型指针,析构时只负责释放队列中的Node元素,而真正指向T类型数据的指针,需要调用者进行分配和释放,一般由生产者调用new分配,消费者使用后调用delete释放。这里需要特别注意,因为如果忘记释放,会造成内存泄露。

  这段代码是我在多线程环境下,较为简单的生产者-消费者模式中,通用的一个队列模型,目前队列在服务器中稳定运行,还未出现太大的性能问题,当然这与应用模块逻辑较为简单、交易量较小有很大关。仔细分析这段代码,几乎对每个函数的操作都进行了加锁保护,临界区从始到末,粒度较大;再者,队列中持有的T对象指针,均是由调用者动态分配和释放的,在交易量很大的情况下,频繁调用Add和Get函数,不仅会导致加锁解锁的性能消耗,也会使系统产生大量内存碎片。

  未完待续……

生产者/消费者模式(一)