首页 > 代码库 > 重叠IO
重叠IO
一、 异步IO
说到重叠模型首先还是提一下异步IO比较好,因为从本质上讲,重叠模型也是一种异步IO模型。
我们知道,相对于计算机执行的其他操作而言,设备IO(文件、管道、套接字等)是比较慢的。于是在多线程结构中就考虑到采用异步的方式进行设备读写操作,即我们告诉系统对设备的读写数据,而同时应用程序的其他代码继续执行,直到获取设备操作完毕的系统通知。
在进行异步IO时,我们先向系统发出IO请求,操作系统队列化各种IO请求,并在内部完成操作,当系统在处理IO请求时,我们的线程可以返回继续执行,当操作系统处理完IO请求之后,通知我们数据操作(发送、接收、出错)完毕。
Windows提供了四种异步IO技术,机制几乎时相同的,区别在于通知结果的方式不同:
1、 使一个设备内核对象变为有信号
Windows将设备句柄看作可同步的对象,即它可以处于有信号或处于无信号状态,当创建设备句柄、以异步的方式发送IO请求时,该句柄处于无信号状态,当异步IO完成之后,该句柄受信,通过WaitForSingleobject或WatiForMultipleObjects函数可以判断设备操作合适完成。该技术只能用于一个设备只发送一个IO请求,否则,若一个设备对应多个操作,当句柄受信时无法判断是该设备的那个操作完成。
2、 使一个事件内核对象变为有信号
针对每个I/O操作绑定一个内核事件对象,并将等待事件等待函数等待该事件的受信,当I/O操作完成后系统使得与该操作绑定的事件受信,从而判断那个操作完成。该技术解决了使一个设备内核对象变为有信号技术中一个设备只能对应一个操作的不足。
3、 警告I/O
在该技术中,当发出设备IO请求时,同时要求我们传递一个被称为完成例程的回调函数,当IO请求完成时调用该回调函数完成我们需要处理的工作。该技术允许单个设备同时进行多个I/O请求。
4、 完成端口
完成端口技术多用于处理大规模的请求,通过内在的进程池技术可以达到很高的性能,此时暂不做深入讨论,若预知后事如何,请自己看,或等下回完成端口部分分解。
好,至此,刀磨的差不多了,估计也飘了~~~~,干正事了。
二、 网络编程中的重叠IO模型理论
在编程中可以有两种方法管理I/O请求:
1、 事件对象通知(event object notification) —对应上面的2
2、 完成例程(completion routines)——对应上面的3
这里只讨论第一种情况,具体思路就是正对每个套接字的每个操作绑定一个事件,然后将所有的事件组成事件数据,运用事件等待函数等待事件的发生,并根据事件与操作的对应关系对与之对应的操作进行处理。
下面说一下实际编程中所用到的数据结构及函数。
1、WSAOVERLAPPED结构
这个结构自然是重叠模型里的核心,用于绑定套接字、操作以及操作对应的事件。
typedef struct _WSAOVERLAPPED {
DWORD Internal;
DWORD InternalHigh;
DWORD Offset; //文件操作中想要开始的低偏移量,网络中不用
DWORD OffsetHigh; //文件操作中想要开始的高偏移量,网络中不用
WSAEVENT hEvent; // 网络编程中唯一需要关注的参数,用来关联WSAEvent对象
} WSAOVERLAPPED, *LPWSAOVERLAPPED;
2. WSARecv系列函数
在重叠模型中,发送接收等函数均由WSASend、WSARecv、WSASendTo、WSARecvFrom等函数代替,通过该函数将I/O操作与 WSAOVERLAPPED结构绑定起来,也既是与一个特定的事件绑定在一起,当系统完成操作之后与该操作绑定的事件受信。
int WSARecv(
SOCKET s, // 当然是投递这个操作的套接字
LPWSABUF lpBuffers, // 接收缓冲区,是WSABUF结构构成的数组
DWORD dwBufferCount, // 数组中WSABUF结构的数量
LPDWORD lpNumberOfBytesRecvd, // 如果接收操作立即完成,返回函数调用所接收到的字节数
LPDWORD lpFlags, // 说来话长了,我们这里设置为0 即可
LPWSAOVERLAPPED lpOverlapped, // “绑定”的重叠结构
LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine); // 完成例程中将会用到的参数,我们这里设置为 NULL
返回值:
WSA_IO_PENDING : 最常见的返回值,这是说明我们的WSARecv操作成功了,但是I/O操作还没有完成,所以我们就需要绑定一个事件来通知我们操作何时完成
3、WSAWaitForMultipleEvents函数
该函数类似于线程中常用的WaitForMultipleObjects函数,都是在等待事件的触发。我们将WSARecv等操作上绑定的LPWSAOVERLAPPED数据结构中的事件组称事件数据,在该函数上等待。
DWORD WSAWaitForMultipleEvents(
DWORD cEvents, // 等候事件的总数量
const WSAEVENT* lphEvents, // 事件数组的指针
BOOL fWaitAll, //当设置为 TRUE,事件数组中所有事件被传信的时候函数才会返回
// FALSE则任何一个事件被传信函数都要返回,此时设为FALSE
DWORD dwTimeout, // 超时时间,如果超时,函数会返回 WSA_WAIT_TIMEOUT
// 如果设置为0,函数会立即返回
BOOL fAlertable); // 在完成例程中会用到这个参数,这里我们先设置为FALSE
返回值:
WSA_WAIT_TIMEOUT :最常见的返回值,等待超时,我们需要做的就是继续Wait
WSA_WAIT_FAILED : 出现了错误,请检查cEvents和lphEvents两个参数是否有效
如果事件数组中有某一个事件受信了,函数会返回这个事件的索引值,但是这个索引值需要减去预定义值 WSA_WAIT_EVENT_0才是这个事件在事件数组中的位置。
注:WSAWaitForMultipleEvents函数只能支持由WSA_MAXIMUM_WAIT_EVENTS对象定义的一个最大值64,就是说WSAWaitForMultipleEvents只能等待64个事件,如果想同时等待多于64个事件,就要 创建额外的工作者线程,就需要通过线程池管理了。
4、WSAGetOverlappedResult函数
我们利用该函数来查询重叠操作的结果,定义如下:
BOOL WSAGetOverlappedResult(
SOCKET s, // SOCKET,需要操作的套接字
LPWSAOVERLAPPED lpOverlapped, //我们想要查询结果的那个重叠结构的指针
LPDWORD lpcbTransfer, // 本次重叠操作的实际接收(或发送)的字节数
BOOL fWait, // 设置为TRUE,除非重叠操作完成,否则函数不会返回
// 设置FALSE,而且操作仍处于挂起状态,那么函数就会返回FALSE
// 此处采用的是设备内核对象受信方式,等待套接字受信。
LPDWORD lpdwFlags); // 指向DWORD的指针,负责接收结果标志
注:如果WSAGetOverlappedResult完成以后,第三个参数返回是 0 ,则说明通信对方已经关闭连接,我们这边的SOCKET, Event之类的也就可以关闭了。
三、编程步骤
1、创建一个套接字,开始在指定的端口上监听连接请求。
2、接收一个入站的连接请求。
3、为接受的套接字创建新的WSAOVERLAPPED结构,并分配事件对象句柄。
4、以WSAOVERLAPPED结构为参数,在套接字上投递WSARecv调用。
5、将所有接受套接字的事件组建事件数组,并调用WSAWaitForMultipleEvents函数,等待与重叠调用关联在一起的事件受信。
6、使用WSAGetOverlappedResult函数,判断重叠调用的返回状态。
7、重新组建事件数组。
8、在套接字上重投递WSARecv请求。
9、重复5~8。
例子:初步封装了OverLapped类
/*SockObject*/
#include <winsock2.h>
class SockObject
{
public:
SOCKET m_sock;
int m_operatNum;
public:
SockObject(void);
SockObject(SOCKET mySock);
SockObject(SockObject &mySockObject);
~SockObject(void);
};
SockObject::SockObject(SOCKET mySock)
{
m_sock = mySock;
m_operatNum = 0;
}
SockObject::SockObject(SockObject &mySockObject)
{
m_sock = mySockObject.m_sock;
m_operatNum = mySockObject.m_operatNum;
}
/******************************************************************************
*数据结构名称:OverObject
*功能:记录一个套接字的一个操作、一个事件和一个重叠I/O的关联
*****************************************************************************/
class OverObject
{
public:
SOCKET m_sock; /*绑定的套接字*/
OVERLAPPED m_overlapped; /*绑定的重叠I/O*/
char *m_buf; /*用于存放数据的缓冲区*/
int m_len; /*缓冲区的长度*/
int m_operation; /*套接字针对的操作*/
public:
bool operator==(const OverObject &myOverObject) const;
public:
OverObject(void);
OverObject(const OverObject &myOverObject);
OverObject(SOCKET mySock, int myLen);
~OverObject(void);
};
/*定义指向重叠对象的指针类型*/
typedef OverObject * PtrOverObject;
OverObject::~OverObject(void)
{
delete m_buf;
}
/********************************************************************************
* 函数介绍:本函数是OverObject类的复制构造函数。
*********************************************************************************/
OverObject::OverObject(const OverObject &myOverObject)
{
m_sock = myOverObject.m_sock;
m_overlapped = myOverObject.m_overlapped;
m_buf = new char[myOverObject.m_len];
strcpy(m_buf,myOverObject.m_buf);
m_len = myOverObject.m_len;
m_operation = myOverObject.m_operation;
}
/********************************************************************************
* 函数介绍:本函数是OverObject类带参数的构造函数。
*********************************************************************************/
OverObject::OverObject(SOCKET mySock, int myLen)
{
m_sock = mySock;
m_buf = new char[myLen];
m_len = myLen;
m_overlapped.hEvent = ::WSACreateEvent();
}
/********************************************************************************
* 函数介绍:本函数是OverObject类的==运算符重载函数
*********************************************************************************/
bool OverObject::operator==(const OverObject &myOverObject) const
{
if(this->m_sock == myOverObject.m_sock && this->m_operation == myOverObject.m_operation && this->m_len == myOverObject.m_len && !strcmp(this->m_buf, myOverObject.m_buf))
{
cout << "the two overObject is eque !" << endl;
return true;
}
else
{
return false;
}
}
/******************************************************************************
*类名称:Overlapped
*功能:记录系统中需要维护的所有重叠I/O
*****************************************************************************/
#define OP_ACCEPT 1
#define OP_READ 2
#define OP_WRITE 3
class Overlapped
{
public:
list<OverObject> overObjects; /*需要维护的所有重叠I/O*/
vector<HANDLE> eventArray; /*所有重叠I/O所对应的事件组成的数组,作为等待函数的参数*/
vector<SockObject> sockArray; /*需要维护的所有套接字,当操作为零时关闭套接字*/
public:
list<OverObject>::iterator GetOverObject(SOCKET mySock, int myLen);
void FreeOverObject(list<OverObject>::iterator myPtrOverObject);
list<OverObject>::iterator Overlapped::FindOverObject(HANDLE myEvent);
void RebuildEventArray();
void CreateAcceptEvent();
void SetAcceptEvent();
void ResetAcceptEvent();
bool IsAcceptEvent(int index);
bool PostRecv(list<OverObject>::iterator myPtrOverObject);
bool PostSend(list<OverObject>::iterator myPtrOverObject);
bool PostAccept(list<OverObject>::iterator myPtrOverObject);
Overlapped(void);
~Overlapped(void);
void InitSocket();
};
/********************************************************************************
* 函数介绍:创建重叠对象的类对象,并插入重叠对象链表中。
*********************************************************************************/
list<OverObject>::iterator Overlapped::GetOverObject(SOCKET mySock, int myLen)
{
OverObject localOverObject(mySock, myLen);
overObjects.push_back(localOverObject);
eventArray.push_back(localOverObject.m_overlapped.hEvent);
list<OverObject>::iterator ret = overObjects.end();
ret--;
return ret;
}
/********************************************************************************
* 函数介绍:释放重叠对象链表中指定的重叠对象。
*********************************************************************************/
void Overlapped::FreeOverObject(list<OverObject>::iterator myPtrOverObject)
{
overObjects.erase(myPtrOverObject);
}
/********************************************************************************
* 函数介绍:从重叠对象列表中查找指定事件所对应的重叠对象。
*********************************************************************************/
list<OverObject>::iterator Overlapped::FindOverObject(HANDLE myEvent)
{
list<OverObject>::iterator localIerator;
for(localIerator = overObjects.begin(); localIerator != overObjects.end(); localIerator++)
{
if(localIerator->m_overlapped.hEvent == myEvent)
{
break;
}
}
return localIerator;
}
/********************************************************************************
* 函数介绍:遍历重叠对象列表,重建重叠对象列表所对应的事件数组。
*********************************************************************************/
void Overlapped::RebuildEventArray()
{
eventArray.clear();
list<OverObject>::iterator overObjIterator;
overObjIterator = overObjects.begin();
for(overObjIterator; overObjIterator != overObjects.end(); ++overObjIterator)
{
eventArray.push_back(overObjIterator->m_overlapped.hEvent);
}
}
/********************************************************************************
* 函数介绍:投放接受操作,即将指定套接字的Recv操作与重叠I/O对象关联起来。
*********************************************************************************/
bool Overlapped::PostRecv(list<OverObject>::iterator myPtrOverObject)
{
myPtrOverObject->m_operation = OP_READ;
DWORD dwBytes;
DWORD dwFlags = 0;
WSABUF buf;
buf.buf = myPtrOverObject->m_buf;
buf.len = myPtrOverObject->m_len;
memset(buf.buf, 0, buf.len);
if(::WSARecv(myPtrOverObject->m_sock, &buf, 1, &dwBytes, &dwFlags, &myPtrOverObject->m_overlapped,NULL)!=NO_ERROR)
{
if(::WSAGetLastError() != WSA_IO_PENDING)
{
return false;
}
}
return true;
}
/********************************************************************************
* 函数介绍:投放发送操作,即将指定套接字的Send操作与重叠I/O对象关联起来。
*********************************************************************************/
bool Overlapped::PostSend(list<OverObject>::iterator myPtrOverObject)
{
myPtrOverObject->m_operation = OP_WRITE;
DWORD dwBytes;
DWORD dwFlags = 0;
WSABUF buf;
buf.buf = myPtrOverObject->m_buf;
buf.len = myPtrOverObject->m_len;
if(::WSASend(myPtrOverObject->m_sock, &buf, 1, &dwBytes, dwFlags, &myPtrOverObject->m_overlapped,NULL)!=NO_ERROR)
{
if(::WSAGetLastError() != WSA_IO_PENDING)
{
return false;
}
}
return true;
}
/********************************************************************************
* 函数介绍:创建accept函数完成事件,用于处理accepe后等待函数的等待事件句柄数组发
生变化,若此时无事件触发,等待事件句柄数组仍以原来的事件句柄数组为依
据等待。
*********************************************************************************/
void Overlapped::CreateAcceptEvent()
{
//标志套接字,用于触发accept完成事件。
SOCKET myClient = ::WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
list<OverObject>::iterator acceptIterator = GetOverObject(myClient, 512);
RebuildEventArray();
}
/********************************************************************************
* 函数介绍:当accept函数完成时,重置accept所对应的事件,从而使得等待函数能够在改
变后的事件句柄数组上等待。
*********************************************************************************/
void Overlapped::SetAcceptEvent()
{
::SetEvent(eventArray.front());
}
void Overlapped::ResetAcceptEvent()
{
::ResetEvent(eventArray.front());
}
bool Overlapped::IsAcceptEvent(int index)
{
if(index == 0)
{
return true;
}
else
{
return false;
}
}
/*主程序*/
#include "Overlapped.h"
#include <windows.h>
#include <process.h>
bool OperateFunction(PtrOverObject myPtrOverObject);
UINT WINAPI ServerThread(PVOID pvParam);
Overlapped myOverlapped;
int main()
{
WSADATA wsaData;
if(WSAStartup(MAKEWORD(2,2),&wsaData)!=0)
{
cout<<"failed to load winsock !"<<endl;
exit(0);
}
SOCKET mylisten, myClient;
mylisten = ::WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
struct sockaddr_in localAddr, clientAddr;
localAddr.sin_family = AF_INET;
localAddr.sin_port = ntohs(5500);
localAddr.sin_addr.S_un.S_addr = inet_addr("59.73.161.221");
bind(mylisten, (sockaddr*)&localAddr, sizeof(localAddr));
listen(mylisten, 5);
int sizeAddr = sizeof(clientAddr);
cout << "server is listening......" << endl;
myOverlapped.CreateAcceptEvent();
int x;
_beginthreadex(NULL, 0, ServerThread, &x, 0, NULL);
/*循环接收客户端的连接,创建与客户端通信的重叠IO对象,并重建事件句柄数组*/
while(true)
{
myClient = accept(mylisten, (struct sockaddr*)&clientAddr, &sizeAddr);
if(myClient==INVALID_SOCKET)
{
cout<<"accept is failed !"<<endl;
return -1;
}
list<OverObject>::iterator localIterator = myOverlapped.GetOverObject(myClient, 512);
myOverlapped.PostRecv(localIterator);
myOverlapped.RebuildEventArray();
myOverlapped.SetAcceptEvent();
}
char ch;
cin >> ch;
return 0;
}
/********************************************************************************
* 函数介绍:数据处理函数,按照不同操作类型,处理数据的发送或接受。
*********************************************************************************/
bool OperateFunction(list<OverObject>::iterator myOverObjectIterator)
{
DWORD dwTrans;
DWORD dwFlags;
BOOL ret = ::WSAGetOverlappedResult(myOverObjectIterator->m_sock, &(myOverObjectIterator->m_overlapped), &dwTrans, false, &dwFlags);
if(!ret)
{
if(myOverObjectIterator->m_sock != INVALID_SOCKET)
{
closesocket(myOverObjectIterator->m_sock);
}
cout << "socket error : " << ::GetLastError() << endl;
myOverlapped.FreeOverObject(myOverObjectIterator);
myOverlapped.RebuildEventArray();
return false;
}
switch(myOverObjectIterator->m_operation)
{
case OP_READ: /*接收数据完成*/
if(dwTrans > 0)
{
cout << myOverObjectIterator->m_buf << endl;
list<OverObject>::iterator localIterator = myOverlapped.GetOverObject(myOverObjectIterator->m_sock, 512);
localIterator->m_len = myOverObjectIterator->m_len;
strcpy(localIterator->m_buf, myOverObjectIterator->m_buf);
myOverlapped.PostSend(localIterator);
myOverlapped.RebuildEventArray();
return true;
}
else
{
closesocket(myOverObjectIterator->m_sock);
myOverlapped.FreeOverObject(myOverObjectIterator);
myOverlapped.RebuildEventArray();
cout << "the client socket is close !" << endl;
return false;
}
break;
case OP_WRITE: /*发送数据完成*/
if(dwTrans > 0)
{
return true;
}
else
{
closesocket(myOverObjectIterator->m_sock);
myOverlapped.FreeOverObject(myOverObjectIterator);
myOverlapped.RebuildEventArray();
return false;
}
break;
}
}
/********************************************************************************
* 函数介绍:服务线程函数,平常处于等待状态,完成数据处理。
*********************************************************************************/
UINT WINAPI ServerThread(PVOID pvParam)
{
while(true)
{
int index;
DWORD eventNum = (DWORD)(myOverlapped.eventArray.size());
index = ::WSAWaitForMultipleEvents(eventNum, &(myOverlapped.eventArray[0]), false, WSA_INFINITE, false);
if(index == WSA_WAIT_FAILED)
{
cout << "wait error is : " << ::GetLastError() << endl;
break;
}
else
{
index = index - WSA_WAIT_EVENT_0;
if(myOverlapped.IsAcceptEvent(index))
{
myOverlapped.ResetAcceptEvent();
continue;
}
list<OverObject>::iterator nowIterator = (myOverlapped.FindOverObject(myOverlapped.eventArray[index]));
if(nowIterator != NULL)
{
bool ret = OperateFunction(nowIterator);
if(ret)
{
::WSAResetEvent(myOverlapped.eventArray[index]);
myOverlapped.PostRecv(nowIterator);
}
}
}
}
return 0;
}