首页 > 代码库 > 无锁环形队列

无锁环形队列

  1 #include "stdafx.h"
  2 #include <process.h>
  3 #include <stdio.h>
  4 #include <Windows.h>
  5 #include <stdlib.h>
  6 #include <assert.h>
  7 
  8 #define MAX_VIDEO_BUFF 1024
  9 
 10 struct Header
 11 {
 12     WORD wSize;
 13     char data[0];
 14 };
 15 
 16 #define TEST_MSG_MAX_SIZE 128
 17 #define MAX_VIDEO_ELEMENT (TEST_MSG_MAX_SIZE + sizeof(Header))
 18 
 19 class VideoQueue
 20 {
 21 public:
 22     VideoQueue() : r(0), w(0), buf(0), buf_extend(0) {}
 23 
 24 //////////////////////////////////////////////////////////////////////
 25 //////////////////// WRITE ///////////////////////////////////////////
 26 //////////////////////////////////////////////////////////////////////
 27 
 28     bool Write(const char* p, int len)
 29     {
 30         if (GetSpaceSize() < len)
 31             return false;
 32 
 33         int toendlen = MAX_VIDEO_BUFF - w;
 34         if (toendlen >= len)
 35             memcpy(&buf[w], p, len);
 36         else 
 37         {
 38             memcpy(&buf[w], p, toendlen);
 39             memcpy(&buf[0], p + toendlen, len - toendlen);
 40         }
 41 
 42         w = (w + len) % MAX_VIDEO_BUFF;
 43         return true;
 44     }
 45     bool IsFull() { 
 46         return ((w + 1) % MAX_VIDEO_BUFF == r); 
 47     }
 48     int GetSpaceSize()    
 49     {
 50         int rt = r + MAX_VIDEO_BUFF - w - 1;
 51         if (rt >= MAX_VIDEO_BUFF)
 52             rt -= MAX_VIDEO_BUFF;
 53         return rt;
 54     }
 55 
 56 //////////////////////////////////////////////////////////////////////
 57 //////////////////// READ ////////////////////////////////////////////
 58 //////////////////////////////////////////////////////////////////////
 59 
 60     const char* GetDataPtr(int len)
 61     {
 62         if (GetDataLength() < len)
 63             return 0;
 64 
 65         if (len > MAX_VIDEO_ELEMENT)
 66             return 0;
 67 
 68         int toendlen = MAX_VIDEO_BUFF - r;
 69         if (toendlen >= len)
 70             return &buf[r];
 71         else
 72         {
 73             memcpy(buf_extend, &buf[r], toendlen);
 74             memcpy(((char*)buf_extend) + toendlen, &buf[0], len - toendlen);
 75             return (const char*)buf_extend;
 76         }
 77     }
 78     bool IsEmpty() { 
 79         return (w == r); 
 80     }
 81     bool Skip(int len)
 82     { 
 83         assert(GetDataLength() >= len); 
 84         r = (r + len) % MAX_VIDEO_BUFF; 
 85         return true; 
 86     }
 87     int GetDataLength()
 88     {
 89         int rt = w + MAX_VIDEO_BUFF - r;
 90         if (rt >= MAX_VIDEO_BUFF)
 91             rt -= MAX_VIDEO_BUFF;
 92         return rt;
 93     }
 94 
 95 
 96     //////
 97     bool CreateBuff() 
 98     {
 99         if (buf == 0)
100             buf = (char*)malloc(MAX_VIDEO_BUFF);
101 
102         if (buf_extend == 0)
103             buf_extend = (char*)malloc(MAX_VIDEO_ELEMENT);
104 
105         return (buf != 0);
106     }
107     
108 private:
109     char* buf;//数据实际上只能写入MAX_BUFF-1    
110     int r;
111     int w;
112     char* buf_extend;
113 };
114 
115 
116 VideoQueue* q = 0;
117 
118 CRITICAL_SECTION cs;
119 
120 #define TEST_MSG_COUNT 100000
121 void ThreadA(void*)
122 {
123     // 创建消息,不断向队列里填充
124     char s_Buff[TEST_MSG_MAX_SIZE + sizeof(Header)];
125     Header *pHeader = (Header *)s_Buff;
126     int iLeftCount = 0;
127     int iPos = 0;
128     int iSendSize = 0;
129     int iPackIndex = 0;
130     int iTotalSize = 0;
131 
132     while (1)
133     {
134         //EnterCriticalSection(&cs);
135 
136         if (q->IsFull())
137         {
138             //LeaveCriticalSection(&cs);
139             //Sleep(0);
140             continue;
141         }
142 
143         if (iLeftCount == 0)
144         {
145             if (iPackIndex++ >= TEST_MSG_COUNT)
146             {
147                 //LeaveCriticalSection(&cs);
148                 break;
149             }
150 
151             // Create a Msg;
152             pHeader->wSize = rand() % (TEST_MSG_MAX_SIZE - 32) + 32;
153             for (WORD i = 0; i < pHeader->wSize; ++i)
154                 pHeader->data[i] = char(pHeader->wSize + i);
155             iTotalSize += pHeader->wSize;
156             iLeftCount = pHeader->wSize + sizeof(Header);
157             iPos = 0;
158         }
159 
160         int iSpace = q->GetSpaceSize();
161         if (iLeftCount > TEST_MSG_MAX_SIZE / 4)
162             iSendSize = rand() % iLeftCount;
163         else
164             iSendSize = iLeftCount;
165 
166         if (iSpace < iSendSize)
167         {
168             assert(q->Write(s_Buff + iPos, iSpace));
169             iLeftCount -= iSpace;
170             iPos += iSpace;
171         }
172         else
173         {
174             assert(q->Write(s_Buff + iPos, iSendSize));
175             iLeftCount -= iSendSize;
176             iPos += iSendSize;
177         }
178 
179         //LeaveCriticalSection(&cs);
180         //Sleep(0);
181     }
182     printf("Send %d Msg, Size:%d!\n", TEST_MSG_COUNT, iTotalSize);
183 
184     // __EndThreadEx
185 }
186 
187 void ThreadB(void*)
188 {
189     // 不断从队列里读消息
190     Header *pHeader = NULL;
191 
192     int iMsgCount = 0;
193     int iTotalSize = 0;
194     int iErrorPack = 0;
195 
196     while (1)
197     {
198         //EnterCriticalSection(&cs);
199         if (q->IsEmpty())
200         {
201             //LeaveCriticalSection(&cs);
202             //Sleep(0);
203             continue;
204         }
205 
206         pHeader = (Header *)q->GetDataPtr(sizeof(Header));
207         if (pHeader)
208         {
209             pHeader = (Header *)q->GetDataPtr(sizeof(Header) + pHeader->wSize);
210             if (pHeader)
211             {
212                 for (WORD i = 0; i < pHeader->wSize; ++i)
213                     if (pHeader->data[i] != char(pHeader->wSize + i))
214                     {
215                         iErrorPack++;
216                         break;
217                     }
218 
219                 iMsgCount ++;
220                 iTotalSize += pHeader->wSize;
221                 q->Skip(sizeof(Header) + pHeader->wSize);
222 
223                 if (iMsgCount >= TEST_MSG_COUNT)
224                 {
225                     //LeaveCriticalSection(&cs);
226                     break;
227                 }
228             }
229         }
230         //LeaveCriticalSection(&cs);
231         //Sleep(0);
232     }
233     printf("Recv %d Msg, Size:%d, ErrorPack:%d!\n", TEST_MSG_COUNT, iTotalSize,iErrorPack);
234 
235     // __EndThreadEx
236 }
237 
238 int _tmain(int argc, _TCHAR* argv[])
239 {
240     srand(1);
241     q = new VideoQueue;
242     if (!q->CreateBuff())
243     {
244         printf("createbuff fail\n");
245         getchar();
246         return 0;
247     }
248     InitializeCriticalSection(&cs);
249 
250     HANDLE hWrite = (HANDLE)_beginthread(ThreadA, 0, 0);
251     HANDLE hRead = (HANDLE)_beginthread(ThreadB, 0, 0);
252 
253     WaitForSingleObject(hWrite, INFINITE);
254     WaitForSingleObject(hRead, INFINITE);
255 
256      getchar();
257  
258      return 0;
259 }