首页 > 代码库 > 线程安全的环形缓冲区实现

线程安全的环形缓冲区实现

来源:http://blog.csdn.net/lezhiyong
    应用背景:线程1将每次数量不一的音频采样点(PCM音频数据)写入环形缓冲区,线程2每次取固定数量采样点送音频编码器,线程1线程2在平均时间内的读写数据量相等。(倒入桶中的水量有时大有时小,但每次取一瓢喝:)
   该环形缓冲区借鉴CoolPlayer音频播放器中的环形缓冲区代码实现,在读写操作函数中加了锁,允许多线程同时操作。CPs_CircleBuffer基于内存段的读写,比用模板实现的环形缓冲队列适用的数据类型更广些, CPs_CircleBuffer修改成C++中基于对象的实现,加上详细注释,m_csCircleBuffer锁变量为自用的lock类型(将CRITICAL_SECTION封装起来),调用lock()加锁,调用unlock()解锁。使用效果良好,分享出来。

CPs_CircleBuffer环形缓冲还不具备当待写数据量超出空余缓冲时自动分配内存的功能,这个将在后续进行优化。

CPs_CircleBuffer使用步骤:

 

[cpp] view plaincopy
  1. 1、创建对象  
  2. CPs_CircleBuffer* m_pCircleBuffer;  
  3. m_pCircleBuffer = new CPs_CircleBuffer(bufsize);  
  4. 2、写  
  5. if (m_pCircleBuffer->GetFreeSize() < CIC_READCHUNKSIZE)  
  6.  {  
  7.      Sleep(20);  
  8.      continue;  
  9.  }  
  10. m_pCircleBuffer->Write(internetbuffer.lpvBuffer,internetbuffer.dwBufferLength);  
  11. 3、读  
  12. m_pCircleBuffer->Read(pDestBuffer,iBytesToRead, piBytesRead);  
  13.    
  14. 4、其他调用  
  15. if(m_pCircleBuffer->IsComplete())  
  16.     break;          
  17. iUsedSpace =m_pCircleBuffer->GetUsedSize();  
  18. m_pCircleBuffer->SetComplete();  

 

CPs_CircleBuffer修改为类的定义:

[cpp] view plaincopy
  1. class  CPs_CircleBuffer  
  2. {  
  3. public:  
  4.        CPs_CircleBuffer(const unsigned int iBufferSize);  
  5.        ~CPs_CircleBuffer();  
  6. public:  
  7.         // Public functions  
  8.         void  Uninitialise();  
  9.         void  Write(const void* pSourceBuffer, const unsigned int iNumBytes);  
  10.         bool  Read(void* pDestBuffer, const size_t iBytesToRead, size_t* pbBytesRead);  
  11.         void  Flush();  
  12.         unsigned int GetUsedSize();  
  13.         unsigned int GetFreeSize();  
  14.         void  SetComplete();  
  15.         bool  IsComplete();  
  16.   
  17. private:         
  18.         unsigned char*  m_pBuffer;  
  19.         unsigned int    m_iBufferSize;  
  20.         unsigned int    m_iReadCursor;  
  21.         unsigned int    m_iWriteCursor;  
  22.         HANDLE          m_evtDataAvailable;  
  23.         Vlock           m_csCircleBuffer;  
  24.         bool            m_bComplete;        
  25. };  

CPs_CircleBuffer修改为类的实现:

