首页 > 代码库 > winsock编程IOCP模型实现代码

winsock编程IOCP模型实现代码

winsock编程IOCP模型实现代码

  话不多说,上代码。借鉴《windows核心编程》部分源码和CSDN小猪部分代码。

  stdafx.h依赖头文件:

 1 #include <iostream>
 2 #include <WinSock2.h>
 3 #include <MSWSock.h>
 4 #include <vector>
 5 #include "Singleton.h"
 6 #include "IOCPWrapper.h"
 7 #include "OverlappedIOInfo.h"
 8 #include "TaskSvc.h"
 9 
10 using namespace std;

  其中,TaskSvc.h、Singleton.h源码可以在我的blog里面找到。

  IOCPWrapper.h源码:

 1 /******************************************************************************
 2 Module:  IOCP.h
 3 Notices: Copyright (c) 2007 Jeffrey Richter & Christophe Nasarre
 4 Purpose: This class wraps an I/O Completion Port.
 5 Revise:    IOCP封装类,由《windows核心编程》第10章示例程序源码改编所得
 6 ******************************************************************************/
 7 #pragma once   
 8 
 9 class CIOCP 
10 {
11 public:
12    CIOCP(int nMaxConcurrency = -1)
13    {
14        m_hIOCP = NULL; 
15        if (nMaxConcurrency != -1)
16            Create(nMaxConcurrency);
17    }
18    ~CIOCP()
19    {
20        if (m_hIOCP != NULL) 
21            VERIFY(CloseHandle(m_hIOCP)); 
22    }
23 
24    //关闭IOCP
25    BOOL Close()
26    {
27        BOOL bResult = CloseHandle(m_hIOCP);
28        m_hIOCP = NULL;
29        return(bResult);
30    }
31 
32    //创建IOCP,nMaxConcurrency指定最大线程并发数量,0默认为cpu数量
33    BOOL Create(int nMaxConcurrency = 0)
34    {
35        m_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nMaxConcurrency);
36        ASSERT(m_hIOCP != NULL);
37        return(m_hIOCP != NULL);
38    }
39 
40    //为设备(文件、socket、邮件槽、管道等)关联一个IOCP
41    BOOL AssociateDevice(HANDLE hDevice, ULONG_PTR CompKey)
42    {
43        BOOL fOk = (CreateIoCompletionPort(hDevice, m_hIOCP, CompKey, 0) == m_hIOCP);
44        ASSERT(fOk);
45        return(fOk);
46    }
47 
48    //为socket关联一个IOCP
49    BOOL AssociateSocket(SOCKET hSocket, ULONG_PTR CompKey)
50    {
51        return(AssociateDevice((HANDLE) hSocket, CompKey));
52    }
53 
54    //为iocp传递事件通知
55    BOOL PostStatus(ULONG_PTR CompKey, DWORD dwNumBytes = 0,OVERLAPPED* po = NULL)
56    {
57        BOOL fOk = PostQueuedCompletionStatus(m_hIOCP, dwNumBytes, CompKey, po);
58        ASSERT(fOk);
59        return(fOk);
60    }
61 
62    //从IO完成队列中获取事件通知。IO完成队列无事件时,该函数将阻塞
63    BOOL GetStatus(ULONG_PTR* pCompKey, PDWORD pdwNumBytes,OVERLAPPED** ppo, DWORD dwMilliseconds = INFINITE)
64    {
65        return(GetQueuedCompletionStatus(m_hIOCP, pdwNumBytes,pCompKey, ppo, dwMilliseconds));
66    }
67 
68    //获取IOCP对象
69    const HANDLE  GetIOCP()
70    {
71        return m_hIOCP;
72    }
73 private:
74     //IOCP句柄
75    HANDLE m_hIOCP;
76 };
77 
78 ///////////////////////////////// End of File /////////////////////////////////

