首页 > 代码库 > ICOPclient版本号,异步connect
ICOPclient版本号,异步connect
之前在网上看到一个服务端的ICOP模块,比較小巧,感觉还不错,后来在工作中,需要开发一个挂号的程序,监视大量server执行情况,初期连接数大概六七百,我就把这个ICOP模块改造成了一个client版本号。后来发现因为是同步的connect,有时候会卡在connect过程非常久,也不方便设置connect的超时,想到使用ConnectEx做异步连接,感觉ConnectEx过于繁琐,还得自己获取函数指针,必需要先调用bind等,断开连接要调用DisconnectEx。后来我自己想到一种办法,在调用connect之前,用ioctlsocket把socket先设置为非堵塞模式,然后在连接成功后再设置回堵塞模式,但这有一个问题,ICOP里面设置为非堵塞模式,怎么推断连接成功、失败、超时呢?我是这么做的,调用connect成功之后,投递事件,在connect事件里,调用getsockopt(clt->fd, SOL_SOCKET, 0x700C/*SO_CONNECT_TIME*/, (char*)&Connect_Time, &len)来检測连接时间,假设返回-1表示连接没有成功,然后推断是否超时,假设超时直接失败,否则断续投递事件,直到连接成功或者超时,以下直接上代码,关键代码段在:int connect()函数和case T::EV_CONNECT: 段:
#ifndef iocptcpclient_h__ #define iocptcpclient_h__ #include <Winsock2.h> #include <windows.h> #include <MSTCPiP.h> #pragma comment(lib, "Ws2_32.lib") namespace iocp { template<typename T> class Scheduler { public: void start(); void stop(); void push(T * clt); public: int scheds; HANDLE iocp; }; template<typename T> void Scheduler<T>::start() { iocp = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, scheds); if (NULL == iocp) { throw (int)::WSAGetLastError(); } } template<typename T> void Scheduler<T>::stop() { } template<typename T> void Scheduler<T>::push(T * clt) { ::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)clt, NULL); } template<typename T> class Processor { public: void start(); void stop(); public: static DWORD WINAPI run(LPVOID param); public: int threads; Scheduler<T> * scheder; }; template<typename T> void Processor<T>::start() { for (int i = 0; i < threads; i++) { DWORD tid; HANDLE thd = ::CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)run, this, 0, &tid); if (NULL == thd) { throw (int)::GetLastError(); } ::CloseHandle(thd); } } template<typename T> void Processor<T>::stop() { } template<typename T> DWORD WINAPI Processor<T>::run(LPVOID param) { Processor<T>& procor = *(Processor<T> *)param; Scheduler<T>& scheder = *procor.scheder; HANDLE iocp = scheder.iocp; DWORD ready; ULONG_PTR key; WSAOVERLAPPED * overlap; while (true) { ::GetQueuedCompletionStatus(iocp, &ready, &key, (LPOVERLAPPED *)&overlap, INFINITE); T * clt = (T *)key; switch (clt->event) { case T::EV_RECV: { if (0 >= ready) { clt->event = T::EV_DISCONNECT; ::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)clt, NULL); } else { clt->OnRecv(ready); } } break; case T::EV_CONNECT: { int Connect_Time; int len = sizeof(Connect_Time); int result = getsockopt(clt->fd, SOL_SOCKET, 0x700C/*SO_CONNECT_TIME*/, (char*)&Connect_Time, &len); if (Connect_Time == -1){ if (GetTickCount() - clt->dwConnTime >= clt->maxConnTime){ clt->OnConnectFailed(); ::closesocket(clt->fd); clt->fd = INVALID_SOCKET; } else { Sleep(1); ::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)clt, NULL); } } else { unsigned long ul = 0; ioctlsocket(clt->fd, FIONBIO, &ul); //设置为堵塞模式*/ if (NULL == ::CreateIoCompletionPort((HANDLE)clt->fd, iocp, (ULONG_PTR)clt, 0)) { clt->OnConnectFailed(); ::closesocket(clt->fd); clt->fd = INVALID_SOCKET; //delete clt; } else { clt->OnConnect(); } } } break; case T::EV_DISCONNECT: { clt->OnDisconnect(); ::closesocket(clt->fd); clt->fd = INVALID_SOCKET; //delete clt; } break; case T::EV_SEND: break; } } return 0; } class Client { public: enum EVENT { EV_CONNECT, EV_DISCONNECT, EV_RECV, EV_SEND }; Client(){ fd = INVALID_SOCKET; maxConnTime = 5000; } virtual ~Client(){}; int connect(){ this->event = EV_CONNECT; dwConnTime = GetTickCount(); struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_addr.s_addr = ip; addr.sin_port = htons(port); DWORD dwError = 0, dwBytes = 0; tcp_keepalive sKA_Settings = { 0 }, sReturned = { 0 }; sKA_Settings.onoff = 1; sKA_Settings.keepalivetime = 30000; // Keep Alive in 30 sec. sKA_Settings.keepaliveinterval = 3000; // Resend if No-Reply if (WSAIoctl(fd, SIO_KEEPALIVE_VALS, &sKA_Settings, sizeof(sKA_Settings), &sReturned, sizeof(sReturned), &dwBytes, NULL, NULL) != 0) { dwError = WSAGetLastError(); } unsigned long ul = 1; ioctlsocket(fd, FIONBIO, &ul); //设置为非堵塞模式 int ret = -1; if (::connect(fd, (struct sockaddr *)&addr, sizeof(struct sockaddr)) == -1 && WSAGetLastError() == WSAEWOULDBLOCK) { ret = 0; } return ret; } void send(const char * buff, int len){ ::send(fd, buff, len, 0); } void recv(char * buff, int len){ this->event = EV_RECV; ::memset(&overlap, 0, sizeof(overlap)); WSABUF buf; buf.buf = buff; buf.len = len; DWORD ready = 0; DWORD flags = 0; if (0 != ::WSARecv(fd, &buf, 1, &ready, &flags, &overlap, NULL) && WSA_IO_PENDING != WSAGetLastError()) { this->event = EV_DISCONNECT; ::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)this, NULL); } } void close(){ ::shutdown(fd, SD_BOTH); } virtual void OnConnect(){};//连接成功 virtual void OnConnectFailed(){};//连接失败 virtual void OnDisconnect(){}; //连接断开 virtual void OnRecv(int len){}; virtual void OnSend(int len){}; public: int ip; int port; void * srv; HANDLE iocp; EVENT event; SOCKET fd; DWORD maxConnTime; DWORD dwConnTime; WSAOVERLAPPED overlap; }; template<typename T> class TCPClt { public: void start(); void stop(); bool addclt(T* clt, int ip, int port); public: int scheds; int threads; iocp::Scheduler<T> scheder; iocp::Processor<T> procor; }; template<typename T> void TCPClt<T>::start() { WSADATA wsadata; int wsaversion = WSAStartup(MAKEWORD(2, 2), &wsadata); if (threads <= 0) { threads = 1; } scheder.scheds = scheds; scheder.start(); procor.threads = threads; procor.scheder = &scheder; procor.start(); } template<typename T> void TCPClt<T>::stop() { } template<typename T> bool TCPClt<T>::addclt(T* clt, int ip, int port){ clt->ip = ip; clt->port = port; clt->iocp = scheder.iocp; clt->fd = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED); if (clt->fd == INVALID_SOCKET) { return false; } if (clt->connect() != 0) { closesocket(clt->fd); return false; } scheder.push(clt); return true; } } #endif // iocptcpclient_h__
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。