[cpp] view plaincopy
  1. #define CIC_WAITTIMEOUT  3000  
  2.   
  3. CPs_CircleBuffer::CPs_CircleBuffer(const unsigned int iBufferSize)  
  4. {  
  5.     m_iBufferSize = iBufferSize;  
  6.     m_pBuffer = (unsigned char*)malloc(iBufferSize);  
  7.     m_iReadCursor = 0;  
  8.     m_iWriteCursor = 0;  
  9.     m_bComplete = false;  
  10.     m_evtDataAvailable = CreateEvent(NULL, FALSE, FALSE, NULL);  
  11. }  
  12.   
  13. CPs_CircleBuffer::~CPs_CircleBuffer()  
  14. {  
  15.     Uninitialise();  
  16. }  
  17.   
  18. // Public functions  
  19. void CPs_CircleBuffer::Uninitialise()//没有必要public这个接口函数,long120817  
  20. {  
  21.     CloseHandle(m_evtDataAvailable);  
  22.     free(m_pBuffer);  
  23. }  
  24.   
  25. //Write前一定要调用m_pCircleBuffer->GetFreeSize(),如果FreeSize不够需要等待,long120817  
  26.   
  27. void  CPs_CircleBuffer::Write(const void* _pSourceBuffer, const unsigned int _iNumBytes)  
  28. {  
  29.     unsigned int iBytesToWrite = _iNumBytes;  
  30.     unsigned char* pSourceReadCursor = (unsigned char*)_pSourceBuffer;  
  31.   
  32.     //CP_ASSERT(iBytesToWrite <= GetFreeSize());//修改为没有足够空间就返回,write前一定要加GetFreeSize判断,否则进入到这里相当于丢掉数据,         // long120817  
  33.     if (iBytesToWrite > GetFreeSize())  
  34.     {  
  35.         return;  
  36.     }  
  37.     _ASSERT(m_bComplete == false);  
  38.   
  39.     m_csCircleBuffer.Lock();  
  40.   
  41.     if (m_iWriteCursor >= m_iReadCursor)  
  42.     {  
  43.         //              0                                            m_iBufferSize  
  44.         //              |-----------------|===========|--------------|  
  45.         //                                pR->        pW->   
  46.         // 计算尾部可写空间iChunkSize,long120817  
  47.         unsigned int iChunkSize = m_iBufferSize - m_iWriteCursor;  
  48.   
  49.         if (iChunkSize > iBytesToWrite)  
  50.         {  
  51.             iChunkSize = iBytesToWrite;  
  52.         }  
  53.   
  54.         // Copy the data  
  55.         memcpy(m_pBuffer + m_iWriteCursor,pSourceReadCursor, iChunkSize);  
  56.   
  57.         pSourceReadCursor += iChunkSize;  
  58.   
  59.         iBytesToWrite -= iChunkSize;  
  60.   
  61.         // 更新m_iWriteCursor  
  62.         m_iWriteCursor += iChunkSize;  
  63.   
  64.         if (m_iWriteCursor >= m_iBufferSize)//如果m_iWriteCursor已经到达末尾  
  65.             m_iWriteCursor -= m_iBufferSize;//返回到起点0位置,long120817  
  66.   
  67.     }  
  68.   
  69.     //剩余数据从Buffer起始位置开始写  
  70.     if (iBytesToWrite)  
  71.     {  
  72.         memcpy(m_pBuffer + m_iWriteCursor,pSourceReadCursor, iBytesToWrite);  
  73.         m_iWriteCursor += iBytesToWrite;  
  74.         _ASSERT(m_iWriteCursor < m_iBufferSize);//这个断言没什么意思,应该_ASSERT(m_iWriteCursor <= m_iReadCursor);long20120817  
  75.     }  
  76.   
  77.     SetEvent(m_evtDataAvailable);//设置数据写好信号量  
  78.   
  79.     m_csCircleBuffer.UnLock();  
  80. }  
  81.   
  82. bool  CPs_CircleBuffer::Read(void* pDestBuffer, const size_t _iBytesToRead, size_t* pbBytesRead)  
  83. {  
  84.     size_t iBytesToRead = _iBytesToRead;  
  85.     size_t iBytesRead = 0;  
  86.     DWORD dwWaitResult;  
  87.     bool bComplete = false;  
  88.   
  89.     while (iBytesToRead > 0 && bComplete == false)  
  90.     {  
  91.         dwWaitResult = WaitForSingleObject(m_evtDataAvailable, CIC_WAITTIMEOUT);//等待数据写好,long120817  
  92.   
  93.         if (dwWaitResult == WAIT_TIMEOUT)  
  94.         {  
  95.             //TRACE_INFO2("Circle buffer - did not fill in time!");  
  96.             *pbBytesRead = iBytesRead;  
  97.             return FALSE;//等待超时则返回  
  98.         }  
  99.   
  100.         m_csCircleBuffer.Lock();  
  101.   
  102.         if (m_iReadCursor > m_iWriteCursor)  
  103.         {  
  104.             //              0                                                    m_iBufferSize  
  105.             //              |=================|-----|===========================|  
  106.             //                                pW->  pR->   
  107.             unsigned int iChunkSize = m_iBufferSize - m_iReadCursor;  
  108.   
  109.             if (iChunkSize > iBytesToRead)  
  110.                 iChunkSize = (unsigned int)iBytesToRead;  
  111.   
  112.             //读取操作  
  113.             memcpy((unsigned char*)pDestBuffer + iBytesRead,m_pBuffer + m_iReadCursor,iChunkSize);  
  114.   
  115.             iBytesRead += iChunkSize;  
  116.             iBytesToRead -= iChunkSize;  
  117.   
  118.             m_iReadCursor += iChunkSize;  
  119.   
  120.             if (m_iReadCursor >= m_iBufferSize)//如果m_iReadCursor已经到达末尾  
  121.                 m_iReadCursor -= m_iBufferSize;//返回到起点0位置,long120817  
  122.         }  
  123.   
  124.         if (iBytesToRead && m_iReadCursor < m_iWriteCursor)  
  125.         {  
  126.             unsigned int iChunkSize = m_iWriteCursor - m_iReadCursor;  
  127.   
  128.             if (iChunkSize > iBytesToRead)  
  129.                 iChunkSize = (unsigned int)iBytesToRead;  
  130.   
  131.             //读取操作  
  132.             memcpy((unsigned char*)pDestBuffer + iBytesRead,m_pBuffer + m_iReadCursor,iChunkSize);  
  133.   
  134.             iBytesRead += iChunkSize;  
  135.             iBytesToRead -= iChunkSize;  
  136.             m_iReadCursor += iChunkSize;  
  137.         }  
  138.   
  139.         //如果有更多的数据要写  
  140.         if (m_iReadCursor == m_iWriteCursor)  
  141.         {  
  142.             if (m_bComplete)//跳出下一个while循环,该值通过SetComplete()设置,此逻辑什么意思?long120817  
  143.                 bComplete = true;  
  144.         }  
  145.         else//还有数据可以读,SetEvent,在下一个while循环开始可以不用再等待,long120817  
  146.             SetEvent(m_evtDataAvailable);  
  147.   
  148.         m_csCircleBuffer.UnLock();  
  149.     }  
  150.   
  151.     *pbBytesRead = iBytesRead;  
  152.   
  153.     return bComplete ? false : true;  
  154.   
  155. }  
  156. //  0                                                m_iBufferSize  
  157. //  |------------------------------------------------|  
  158. //  pR  
  159. //  pW  
  160. //读写指针归零  
  161. void  CPs_CircleBuffer::Flush()  
  162. {  
  163.     m_csCircleBuffer.Lock();  
  164.     m_iReadCursor = 0;  
  165.     m_iWriteCursor = 0;  
  166.     m_csCircleBuffer.UnLock();  
  167.   
  168. }  
  169. //获取已经写的内存  
  170. unsigned int CPs_CircleBuffer::GetUsedSize()  
  171. {  
  172.      return m_iBufferSize - GetFreeSize();  
  173.   
  174. }  
  175.   
  176.   
  177. unsigned int CPs_CircleBuffer::GetFreeSize()  
  178. {  
  179.     unsigned int iNumBytesFree;  
  180.   
  181.     m_csCircleBuffer.Lock();  
  182.   
  183.     if (m_iWriteCursor < m_iReadCursor)  
  184.     {  
  185.         //              0                                                    m_iBufferSize  
  186.         //              |=================|-----|===========================|  
  187.         //                                pW->  pR->   
  188.         iNumBytesFree = (m_iReadCursor - 1) - m_iWriteCursor;  
  189.     }  
  190.     else if (m_iWriteCursor == m_iReadCursor)  
  191.     {  
  192.         iNumBytesFree = m_iBufferSize;  
  193.     }  
  194.     else  
  195.     {  
  196.         //              0                                                    m_iBufferSize  
  197.         //              |-----------------|=====|---------------------------|  
  198.         //                                pR->   pW->   
  199.         iNumBytesFree = (m_iReadCursor - 1) + (m_iBufferSize - m_iWriteCursor);  
  200.     }  
  201.   
  202.     m_csCircleBuffer.UnLock();  
  203.   
  204.     return iNumBytesFree;  
  205.   
  206. }  
  207. //该函数什么时候调用?long120817  
  208. void  CPs_CircleBuffer::SetComplete()  
  209. {  
  210.     m_csCircleBuffer.Lock();  
  211.     m_bComplete = true;  
  212.     SetEvent(m_evtDataAvailable);  
  213.     m_csCircleBuffer.UnLock();  
  214. }  

 