OverlappedIOInfo.h源码

 

 1 /******************************************************************************
 2 Module:  OverlappedIOInfo.h
 3 Notices: Copyright (c) 20161201  whg
 4 Purpose:
 5 IOCP网络编程模型中,需要用到GetQueuedCompletionStatus函数获取已完成事件。
 6 但该函数的返回参数无socket或buffer的描述信息。
 7 
 8 一个简单的解决办法,创建一个新的结构,该结构第一个参数是OVERLAPPED。
 9 由于AcceptEx、WSASend等重叠IO操作传入的是Overlapped结构体的地址,调用AcceptEx等重叠IO操作,
10 在Overlapped结构体后面开辟新的空间,写入socket或buffer的信息,即可将socket或buffer的信息由
11 GetQueuedCompletionStatus带回。
12 
13 参考《windows核心编程》和CSDN PiggyXP
14 ******************************************************************************/
15 
16 #pragma once
17 
18 #define MAXBUF 8*1024
19 
20 enum IOOperType{
21     TYPE_ACP,            //accept事件到达,有新连接请求    
22     TYPE_RECV,            //数据接收事件
23     TYPE_SEND,            //数据发送事件
24     TYPE_CLOSE,            //关闭事件
25     TYPE_NO_OPER
26 };
27 
28 class COverlappedIOInfo :    public OVERLAPPED
29 {
30 public:
31     COverlappedIOInfo(void)
32     {
33         m_sSock = INVALID_SOCKET;
34         ResetOverlapped();
35         ResetRecvBuffer();
36         ResetSendBuffer();
37     }
38     ~COverlappedIOInfo(void)
39     {
40         if (m_sSock != INVALID_SOCKET)
41         {
42             closesocket(m_sSock);
43             m_sSock = INVALID_SOCKET;
44         }
45     }
46     void ResetOverlapped()
47     {
48         Internal = InternalHigh = 0;   
49         Offset = OffsetHigh = 0;   
50         hEvent = NULL;
51     }
52     void ResetRecvBuffer()
53     {
54         ZeroMemory(m_cRecvBuf,MAXBUF);
55         m_recvBuf.buf = m_cRecvBuf;
56         m_recvBuf.len = MAXBUF;
57     }
58     void ResetSendBuffer()
59     {
60         ZeroMemory(m_cSendBuf,MAXBUF);
61         m_sendBuf.buf = m_cSendBuf;
62         m_sendBuf.len = MAXBUF;
63     }
64 public:
65     //套接字
66     SOCKET        m_sSock;            
67     //接收缓冲区,用于AcceptEx、WSARecv操作
68     WSABUF        m_recvBuf;            
69     char        m_cRecvBuf[MAXBUF];
70     //发送缓冲区,用于WSASend操作
71     WSABUF        m_sendBuf;
72     char        m_cSendBuf[MAXBUF];    
73     //对端地址
74     sockaddr_in    m_addr;                
75 };

 

