首页 > 代码库 > epoll简介 与 UDP server的实现

epoll简介 与 UDP server的实现

Abstract
epoll是Linux内核为处理大批量句柄而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,
它能显著减少程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。


简介:
epoll是Linux下多路复用IO接口select/poll的增强版本,
它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,
因为:
它会复用文件描述符集合来传递结果,
而不用迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,
另一点原因:
就是获取事件的时候,它无须遍历整个被侦听的描述符集,
只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。


epoll除了提供select/poll那种IO事件的
电平触发(Level Triggered, LT)外,
还提供了边沿触发(Edge Triggered, ET),
这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。


优点:
支持一个进程打开大数目的socket描述符
select 最不能忍受的是一个进程所打开的FD是有一定限制的,由FD_SETSIZE设置,默认值是1024。
对于那些需要支持的上万连接数目的IM服务器来说显然太少了。
这时候你一是可以选择修改这个宏然后重新编译内核,不过资料也同时指出这样会带来网络效率的下降,
二是可以选择多进程的解决方案(传统的Apache方案),不过虽然linux上面创建进程的代价比较小,
但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不是一种完美的方案。


epoll则没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,
举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,
一般来说这个数目和系统内存关系很大。


IO效率不随FD数目增加而线性下降
传统的select/poll另一个致命弱点就是当你拥有一个很大的socket集合,
不过由于网络延时,任一时间只有部分的socket是“活跃”的,
但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。


而epoll不存在这个问题,
它只会对“活跃”的socket进行操作---这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的。
那么,只有“活跃”的socket才会主动的去调用 callback函数,其他idle状态socket则不会,
在这点上,epoll实现了一个“伪”AIO,因为这时候推动力在os内核。
在一些 benchmark中,如果所有的socket基本上都是活跃的---比如一个高速LAN环境,
epoll并不比select/poll有什么效率,相反,如果过多使用epoll_ctl,效率相比还有稍微的下降。
但是一旦使用idle connections模拟WAN环境,epoll的效率就远在select/poll之上了。


使用mmap加速内核与用户空间的消息传递
这点实际上涉及到epoll的具体实现了。无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,
如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核于用户空间mmap同一块内存实现的。
而如果你像我一样从2.5内核就关注epoll的话,一定不会忘记手工 mmap这一步的。


内核微调:
这一点其实不算epoll的优点了,而是整个linux平台的优点。也许你可以怀疑linux平台,
但是你无法回避linux平台赋予你微调内核的能力。
比如,内核TCP/IP协议栈使用内存池管理sk_buff结构,那么可以在运行时期动态调整这个内存pool
(skb_head_pool)的大小--- 通过echo XXXX>/proc/sys/net/core/hot_list_length完成。
再比如listen函数的第2个参数(TCP完成3次握手的数据包队列长度),也可以根据你平台内存大小动态调整。
更甚至在一个数据包面数目巨大但同时每个数据包本身大小却很小的特殊系统上尝试最新的NAPI网卡驱动架构。


使用
令人高兴的是,2.6内核的epoll比其2.5开发版本的/dev/epoll简洁了许多,
所以,大部分情况下,强大的东西往往是简单的。


唯一有点麻烦是epoll有2种工作方式:
  LT和ET。
LT (level-triggered)是缺省的工作方式,并且同时支持block和no-block socket.
    在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。
    如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。
    传统的select/poll都是这种模型的代表。
ET (edge-triggered)是高速工作方式,只支持no-block socket。
    在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。
    然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,
    直到你做了某些操作导致那个文件描述符不再为就绪状态了
    (比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。
    但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),
    内核不会发送更多的通知(only once),不过在TCP协议中,ET模式的加速效用仍需要更多的benchmark确认。


ET和LT的区别:
    LT事件不会丢弃,而是只要读buffer里面有数据可以让用户读,则不断的通知你。
    ET则只在事件发生之时通知。
    可以简单理解为LT是水平触发,而ET则为边缘触发。
    LT模式只要有事件未处理就会触发,而ET则只在高低电平变换时(即状态从1到0或者0到1)触发.


系统调用
epoll相关的系统调用有:
  epoll_create,
  epoll_ctl,
  epoll_wait。
  Linux-2.6.19又引入了可以屏蔽指定信号的epoll_wait: epoll_pwait。


至此epoll家族已全。其中
  epoll_create用来创建一个epoll文件描述符,
  epoll_ctl用来添加/修改/删除需要侦听的文件描述符及其事件,
  epoll_wait/epoll_pwait接收发生在被侦听的描述符上的,用户感兴趣的IO事件。
  epoll文件描述符用完后,直接用close关闭即可,非常方便。


事实上,任何被侦听的文件符只要其被关闭,那么它也会自动从被侦听的文件描述符集合中删除,很是智能。
每次添加/修改/删除被侦听文件描述符都需要调用epoll_ctl,所以要尽量少地调用epoll_ctl,
防止其所引来的开销抵消其带来的好处。有的时候,应用中可能存在大量的短连接(比如说Web服务器),
epoll_ctl将被频繁地调用,可能成为这个系统的瓶颈。


