首页 > 代码库 > C++ 异步 IO(三) 异步IO

C++ 异步 IO(三) 异步IO

The oldest solution that people still use for this problem is select(). The select() call takes three sets of fds (implemented as bit arrays): one for reading, one for writing, and one for "exceptions". It waits until a socket from one of the sets is ready and alters the sets to contain only the sockets ready for use.

 

SELECT 函数原型

int select(int fdsp1, fd_set *readfds, fd_set *writefds, fd_set *errorfds, const struct timeval *timeout);

struct fd_set可以理解为一个集合,这个集合中存放的是文件描述符(filedescriptor),即文件句柄,这可以是我们所说的普通意义的文件,当然Unix下任何设备、管道、FIFO等都是文件形式,全部包括在内,所以毫无疑问一个socket就是一个文件,socket句柄就是一个文件描述符。

fd_set*readfds是指向fd_set结构的指针,这个集合中应该包括文件描述符,我们是要监视这些文件描述符的读变化的,即我们关心是否可以从这些文件中读取数据了,如果这个集合中有一个文件可读,select就会返回一个大于0的值,表示有文件可读,如果没有可读的文件,则根据timeout参数再判断是否超时,若超出timeout的时间,select返回0,若发生错误返回负值。可以传入NULL值,表示不关心任何文件的读变化。
fd_set*writefds是指向fd_set结构的指针,这个集合中应该包括文件描述符,我们是要监视这些文件描述符的写变化的,即我们关心是否可以向这些文件中写入数据了,如果这个集合中有一个文件可写,select就会返回一个大于0的值,表示有文件可写,如果没有可写的文件,则根据timeout参数再判断是否超时,若超出timeout的时间,select返回0,若发生错误返回负值。可以传入NULL值,表示不关心任何文件的写变化。
fd_set *errorfds同上面两个参数的意图,用来监视文件错误异常。

各个参数含义如下:

  • int fdsp1:最大描述符值 + 1

  • fd_set *readfds:对可读感兴趣的描述符集

  • fd_set *writefds:对可写感兴趣的描述符集

  • fd_set *errorfds:对出错感兴趣的描述符集

select 在发生以下情况时返回:

  readfds集合中有描述符可读

  writefds集合中有描述符可写

  errorfds集合中有描述符遇到错误条件

  指定的超时时间timeout到了

当select返回时,描述符集合将被修改以指示哪些个描述符正处于可读、可写或有错误状态。可以用FD_ISSET宏对描述符进行测试以找到状态变化的描述符。如果select因为超时而返回的话,所有的描述符集合都将被清空。

设置描述符集合通常用如下几个宏定义:

FD_ZERO(fd_set *fdset);                /* clear all bits in fdset           */
FD_SET(int fd, fd_set *fdset);         /* turn on the bit for fd in fd_set  */
FD_CLR(int fd, fd_set *fdset);         /* turn off the bit for fd in fd_set */
int FD_ISSET(int fd, fd_set *fdset);   /* is the bit for fd on in fdset?    */

如:

fd_set rset;
FD_ZERO(&rset);                        /* initialize the set: all bits off  */
FD_SET(1&rset);                      /* turn on bit for fd 1              */
FD_SET(4&rset);                      /* turn on bit for fd 4              */

当select返回的时候,rset位都将被置0,除了那些有变化的fd位。

注意:
select默认能处理的描述符数量是有上限的,为FD_SETSIZE的大小。
对于timeout参数,如果置为NULL,则表示wait forever;若timeout->tv_sec = timeout->tv_usec = 0,则表示do not wait at all;否则指定等待时间。
如果使用select处理多个套接字,那么需要使用一个数组(也可以是其他结构)来记录各个描述符的状态。而使用poll则不需要,下面看poll函数。

 

 

POLL 函数

int poll(struct pollfd *fdarray, unsigned long nfds, int timeout);

各参数含义如下:

    • struct pollfd *fdarray:一个结构体,用来保存各个描述符的相关状态。
    • unsigned long nfds:fdarray数组的大小,即里面包含有效成员的数量。
    • int timeout:设定的超时时间。(以毫秒为单位)

 

poll函数返回值及含义如下:

    • -1:有错误产生
    • 0:超时时间到,而且没有描述符有状态变化
    • >0:有状态变化的描述符个数
着重讲fdarray数组,因为这是它和select()函数主要的不同的地方:
pollfd的结构如下:

1 struct pollfd {
2    int fd;                  /* descriptor to check */
3    short events;      /* events of interest on fd */
4    short revents;     /* events that occured on fd */
5 };