server.h

 1 #pragma once
 2 
 3 
 4 class CServer:public CTaskSvc
 5 {
 6 #define ACCEPT_SOCKET_NUM  10
 7 
 8 public:
 9     CServer(void);
10     ~CServer(void);
11     bool    StartListen(unsigned short port,std::string ip);
12 
13 protected:
14     virtual void svc();
15 
16 private:
17     //启动CPU*2个线程,返回已启动线程个数
18     UINT    StartThreadPull();
19     //获取AcceptEx和GetAcceptExSockaddrs函数指针
20     bool    GetLPFNAcceptEXAndGetAcceptSockAddrs();
21     //利用AcceptEx监听accept请求
22     bool    PostAccept(COverlappedIOInfo* ol);
23     //处理accept请求,NumberOfBytes=0表示没有收到第一帧数据,>0表示收到第一帧数据
24     bool    DoAccept(COverlappedIOInfo* ol,DWORD NumberOfBytes=0);
25     //投递recv请求
26     bool    PostRecv(COverlappedIOInfo* ol);
27     //处理recv请求
28     bool    DoRecv(COverlappedIOInfo* ol);
29     //从已连接socket列表中移除socket及释放空间
30     bool    DeleteLink(SOCKET s);
31     //释放3个部分步骤:
32     //1:清空IOCP线程队列,退出线程
33     //2: 清空等待accept的套接字m_vecAcps
34     //3: 清空已连接的套接字m_vecContInfo并清空缓存
35     void    CloseServer();
36 private:
37     //winsock版本类型
38     WSAData                        m_wsaData;
39     //端口监听套接字
40     SOCKET                        m_sListen;
41     //等待accept的套接字,这些套接字是没有使用过的,数量为ACCEPT_SOCKET_NUM。同时会有10个套接字等待accept
42     std::vector<SOCKET>            m_vecAcps;            
43     //已建立连接的信息,每个结构含有一个套接字、发送缓冲和接收缓冲,以及对端地址
44     std::vector<COverlappedIOInfo*>    m_vecContInfo;    
45     //操作vector的互斥访问锁
46     CThreadLockCs                m_lsc;                    
47     //IOCP封装类
48     CIOCP                        m_iocp;        
49     //AcceptEx函数指针
50     LPFN_ACCEPTEX                m_lpfnAcceptEx;        
51     //GetAcceptSockAddrs函数指针
52     LPFN_GETACCEPTEXSOCKADDRS    m_lpfnGetAcceptSockAddrs;
53 };
54 
55 typedef CSingleton<CServer> SERVER;