A:IO效率。
在大家苦苦的为在线人数的增长而导致的系统资源吃紧上的问题正在发愁的时候,
Linux 2.6内核中提供的System Epoll为我们提供了一套完美的解决方案。
传统的select以及poll的效率会因为在线人数的线形递增而导致呈二次乃至三次方的下降,
这些直接导致了网络服务器可以支持的人数有了个比较明显的限制。
自从Linux提供了/dev/epoll的设备以及后来2.6内核中对/dev/epoll设备的访问的封装(System Epoll)之后,
这种现象得到了大大的缓解,如果说几个月前,大家还对epoll不熟悉,那么现在来说的话,
epoll的应用已经得到了大范围的普及。


那么究竟如何来使用epoll呢?其实非常简单。
通过在包含一个头文件#include <sys/epoll.h>以及几个简单的API将可以大大的提高你的网络服务器的支持人数。
  首先, 通过epoll_create(int maxfds)来创建一个epoll的句柄,
        其中maxfds为你epoll所支持的最大句柄数。
        这个函数会返回一个新的epoll句柄,之后的所有操作将通过这个句柄来进行操作。
        在用完之后,记得用close()来关闭这个创建出来的epoll句柄。
  之后, 在你的网络主循环里面,每一帧的调用
           epoll_wait(int epfd, epoll_event events, int max events, int timeout)
        来查询所有的网络接口,看哪一个可以读,哪一个可以写了。
        基本的语法为:
           nfds = epoll_wait(kdpfd, events, maxevents, -1);
           其中:
               kdpfd  为用epoll_create创建之后的句柄,
               events 是一个epoll_event*的指针,
                      当epoll_wait这个函数操作成功之后,epoll_events里面将储存所有的读写事件。
               max_events是当前需要监听的所有socket句柄数。
               timeout是epoll_wait的超时,
                      为0  , 表示马上返回,
                      为-1 , 表示一直等下去,直到有事件范围,为任意正整数的时候表示等这么长的时间,
                             如果一直没有事件,则返回。
                             一般如果网络主循环是单独的线程的话,可以用-1来等,这样可以保证一些效率,
                      如果是和主逻辑在同一个线程的话,则可以用0来保证主循环的效率。


epoll_wait范围之后应该是一个循环,遍历所有的事件:
for (n = 0; n < nfds; ++n) {
    if (events[n].data.fd == listener) {    // 如果是主socket的事件的话,则表示
                                            // 有新连接进入了,进行新连接的处理。
        client = accept(listener, (struct sockaddr *) &local, &addrlen);
        if (client < 0){
            perror("accept");
            continue;
        }


        setnonblocking(client);             // 将新连接置于非阻塞模式
        ev.events = EPOLLIN | EPOLLET;      // 并且将新连接也加入EPOLL的监听队列。
        // 注意,这里的参数EPOLLIN | EPOLLET并没有设置对写socket的监听,
        // 如果有写操作的话,这个时候epoll是不会返回事件的,如果要对写操作
        // 也监听的话,应该是EPOLLIN | EPOLLOUT | EPOLLET


        ev.data.fd = client;
        if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) {
        // 设置好event之后,将这个新的event通过epoll_ctl加入到epoll的监听队列里面,
        // 这里用EPOLL_CTL_ADD来加一个新的epoll事件,通过EPOLL_CTL_DEL来减少一个
        // epoll事件,通过EPOLL_CTL_MOD来改变一个事件的监听方式。
            fprintf(stderr, "epoll set insertion error: fd=%d0, client);
            return -1;
        }
    } else if (event[n].events & EPOLLIN) {  // 如果是已经连接的用户,并且收到数据,
                                             // 那么进行读入
        int sockfd_r;
        if ((sockfd_r = event[n].data.fd) < 0)
            continue;
        read(sockfd_r, buffer, MAXSIZE);


        // 修改sockfd_r上要处理的事件为EPOLLOUT
        ev.data.fd = sockfd_r;
        ev.events = EPOLLOUT | EPOLLET;
        epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_r, &ev)
    } else if (event[n].events & EPOLLOUT) { // 如果有数据发送
        int sockfd_w = events[n].data.fd;
        write(sockfd_w, buffer, sizeof(buffer));


        // 修改sockfd_w上要处理的事件为EPOLLIN
        ev.data.fd = sockfd_w;
        ev.events = EPOLLIN | EPOLLET;
        epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_r, &ev)
    }
    do_use_fd(events[n].data.fd);
  }


对,epoll的操作就这么简单,总共不过4个API:
   epoll_create, epoll_ctl, epoll_wait和close。

示例程序:
用epoll实现的多线程UDP server

 

