首页 > 代码库 > 无锁环形队列
无锁环形队列
1 #include "stdafx.h" 2 #include <process.h> 3 #include <stdio.h> 4 #include <Windows.h> 5 #include <stdlib.h> 6 #include <assert.h> 7 8 #define MAX_VIDEO_BUFF 1024 9 10 struct Header 11 { 12 WORD wSize; 13 char data[0]; 14 }; 15 16 #define TEST_MSG_MAX_SIZE 128 17 #define MAX_VIDEO_ELEMENT (TEST_MSG_MAX_SIZE + sizeof(Header)) 18 19 class VideoQueue 20 { 21 public: 22 VideoQueue() : r(0), w(0), buf(0), buf_extend(0) {} 23 24 ////////////////////////////////////////////////////////////////////// 25 //////////////////// WRITE /////////////////////////////////////////// 26 ////////////////////////////////////////////////////////////////////// 27 28 bool Write(const char* p, int len) 29 { 30 if (GetSpaceSize() < len) 31 return false; 32 33 int toendlen = MAX_VIDEO_BUFF - w; 34 if (toendlen >= len) 35 memcpy(&buf[w], p, len); 36 else 37 { 38 memcpy(&buf[w], p, toendlen); 39 memcpy(&buf[0], p + toendlen, len - toendlen); 40 } 41 42 w = (w + len) % MAX_VIDEO_BUFF; 43 return true; 44 } 45 bool IsFull() { 46 return ((w + 1) % MAX_VIDEO_BUFF == r); 47 } 48 int GetSpaceSize() 49 { 50 int rt = r + MAX_VIDEO_BUFF - w - 1; 51 if (rt >= MAX_VIDEO_BUFF) 52 rt -= MAX_VIDEO_BUFF; 53 return rt; 54 } 55 56 ////////////////////////////////////////////////////////////////////// 57 //////////////////// READ //////////////////////////////////////////// 58 ////////////////////////////////////////////////////////////////////// 59 60 const char* GetDataPtr(int len) 61 { 62 if (GetDataLength() < len) 63 return 0; 64 65 if (len > MAX_VIDEO_ELEMENT) 66 return 0; 67 68 int toendlen = MAX_VIDEO_BUFF - r; 69 if (toendlen >= len) 70 return &buf[r]; 71 else 72 { 73 memcpy(buf_extend, &buf[r], toendlen); 74 memcpy(((char*)buf_extend) + toendlen, &buf[0], len - toendlen); 75 return (const char*)buf_extend; 76 } 77 } 78 bool IsEmpty() { 79 return (w == r); 80 } 81 bool Skip(int len) 82 { 83 assert(GetDataLength() >= len); 84 r = (r + len) % MAX_VIDEO_BUFF; 85 return true; 86 } 87 int GetDataLength() 88 { 89 int rt = w + MAX_VIDEO_BUFF - r; 90 if (rt >= MAX_VIDEO_BUFF) 91 rt -= MAX_VIDEO_BUFF; 92 return rt; 93 } 94 95 96 ////// 97 bool CreateBuff() 98 { 99 if (buf == 0) 100 buf = (char*)malloc(MAX_VIDEO_BUFF); 101 102 if (buf_extend == 0) 103 buf_extend = (char*)malloc(MAX_VIDEO_ELEMENT); 104 105 return (buf != 0); 106 } 107 108 private: 109 char* buf;//数据实际上只能写入MAX_BUFF-1 110 int r; 111 int w; 112 char* buf_extend; 113 }; 114 115 116 VideoQueue* q = 0; 117 118 CRITICAL_SECTION cs; 119 120 #define TEST_MSG_COUNT 100000 121 void ThreadA(void*) 122 { 123 // 创建消息,不断向队列里填充 124 char s_Buff[TEST_MSG_MAX_SIZE + sizeof(Header)]; 125 Header *pHeader = (Header *)s_Buff; 126 int iLeftCount = 0; 127 int iPos = 0; 128 int iSendSize = 0; 129 int iPackIndex = 0; 130 int iTotalSize = 0; 131 132 while (1) 133 { 134 //EnterCriticalSection(&cs); 135 136 if (q->IsFull()) 137 { 138 //LeaveCriticalSection(&cs); 139 //Sleep(0); 140 continue; 141 } 142 143 if (iLeftCount == 0) 144 { 145 if (iPackIndex++ >= TEST_MSG_COUNT) 146 { 147 //LeaveCriticalSection(&cs); 148 break; 149 } 150 151 // Create a Msg; 152 pHeader->wSize = rand() % (TEST_MSG_MAX_SIZE - 32) + 32; 153 for (WORD i = 0; i < pHeader->wSize; ++i) 154 pHeader->data[i] = char(pHeader->wSize + i); 155 iTotalSize += pHeader->wSize; 156 iLeftCount = pHeader->wSize + sizeof(Header); 157 iPos = 0; 158 } 159 160 int iSpace = q->GetSpaceSize(); 161 if (iLeftCount > TEST_MSG_MAX_SIZE / 4) 162 iSendSize = rand() % iLeftCount; 163 else 164 iSendSize = iLeftCount; 165 166 if (iSpace < iSendSize) 167 { 168 assert(q->Write(s_Buff + iPos, iSpace)); 169 iLeftCount -= iSpace; 170 iPos += iSpace; 171 } 172 else 173 { 174 assert(q->Write(s_Buff + iPos, iSendSize)); 175 iLeftCount -= iSendSize; 176 iPos += iSendSize; 177 } 178 179 //LeaveCriticalSection(&cs); 180 //Sleep(0); 181 } 182 printf("Send %d Msg, Size:%d!\n", TEST_MSG_COUNT, iTotalSize); 183 184 // __EndThreadEx 185 } 186 187 void ThreadB(void*) 188 { 189 // 不断从队列里读消息 190 Header *pHeader = NULL; 191 192 int iMsgCount = 0; 193 int iTotalSize = 0; 194 int iErrorPack = 0; 195 196 while (1) 197 { 198 //EnterCriticalSection(&cs); 199 if (q->IsEmpty()) 200 { 201 //LeaveCriticalSection(&cs); 202 //Sleep(0); 203 continue; 204 } 205 206 pHeader = (Header *)q->GetDataPtr(sizeof(Header)); 207 if (pHeader) 208 { 209 pHeader = (Header *)q->GetDataPtr(sizeof(Header) + pHeader->wSize); 210 if (pHeader) 211 { 212 for (WORD i = 0; i < pHeader->wSize; ++i) 213 if (pHeader->data[i] != char(pHeader->wSize + i)) 214 { 215 iErrorPack++; 216 break; 217 } 218 219 iMsgCount ++; 220 iTotalSize += pHeader->wSize; 221 q->Skip(sizeof(Header) + pHeader->wSize); 222 223 if (iMsgCount >= TEST_MSG_COUNT) 224 { 225 //LeaveCriticalSection(&cs); 226 break; 227 } 228 } 229 } 230 //LeaveCriticalSection(&cs); 231 //Sleep(0); 232 } 233 printf("Recv %d Msg, Size:%d, ErrorPack:%d!\n", TEST_MSG_COUNT, iTotalSize,iErrorPack); 234 235 // __EndThreadEx 236 } 237 238 int _tmain(int argc, _TCHAR* argv[]) 239 { 240 srand(1); 241 q = new VideoQueue; 242 if (!q->CreateBuff()) 243 { 244 printf("createbuff fail\n"); 245 getchar(); 246 return 0; 247 } 248 InitializeCriticalSection(&cs); 249 250 HANDLE hWrite = (HANDLE)_beginthread(ThreadA, 0, 0); 251 HANDLE hRead = (HANDLE)_beginthread(ThreadB, 0, 0); 252 253 WaitForSingleObject(hWrite, INFINITE); 254 WaitForSingleObject(hRead, INFINITE); 255 256 getchar(); 257 258 return 0; 259 }
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。