server.cpp

  1 #include "StdAfx.h"
  2 #include "Server.h"
  3 
  4 CServer::CServer(void)
  5 {
  6     m_lpfnAcceptEx = NULL;
  7     m_lpfnGetAcceptSockAddrs = NULL;
  8     WSAStartup(MAKEWORD(2,2),&m_wsaData);
  9 }
 10 
 11 CServer::~CServer(void)
 12 {
 13     CloseServer();
 14     WSACleanup();
 15 }
 16 
 17 bool CServer::StartListen(unsigned short port,std::string ip)
 18 {
 19     //listen socket需要将accept操作投递到完成端口,因此,listen socket属性必须有重叠IO
 20     m_sListen = WSASocket(AF_INET,SOCK_STREAM,IPPROTO_TCP,NULL,0,WSA_FLAG_OVERLAPPED);
 21     if(m_sListen == INVALID_SOCKET)
 22     {
 23         cout<<"WSASocket create socket error"<<endl;
 24         return false;
 25     }
 26     //创建并设置IOCP并发线程数量
 27     if (m_iocp.Create() == FALSE)
 28     {
 29         cout<<"IOCP create error,error code "<<WSAGetLastError()<<endl;
 30         return false;
 31     }
 32     //将listen socket绑定至iocp
 33     if (!m_iocp.AssociateSocket(m_sListen,TYPE_ACP))
 34     {
 35         cout<<"iocp Associate listen Socket error"<<endl;
 36         return false;
 37     }
 38     sockaddr_in service;
 39     service.sin_family = AF_INET;
 40     service.sin_port = htons(port);
 41     if (ip.empty())
 42     {
 43         service.sin_addr.s_addr = INADDR_ANY;
 44     }
 45     else
 46     {
 47         service.sin_addr.s_addr = inet_addr(ip.c_str());
 48     }
 49 
 50     if (bind(m_sListen,(sockaddr*)&service,sizeof(service)) == SOCKET_ERROR)
 51     {
 52         cout<<"bind() error,error code "<<WSAGetLastError()<<endl;
 53         return false;
 54     }
 55     cout<<"bind ok!"<<endl;
 56 
 57     if (listen(m_sListen,SOMAXCONN) == SOCKET_ERROR)
 58     {
 59         cout<<"listen() error,error code "<<WSAGetLastError()<<endl;
 60         return false;
 61     }
 62     cout<<"listen ok!"<<endl;
 63     //启动工作者线程
 64     int threadnum = StartThreadPull();
 65     cout<<"启动工作者线程,num="<<threadnum<<endl;
 66     //获取AcceptEx和GetAcceptSockAddrs函数指针
 67     if (!GetLPFNAcceptEXAndGetAcceptSockAddrs())
 68     {
 69         return false;
 70     }
 71     //创建10个acceptex
 72     for (int i=0;i<ACCEPT_SOCKET_NUM;i++)
 73     {
 74         //用accept
 75         COverlappedIOInfo* ol = new COverlappedIOInfo;
 76         if (!PostAccept(ol))
 77         {
 78             delete ol;
 79             return false;
 80         }
 81     }
 82 }
 83 
 84 bool CServer::GetLPFNAcceptEXAndGetAcceptSockAddrs()
 85 {
 86     DWORD BytesReturned = 0;
 87     //获取AcceptEx函数指针
 88     GUID GuidAcceptEx = WSAID_ACCEPTEX;
 89     if (SOCKET_ERROR == WSAIoctl(
 90         m_sListen,
 91         SIO_GET_EXTENSION_FUNCTION_POINTER,
 92         &GuidAcceptEx,
 93         sizeof(GuidAcceptEx),
 94         &m_lpfnAcceptEx,
 95         sizeof(m_lpfnAcceptEx),
 96         &BytesReturned,
 97         NULL,NULL))
 98     {
 99         cout<<"WSAIoctl get AcceptEx function error,error code "<<WSAGetLastError()<<endl;
100         return false;
101     }
102     
103     //获取GetAcceptexSockAddrs函数指针
104     GUID GuidGetAcceptexSockAddrs = WSAID_GETACCEPTEXSOCKADDRS; 
105     if (SOCKET_ERROR == WSAIoctl(
106         m_sListen,
107         SIO_GET_EXTENSION_FUNCTION_POINTER,
108         &GuidGetAcceptexSockAddrs,
109         sizeof(GuidGetAcceptexSockAddrs),
110         &m_lpfnGetAcceptSockAddrs,
111         sizeof(m_lpfnGetAcceptSockAddrs),
112         &BytesReturned,
113         NULL,NULL))
114     {
115         cout<<"WSAIoctl get GetAcceptexSockAddrs function error,error code "<<WSAGetLastError()<<endl;
116         return false;
117     }
118     return true;
119 }
120 
121 bool CServer::PostAccept(COverlappedIOInfo* ol)
122 {
123     if (m_lpfnAcceptEx == NULL)
124     {
125         cout << "m_lpfnAcceptEx is NULL"<<endl;
126         return false;
127     }
128     SOCKET s = ol->m_sSock;
129     ol->ResetRecvBuffer();
130     ol->ResetOverlapped();
131     ol->ResetSendBuffer();
132     ol->m_sSock = WSASocket(AF_INET,SOCK_STREAM,IPPROTO_TCP,NULL,0,WSA_FLAG_OVERLAPPED);
133     if (ol->m_sSock == INVALID_SOCKET)
134     {
135         cout<<"WSASocket error ,error code "<<WSAGetLastError()<<endl;
136         return false;
137     }
138     //这里建立的socket用来和对端建立连接,终会加入m_vecContInfo列表
139     //调用acceptex将accept socket绑定至完成端口,并开始进行事件监听
140     //这里需要传递Overlapped,new一个COverlappedIOInfo
141     //AcceptEx是m_listen的监听事件,m_listen已经绑定了完成端口;虽然ol->m_sSock已经创建,
142     //但未使用,现在不必为ol->m_sSock绑定完成端口。在AcceptEx事件发生后,再为ol->m_sSock绑定IOCP
143     DWORD byteReceived = 0;
144     if (FALSE == m_lpfnAcceptEx(
145         m_sListen,
146         ol->m_sSock,
147         ol->m_recvBuf.buf,
148         ol->m_recvBuf.len - (sizeof(SOCKADDR_IN)+16)*2,
149         sizeof(SOCKADDR_IN)+16,
150         sizeof(SOCKADDR_IN)+16,
151         &byteReceived,
152         ol))
153     {
154         DWORD res = WSAGetLastError();
155         if (ERROR_IO_PENDING != res)
156         {
157             cout<<"AcceptEx error , error code "<<res<<endl;
158             return false;
159         }
160     }
161     std::vector<SOCKET>::iterator iter = m_vecAcps.begin();
162     for (;iter != m_vecAcps.end(); iter++)
163     {
164         if (*iter == s)
165         {
166             *iter = ol->m_sSock;
167         }
168     }
169     if (iter == m_vecAcps.end())
170     {
171         m_vecAcps.push_back(ol->m_sSock);
172     }
173     return true;
174 }
175 
176 bool CServer::DoAccept(COverlappedIOInfo* ol,DWORD NumberOfBytes)
177 {
178     //分支用于获取远端地址。
179     //如果接收TYPE_ACP同时收到第一帧数据,则第一帧数据内包含远端地址。
180     //如果没有收到第一帧数据,则通过getpeername获取远端地址
181     if (NumberOfBytes > 0)
182     {
183         //接受的数据分成3部分,第1部分是客户端发来的数据,第2部分是本地地址,第3部分是远端地址。
184         if (m_lpfnGetAcceptSockAddrs)
185         {
186             SOCKADDR_IN* ClientAddr = NULL;
187             SOCKADDR_IN* LocalAddr = NULL;  
188             int remoteLen = sizeof(SOCKADDR_IN), localLen = sizeof(SOCKADDR_IN);  
189             m_lpfnGetAcceptSockAddrs(
190                 ol->m_recvBuf.buf,
191                 ol->m_recvBuf.len - (sizeof(SOCKADDR_IN)+16)*2,
192                 sizeof(SOCKADDR_IN)+16,
193                 sizeof(SOCKADDR_IN)+16,
194                 (LPSOCKADDR*)&LocalAddr,
195                 &localLen,
196                 (LPSOCKADDR*)&ClientAddr,
197                 &remoteLen);
198             cout<<"收到新的连接请求,ip="<<inet_ntoa(ClientAddr->sin_addr)<<",port="<<ClientAddr->sin_port<<endl;
199             COverlappedIOInfo* pol = new COverlappedIOInfo;
200             pol->m_sSock = ol->m_sSock;
201             pol->m_addr =  *ClientAddr;
202             //服务端只收取recv,同时监听recv和send可用设计位偏移,用或运算实现
203             if (m_iocp.AssociateSocket(pol->m_sSock,TYPE_RECV))
204             {
205                 PostRecv(pol);
206                 
207                 m_vecContInfo.push_back(pol);
208             }
209             else
210             {
211                 delete pol;
212                 return false;
213             }
214         }
215     }
216     else if (NumberOfBytes==0)
217     {
218         //未收到第一帧数据
219     }
220     return true;
221 }
222 
223 bool CServer::DoRecv(COverlappedIOInfo* ol)
224 {
225     cout<<"收到客户端数据:ip="<<inet_ntoa(ol->m_addr.sin_addr)<<",port="<<ol->m_addr.sin_port<<
226         ";内容="<<ol->m_recvBuf.buf<<endl;
227     return true;
228 }
229 
230 bool CServer::PostRecv(COverlappedIOInfo* ol)
231 {
232     DWORD BytesRecvd = 0;
233     DWORD dwFlags = 0;
234     ol->ResetOverlapped();
235     ol->ResetRecvBuffer();
236     int recvnum = WSARecv(ol->m_sSock,&ol->m_recvBuf,1,&BytesRecvd,&dwFlags,(OVERLAPPED*)ol,NULL);
237     if (recvnum != 0)
238     {
239         int res = WSAGetLastError();
240         if (WSA_IO_PENDING != res)
241         {
242             cout<<"WSARecv error,error code "<<res<<endl;
243         }
244     }
245     return true;
246 }
247 
248 UINT CServer::StartThreadPull()
249 {
250     //获取系统cpu个数启动线程
251     SYSTEM_INFO si;
252     GetSystemInfo(&si);
253     //启动cpu数量*2个线程
254     return Activate(si.dwNumberOfProcessors * 2);
255 }
256 
257 bool CServer::DeleteLink(SOCKET s)
258 {
259     m_lsc.lock();
260     std::vector<COverlappedIOInfo*>::iterator iter = m_vecContInfo.begin();
261     for (;iter!=m_vecContInfo.end();iter++)
262     {
263         if (s == (*iter)->m_sSock)
264         {
265             COverlappedIOInfo* ol = *iter;
266             closesocket(s);
267             m_vecContInfo.erase(iter);
268             delete ol;
269             break;
270         }
271     }
272     m_lsc.unlock();
273     return true;
274 }
275 
276 void CServer::svc()
277 {
278     while (true)
279     {
280         DWORD  NumberOfBytes = 0;
281         unsigned long CompletionKey = 0;
282         OVERLAPPED*    ol = NULL;
283         if (FALSE != GetQueuedCompletionStatus(m_iocp.GetIOCP(),&NumberOfBytes,&CompletionKey,&ol,WSA_INFINITE))
284         {
285             if (CompletionKey == TYPE_CLOSE)
286             {
287                 break;
288             }
289             COverlappedIOInfo* olinfo = (COverlappedIOInfo*)ol;
290             if (CompletionKey == TYPE_ACP)
291             {
292                 DoAccept(olinfo,NumberOfBytes);
293                 PostAccept(olinfo);
294             }
295             else if(CompletionKey == TYPE_RECV)
296             {
297                 if (NumberOfBytes == 0)
298                 {
299                     //客户端断开连接
300                     cout<<"客户端断开连接,ip="<<inet_ntoa(olinfo->m_addr.sin_addr)<<",port="<<olinfo->m_addr.sin_port<<endl;
301                     DeleteLink(olinfo->m_sSock);
302                     continue;
303                 }
304                 DoRecv(olinfo);
305                 PostRecv(olinfo);
306             }
307         }
308         else
309         {
310             cout<<"workthread GetQueuedCompletionStatus error,error code "<<WSAGetLastError()<<endl;
311             break;
312         }
313     }
314     cout<<"workthread stop"<<endl;
315 }
316 
317 void CServer::CloseServer()
318 {
319     //1:清空IOCP线程队列,退出线程,有多少个线程发送多少个PostQueuedCompletionStatus信息
320     int threadnum = GetThreadsNum();
321     for (int i=0;i<threadnum;i++)
322     {
323         if (FALSE == m_iocp.PostStatus(TYPE_CLOSE))
324         {
325             cout<<"PostQueuedCompletionStatus error,error code "<<WSAGetLastError()<<endl;
326         }
327     }
328     //2:清空等待accept的套接字m_vecAcps
329     std::vector<SOCKET>::iterator iter = m_vecAcps.begin();
330     for (;iter != m_vecAcps.end();iter++)
331     {
332         SOCKET s = *iter;
333         closesocket(s);
334     }
335     m_vecAcps.clear();
336     //3:清空已连接的套接字m_vecContInfo并清空缓存
337     std::vector<COverlappedIOInfo*>::iterator iter2 = m_vecContInfo.begin();
338     for (;iter2 != m_vecContInfo.end();iter2++)
339     {
340         COverlappedIOInfo* ol = *iter2;
341         closesocket(ol->m_sSock);
342         iter2 = m_vecContInfo.erase(iter2);
343         delete ol;
344     }
345     m_vecContInfo.clear();
346 }

调用方法,控制台程序main函数内加入

1 SERVER::Instance()->StartListen(8828,"127.0.0.1");
2 int outId;
3 cin>>outId;
4 if (outId == 0)
5 {
6     SERVER::Close();
7 }

输入0结束服务程序。

测试结果1,接收数据

技术分享

测试结果2,客户端断开连接

技术分享  

  机子比较垃圾,开1000个线程占用20多的CPU。有条件的可以多测试一些。

 

winsock编程IOCP模型实现代码