其实poll()和select()函数要处理的问题是相同的,只不过是不同组织在几乎相同时刻同时推出的,因此才同时保留了下来。select()函数把可读描述符、可写描述符、错误描述符分在了三个集合里,这三个集合都是用bit位来标记一个描述符,一旦有若干个描述符状态发生变化,那么它将被置位,而其他没有发生变化的描述符的bit位将被clear,也就是说select()的readset、writeset、errorset是一个value-result类型,通过它们传值,而也通过它们返回结果。这样的一个坏处是每次重新select 的时候对集合必须重新赋值。而poll()函数则与select()采用的方式不同,它通过一个结构数组保存各个描述符的状态,每个结构体第一项fd代表描述符,第二项代表要监听的事件,也就是感兴趣的事件,而第三项代表poll()返回时描述符的返回状态。合法状态如下:

 

POLLIN:                有普通数据或者优先数据可读

POLLRDNORM:    有普通数据可读

POLLRDBAND:    有优先数据可读

POLLPRI:              有紧急数据可读

POLLOUT:            有普通数据可写

POLLWRNORM:   有普通数据可写

POLLWRBAND:    有紧急数据可写

POLLERR:            有错误发生

POLLHUP:            有描述符挂起事件发生

POLLNVAL:          描述符非法

 

select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

 

EPOLL 函数

epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。

epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。

另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

 

一个SELECT的例子

 

int main() 
{ 
    int sock; 
    FILE *fp; 
    struct fd_set fds; 
    struct timeval timeout={3,0}; //select等待3秒,3秒轮询,要非阻塞就置0 

    char buffer[256]={0}; //256字节的接收缓冲区 
    /* 假定已经建立UDP连接,具体过程不写,简单,当然TCP也同理,主机ip和port都已经给定,要写的文件已经打开 
    sock=socket(...); 
    bind(...); 
    fp=fopen(...); */ 
    while(1) 
   { 
        FD_ZERO(&fds); //每次循环都要清空集合,否则不能检测描述符变化
        FD_SET(sock,&fds); //添加描述符 
        FD_SET(fp,&fds); //同上
        maxfdp=sock>fp?sock+1:fp+1;    //描述符最大值加1
        switch(select(maxfdp,&fds,&fds,NULL,&timeout))   //select使用 
        { 
            case -1: exit(-1);break; //select错误,退出程序 
            case 0:break; //再次轮询
            default: 
                  if(FD_ISSET(sock,&fds)) //测试sock是否可读,即是否网络上有数据
                  { 
                        recvfrom(sock,buffer,256,.....);//接受网络数据 
                        if(FD_ISSET(fp,&fds)) //测试文件是否可写 
                            fwrite(fp,buffer...);//写入文件 
                         buffer清空; 
                   }// end if break; 
          }// end switch 
     }//end while 
}//end main

 

POLL 函数一个例子:

#define NORMAL_DATA 1
#define HIPRI_DATA 2
  
int poll_two_normal(int fd1,int fd2)  {
       struct pollfd poll_list[2];
       int retval;
  
       poll_list[0].fd = fd1;
       poll_list[1].fd = fd2;
       poll_list[0].events = POLLIN|POLLPRI;
       poll_list[1].events = POLLIN|POLLPRI;
  
       while(1)
       {
           retval = poll(poll_list,(unsigned long)2,-1);
           /* retval 总是大于0或为-1,因为我们在阻塞中工作 */
  
           if(retval < 0)
           {
               fprintf(stderr,"poll错误: %s/n",strerror(errno));
               return -1;
           }
    
           if(((poll_list[0].revents&POLLHUP) == POLLHUP) ||
              ((poll_list[0].revents&POLLERR) == POLLERR) ||
              ((poll_list[0].revents&POLLNVAL) == POLLNVAL) ||
              ((poll_list[1].revents&POLLHUP) == POLLHUP) ||
              ((poll_list[1].revents&POLLERR) == POLLERR) ||
              ((poll_list[1].revents&POLLNVAL) == POLLNVAL))
             return 0;
  
           if((poll_list[0].revents&POLLIN) == POLLIN)
             handle(poll_list[0].fd,NORMAL_DATA);
           if((poll_list[0].revents&POLLPRI) == POLLPRI)
             handle(poll_list[0].fd,HIPRI_DATA);
           if((poll_list[1].revents&POLLIN) == POLLIN)
             handle(poll_list[1].fd,NORMAL_DATA);
           if((poll_list[1].revents&POLLPRI) == POLLPRI)
             handle(poll_list[1].fd,HIPRI_DATA);
       }
   }

 

EPOLL 函数例子:

#include <sys/epoll.h>

       int epoll_create(int size)

       int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

       int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)

 

            typedef union epoll_data {
                 void *ptr;
                 int fd;
                 __uint32_t u32;
                 __uint64_t u64;
            } epoll_data_t;

            struct epoll_event {
                 __uint32_t events;  /* Epoll events */
                 epoll_data_t data;  /* User data variable */
            };