附自动初始化和摧毁的锁对象Vlock的实现:

[cpp] view plaincopy
    1. #ifdef WIN32  
    2. #include <windows.h>  
    3.   
    4. #define  V_MUTEX            CRITICAL_SECTION //利用临界区实现的锁变量  
    5. #define  V_MUTEX_INIT(m)        InitializeCriticalSection(m)  
    6. #define  V_MUTEX_LOCK(m)        EnterCriticalSection(m)  
    7. #define  V_MUTEX_UNLOCK(m)      LeaveCriticalSection(m)  
    8. #define  V_MUTEX_DESTORY(m)     DeleteCriticalSection(m)  
    9.   
    10. #else  
    11.   
    12. #define  V_MUTEX                pthread_mutex_t  
    13. #define  V_MUTEX_INIT(m)        pthread_mutex_init(m,NULL)  
    14. #define  V_MUTEX_LOCK(m)        pthread_mutex_Lock(m)  
    15. #define  V_MUTEX_UNLOCK(m)      pthread_mutex_unLock(m)  
    16. #define  V_MUTEX_DESTORY(m)     pthread_mutex_destroy(m)  
    17.   
    18. #endif  
    19.   
    20.   
    21. class  Vlock  
    22. {  
    23. public:  
    24.     Vlock(void)  
    25.     {  
    26.         V_MUTEX_INIT(&m_Lock);  
    27.     }  
    28.     ~Vlock(void)  
    29.     {  
    30.         V_MUTEX_DESTORY(&m_Lock);  
    31.     }  
    32. public:  
    33.     void Lock(){V_MUTEX_LOCK(&m_Lock);}  
    34.     void UnLock(){V_MUTEX_UNLOCK(&m_Lock);}  
    35. private:  
    36.     V_MUTEX m_Lock;  
    37. };