#include <stdio.h> #include <stdlib.h> #include <errno.h> #include <string.h> #include <sys/types.h> #include <netinet/in.h> #include <sys/socket.h> #include <sys/wait.h> #include <unistd.h> #include <arpa/inet.h> //#include <openssl/ssl.h> //#include <openssl/err.h> #include <fcntl.h> #include <sys/epoll.h> #include <sys/time.h> #include <sys/resource.h> #include <pthread.h> #include <assert.h> //#include "oci_api.h" #define MAXBUF 1024 #define MAXEPOLLSIZE 100 /*  setnonblocking – 设置句柄为非阻塞方式  */ int setnonblocking(int sockfd) {   if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0)|O_NONBLOCK) == -1)   {     return -1;   }   return 0; } /*  pthread_handle_message – 线程处理 socket 上的消息收发  */ void* pthread_handle_message(int* sock_fd) {   char recvbuf[MAXBUF + 1];   char sendbuf[MAXBUF+1];   int  ret;   int  new_fd;   struct sockaddr_in client_addr;   socklen_t cli_len=sizeof(client_addr);   new_fd=*sock_fd;    /* 开始处理每个新连接上的数据收发 */   bzero(recvbuf, MAXBUF + 1);   bzero(sendbuf, MAXBUF + 1);   /* 接收客户端的消息 */   ret = recvfrom(new_fd, recvbuf, MAXBUF, 0, (struct sockaddr *)&client_addr, &cli_len);   if (ret > 0){     printf("socket %d recv from : %s : %d message: %s ,%d bytes/n",            new_fd, inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), recvbuf, ret);   /*    char *s1="insert";   char *s2="select";   char *s3="delete";   if(!strncmp(s1,recvbuf,6))     oci_insert(recvbuf,sendbuf);   else if(!strncmp(s2,recvbuf,6))     oci_select(recvbuf,sendbuf);   else if(!strncmp(s3,recvbuf,6))     oci_delete(recvbuf,sendbuf);   else     sprintf(sendbuf,"input sql is error!/n");    ret = sendto(new_fd, sendbuf, strlen(sendbuf), 0, (struct sockaddr *)&client_addr, cli_len);    if(ret<0)      printf("消息发送失败!错误代码是%d,错误信息是‘%s‘/n", errno, strerror(errno));    */   }   else   {     printf("received failed! error code %d,message : %s /n",       errno, strerror(errno));       }   /* 处理每个新连接上的数据收发结束 */    //printf("pthread exit!");   fflush(stdout);    pthread_exit(NULL); }   int main(int argc, char **argv) {   int listener, kdpfd, nfds, n, curfds;   socklen_t len;   struct sockaddr_in my_addr, their_addr;   unsigned int myport;   struct epoll_event ev;   struct epoll_event events[MAXEPOLLSIZE];   struct rlimit rt;   myport = 1234;    pthread_t thread;   pthread_attr_t attr;   /* 设置每个进程允许打开的最大文件数 */   rt.rlim_max = rt.rlim_cur = MAXEPOLLSIZE;  if (setrlimit(RLIMIT_NOFILE, &rt) == -1)    {     perror("setrlimit");     exit(1);   }   else    {     printf("setting success /n");   }   /* 开启 socket 监听 */   if ((listener = socket(PF_INET, SOCK_DGRAM, 0)) == -1)   {     perror("socket create failed !");     exit(1);   }   else   {     printf("socket create  success /n");   }   /*设置socket属性,端口可以重用*/   int opt=SO_REUSEADDR;   setsockopt(listener,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));   setnonblocking(listener);   bzero(&my_addr, sizeof(my_addr));   my_addr.sin_family = PF_INET;   my_addr.sin_port = htons(myport);   my_addr.sin_addr.s_addr = INADDR_ANY;   if (bind(listener, (struct sockaddr *) &my_addr, sizeof(struct sockaddr)) == -1)    {     perror("bind");     exit(1);   }    else   {     printf("IP and port bind success /n");   }     /* 创建 epoll 句柄,把监听 socket 加入到 epoll 集合里 */   kdpfd = epoll_create(MAXEPOLLSIZE);   len = sizeof(struct sockaddr_in);   ev.events = EPOLLIN | EPOLLET;   ev.data.fd = listener;   if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, &ev) < 0)    {     fprintf(stderr, "epoll set insertion error: fd=%d/n", listener);     return -1;   }   else   {     printf("listen socket added in  epoll success /n");   }   while (1)    {     /* 等待有事件发生 */     nfds = epoll_wait(kdpfd, events, 10000, -1);     if (nfds == -1)     {       perror("epoll_wait");       break;     }     /* 处理所有事件 */     for (n = 0; n < nfds; ++n)     {       if (events[n].data.fd == listener)        {         /*初始化属性值,均设为默认值*/         pthread_attr_init(&attr);         pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);         /*  设置线程为分离属性*/          pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);         if(pthread_create(&thread,&attr,(void*)pthread_handle_message,(void*)&(events[n].data.fd)))         {            perror("pthread_creat error!");            exit(-1);         }         }       }   }   close(listener);   return 0; } 

 

转载:http://blog.chinaunix.net/uid-26000296-id-3868466.html