epoll返回时已经明确的知道哪个sokcet fd发生了事件,不用再一个个比对。这样就提高了效率。

select的FD_SETSIZE是有限止的,而epoll是没有限止的只与系统资源有关。

1、epoll_create函数 
函数声明:int epoll_create(int size) 
该 函数生成一个epoll专用的文件描述符。它其实是在内核申请一空间,用来存放你想关注的socket。 fd上是否发生以及发生了什么事件。size就是你在这个epoll fd上能关注的最大socket fd数。随你定好了。只要你有空间。

2、epoll_ctl函数 
函数声明:int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) 
该函数用于控制某个epoll文件描述符上的事件,可以注册事件,修改事件,删除事件。 
参数: 
epfd:由 epoll_create 生成的epoll专用的文件描述符; 
op:要进行的操作例如注册事件,可能的取值EPOLL_CTL_ADD 注册、EPOLL_CTL_MOD 修 改、EPOLL_CTL_DEL 删除

fd:关联的文件描述符; 
event:指向epoll_event的指针; 
如果调用成功返回0,不成功返回-1

用到的数据结构 
typedef union epoll_data { 
void *ptr; 
int fd; 
__uint32_t u32; 
__uint64_t u64; 
} epoll_data_t;
struct epoll_event { 
__uint32_t events; /* Epoll events */ 
epoll_data_t data; /* User data variable */ 
};

如: 
struct epoll_event ev; 
//设置与要处理的事件相关的文件描述符 
ev.data.fd=listenfd; 
//设置要处理的事件类型 
ev.events=EPOLLIN|EPOLLET; 
//注册epoll事件 
epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);

3、epoll_wait函数 
函数声明:int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout) 
该函数用于轮询I/O事件的发生; 
参数: 
epfd:由epoll_create 生成的epoll专用的文件描述符; 
epoll_event:用于回传代处理事件的数组; 
maxevents:每次能处理的事件数; 
timeout:等待I/O事件发生的超时值(单位我也不太清楚);-1相当于阻塞,0相当于非阻塞。一般用-1即可 
返回发生事件数。 


用法如下: 

/*build the epoll enent for recall */ 
struct epoll_event ev_read[20]; 
int nfds = 0; //return the events count 
nfds=epoll_wait(epoll_fd,ev_read,20, -1); 
for(i=0; i 
{ 
if(ev_read[i].data.fd == sock)// the listener port hava data 
...

epoll_wait运行的原理是 
等侍注册在epfd上的socket fd的事件的发生,如果发生则将发生的sokct fd和事件类型放入到events数组中。 
并 且将注册在epfd上的socket fd的事件类型给清空,所以如果下一个循环你还要关注这个socket fd的话,则需要用epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev)来重新设置socket fd的事件类型。这时不用EPOLL_CTL_ADD,因为socket fd并未清空,只是事件类型清空。这一步非常重要。 
单个epoll并不能解决所有问题,特别是你的每个操作都比较费时的时候,因为epoll是串行处理的。 
所以你还是有必要建立线程池来发挥更大的效能。 

 

while (TRUE) 
{ 
    int nfds = epoll_wait (m_epoll_fd, m_events, MAX_EVENTS, EPOLL_TIME_OUT)//等待EPOLL事件的发生,相当于监听,至于相关的端口,需要在初始化EPOLL的时候绑定。 

    if (nfds <= 0) 
    continue; 
    m_bOnTimeChecking = FALSE; 
    G_CurTime = time(NULL); 
    for (int i=0; i<nfds; i++) 
    { 
        try { 
    if (m_events[i].data.fd == m_listen_http_fd)//如果新监测到一个HTTP用户连接到绑定的HTTP端口,建立新的连接。由于我们新采用了SOCKET连接,所以基本没用。 
    { 
     OnAcceptHttpEpoll (); 
    } 
    else if (m_events[i].data.fd == m_listen_sock_fd)//如果新监测到一个SOCKET用户连接到了绑定的SOCKET端口,建立新的连接。 
    { 
     OnAcceptSockEpoll (); 
    } 
    else if (m_events[i].events & EPOLLIN)//如果是已经连接的用户,并且收到数据,那么进行读入。 
    { 
     OnReadEpoll (i); 
    } 

    OnWriteEpoll (i);//查看当前的活动连接是否有需要写出的数据。 
   } 
   catch (int) 
   { 
    PRINTF ("CATCH捕获错误\n"); 
    continue; 
   } 
} 
m_bOnTimeChecking = TRUE; 
OnTimer ();//进行一些定时的操作,主要就是删除一些断线用户等。 
} 

 

 

 Ref

[1] 详述socket编程之select()和poll()函数

[2] A tiny introduction to asynchronous IO

[3] select,poll,epoll用法