首页 > 代码库 > Linux下select, poll和epoll IO模型的详解(转)
Linux下select, poll和epoll IO模型的详解(转)
http://blog.csdn.net/tianmohust/article/details/6677985
一).Epoll 介绍
Epoll 可是当前在 Linux 下开发大规模并发网络程序的热门人选, Epoll 在 Linux2.6 内核中正式引入,和 select 相似,其实都 I/O 多路复用技术而已 ,并没有什么神秘的。其实在 Linux 下设计并发网络程序,向来不缺少方法,比如典型的 Apache 模型( Process Per Connection ,简称 PPC ), TPC ( Thread Per Connection )模型,以及 select 模型和 poll 模型,那为何还要再引入 Epoll 这个东东呢?那还是有得说说的 …
二). 常用模型的缺点
如果不摆出来其他模型的缺点,怎么能对比出 Epoll 的优点呢。
① PPC/TPC 模型
这两种模型思想类似,就是让每一个到来的连接一边自己做事去,别再来烦我 。只是 PPC 是为它开了一个进程,而 TPC 开了一个线程。可是别烦我是有代价的,它要时间和空间啊,连接多了之后,那么多的进程 / 线程切换,这开销就上来了;因此这类模型能接受的最大连接数都不会高,一般在几百个左右。
② select 模型
1. 最大并发数限制,因为一个进程所打开的 FD (文件描述符)是有限制的,由 FD_SETSIZE 设置,默认值是 1024/2048 ,因此 Select 模型的最大并发数就被相应限制了。自己改改这个 FD_SETSIZE ?想法虽好,可是先看看下面吧 …
2. 效率问题, select 每次调用都会线性扫描全部的 FD 集合,这样效率就会呈现线性下降,把 FD_SETSIZE 改大的后果就是,大家都慢慢来,什么?都超时了。
3. 内核 / 用户空间 内存拷贝问题,如何让内核把 FD 消息通知给用户空间呢?在这个问题上 select 采取了内存拷贝方法。
总结为:1.连接数受限 2.查找配对速度慢 3.数据由内核拷贝到用户态
③ poll 模型
基本上效率和 select 是相同的, select 缺点的 2 和 3 它都没有改掉。
三). Epoll 的提升
把其他模型逐个批判了一下,再来看看 Epoll 的改进之处吧,其实把 select 的缺点反过来那就是 Epoll 的优点了。
①. Epoll 没有最大并发连接的限制,上限是最大可以打开文件的数目,这个数字一般远大于 2048, 一般来说这个数目和系统内存关系很大 ,具体数目可以 cat /proc/sys/fs/file-max 察看。
②. 效率提升, Epoll 最大的优点就在于它只管你“活跃”的连接 ,而跟连接总数无关,因此在实际的网络环境中, Epoll 的效率就会远远高于 select 和 poll 。
③. 内存拷贝, Epoll 在这点上使用了“共享内存 ”,这个内存拷贝也省略了。
四). Epoll 为什么高效
Epoll 的高效和其数据结构的设计是密不可分的,这个下面就会提到。
首先回忆一下 select 模型,当有 I/O 事件到来时, select 通知应用程序有事件到了快去处理,而应用程序必须轮询所有的 FD 集合,测试每个 FD 是否有事件发生,并处理事件;代码像下面这样:
- int res = select(maxfd+1, &readfds, NULL, NULL, 120);
- if (res > 0)
- {
- for (int i = 0; i < MAX_CONNECTION; i++)
- {
- if (FD_ISSET(allConnection[i], &readfds))
- {
- handleEvent(allConnection[i]);
- }
- }
- }
- // if(res == 0) handle timeout, res < 0 handle error
- int res = epoll_wait(epfd, events, 20, 120);
- for (int i = 0; i < res;i++)
- {
- handleEvent(events[n]);
- }
前面提到 Epoll 速度快和其数据结构密不可分,其关键数据结构就是:
- struct epoll_event {
- __uint32_t events; // Epoll events
- epoll_data_t data; // User data variable
- };
- typedef union epoll_data {
- void *ptr;
- int fd;
- __uint32_t u32;
- __uint64_t u64;
- } epoll_data_t;
六). 使用 Epoll
既然 Epoll 相比 select 这么好,那么用起来如何呢?会不会很繁琐啊 … 先看看下面的三个函数吧,就知道 Epoll 的易用了。
int epoll_create(int size);生成一个 Epoll 专用的文件描述符,其实是申请一个内核空间,用来存放你想关注的 socket fd 上是否发生以及发生了什么事件。 size 就是你在这个 Epoll fd 上能关注的最大 socket fd 数,大小自定,只要内存足够。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event );
控制某个 Epoll 文件描述符上的事件:注册、修改、删除。其中参数 epfd 是 epoll_create() 创建 Epoll 专用的文件描述符。相对于 select 模型中的 FD_SET 和 FD_CLR 宏。
int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout);
等待 I/O 事件的发生;参数说明:
epfd: 由 epoll_create() 生成的 Epoll 专用的文件描述符;
epoll_event: 用于回传代处理事件的数组;
maxevents: 每次能处理的事件数;
timeout: 等待 I/O 事件发生的超时值;
返回发生事件数。
相对于 select 模型中的 select 函数。
七). 例子程序
下面是一个简单 Echo Server 的例子程序,麻雀虽小,五脏俱全,还包含了一个简单的超时检查机制,简洁起见没有做错误处理。
- //
- // a simple echo server using epoll in linux
- //
- // 2009-11-05
- // by sparkling
- //
- #include <sys/socket.h>
- #include <sys/epoll.h>
- #include <netinet/in.h>
- #include <arpa/inet.h>
- #include <fcntl.h>
- #include <unistd.h>
- #include <stdio.h>
- #include <errno.h>
- #include <iostream>
- using namespace std;
- #define MAX_EVENTS 500
- struct myevent_s
- {
- int fd;
- void (*call_back)(int fd, int events, void *arg);
- int events;
- void *arg;
- int status; // 1: in epoll wait list, 0 not in
- char buff[128]; // recv data buffer
- int len;
- long last_active; // last active time
- };
- // set event
- void EventSet(myevent_s *ev, int fd, void (*call_back)(int, int, void*), void *arg)
- {
- ev->fd = fd;
- ev->call_back = call_back;
- ev->events = 0;
- ev->arg = arg;
- ev->status = 0;
- ev->last_active = time(NULL);
- }
- // add/mod an event to epoll
- void EventAdd(int epollFd, int events, myevent_s *ev)
- {
- struct epoll_event epv = {0, {0}};
- int op;
- epv.data.ptr = ev;
- epv.events = ev->events = events;
- if(ev->status == 1){
- op = EPOLL_CTL_MOD;
- }
- else{
- op = EPOLL_CTL_ADD;
- ev->status = 1;
- }
- if(epoll_ctl(epollFd, op, ev->fd, &epv) < 0)
- printf("Event Add failed[fd=%d]/n", ev->fd);
- else
- printf("Event Add OK[fd=%d]/n", ev->fd);
- }
- // delete an event from epoll
- void EventDel(int epollFd, myevent_s *ev)
- {
- struct epoll_event epv = {0, {0}};
- if(ev->status != 1) return;
- epv.data.ptr = ev;
- ev->status = 0;
- epoll_ctl(epollFd, EPOLL_CTL_DEL, ev->fd, &epv);
- }
- int g_epollFd;
- myevent_s g_Events[MAX_EVENTS+1]; // g_Events[MAX_EVENTS] is used by listen fd
- void RecvData(int fd, int events, void *arg);
- void SendData(int fd, int events, void *arg);
- // accept new connections from clients
- void AcceptConn(int fd, int events, void *arg)
- {
- struct sockaddr_in sin;
- socklen_t len = sizeof(struct sockaddr_in);
- int nfd, i;
- // accept
- if((nfd = accept(fd, (struct sockaddr*)&sin, &len)) == -1)
- {
- if(errno != EAGAIN && errno != EINTR)
- {
- printf("%s: bad accept", __func__);
- }
- return;
- }
- do
- {
- for(i = 0; i < MAX_EVENTS; i++)
- {
- if(g_Events[i].status == 0)
- {
- break;
- }
- }
- if(i == MAX_EVENTS)
- {
- printf("%s:max connection limit[%d].", __func__, MAX_EVENTS);
- break;
- }
- // set nonblocking
- if(fcntl(nfd, F_SETFL, O_NONBLOCK) < 0) break;
- // add a read event for receive data
- EventSet(&g_Events[i], nfd, RecvData, &g_Events[i]);
- EventAdd(g_epollFd, EPOLLIN|EPOLLET, &g_Events[i]);
- printf("new conn[%s:%d][time:%d]/n", inet_ntoa(sin.sin_addr), ntohs(sin.sin_port), g_Events[i].last_active);
- }while(0);
- }
- // receive data
- void RecvData(int fd, int events, void *arg)
- {
- struct myevent_s *ev = (struct myevent_s*)arg;
- int len;
- // receive data
- len = recv(fd, ev->buff, sizeof(ev->buff)-1, 0);
- EventDel(g_epollFd, ev);
- if(len > 0)
- {
- ev->len = len;
- ev->buff[len] = ‘/0‘;
- printf("C[%d]:%s/n", fd, ev->buff);
- // change to send event
- EventSet(ev, fd, SendData, ev);
- EventAdd(g_epollFd, EPOLLOUT|EPOLLET, ev);
- }
- else if(len == 0)
- {
- close(ev->fd);
- printf("[fd=%d] closed gracefully./n", fd);
- }
- else
- {
- close(ev->fd);
- printf("recv[fd=%d] error[%d]:%s/n", fd, errno, strerror(errno));
- }
- }
- // send data
- void SendData(int fd, int events, void *arg)
- {
- struct myevent_s *ev = (struct myevent_s*)arg;
- int len;
- // send data
- len = send(fd, ev->buff, ev->len, 0);
- ev->len = 0;
- EventDel(g_epollFd, ev);
- if(len > 0)
- {
- // change to receive event
- EventSet(ev, fd, RecvData, ev);
- EventAdd(g_epollFd, EPOLLIN|EPOLLET, ev);
- }
- else
- {
- close(ev->fd);
- printf("recv[fd=%d] error[%d]/n", fd, errno);
- }
- }
- void InitListenSocket(int epollFd, short port)
- {
- int listenFd = socket(AF_INET, SOCK_STREAM, 0);
- fcntl(listenFd, F_SETFL, O_NONBLOCK); // set non-blocking
- printf("server listen fd=%d/n", listenFd);
- EventSet(&g_Events[MAX_EVENTS], listenFd, AcceptConn, &g_Events[MAX_EVENTS]);
- // add listen socket
- EventAdd(epollFd, EPOLLIN|EPOLLET, &g_Events[MAX_EVENTS]);
- // bind & listen
- sockaddr_in sin;
- bzero(&sin, sizeof(sin));
- sin.sin_family = AF_INET;
- sin.sin_addr.s_addr = INADDR_ANY;
- sin.sin_port = htons(port);
- bind(listenFd, (const sockaddr*)&sin, sizeof(sin));
- listen(listenFd, 5);
- }
- int main(int argc, char **argv)
- {
- short port = 12345; // default port
- if(argc == 2){
- port = atoi(argv[1]);
- }
- // create epoll
- g_epollFd = epoll_create(MAX_EVENTS);
- if(g_epollFd <= 0) printf("create epoll failed.%d/n", g_epollFd);
- // create & bind listen socket, and add to epoll, set non-blocking
- InitListenSocket(g_epollFd, port);
- // event loop
- struct epoll_event events[MAX_EVENTS];
- printf("server running:port[%d]/n", port);
- int checkPos = 0;
- while(1){
- // a simple timeout check here, every time 100, better to use a mini-heap, and add timer event
- long now = time(NULL);
- for(int i = 0; i < 100; i++, checkPos++) // doesn‘t check listen fd
- {
- if(checkPos == MAX_EVENTS) checkPos = 0; // recycle
- if(g_Events[checkPos].status != 1) continue;
- long duration = now - g_Events[checkPos].last_active;
- if(duration >= 60) // 60s timeout
- {
- close(g_Events[checkPos].fd);
- printf("[fd=%d] timeout[%d--%d]./n", g_Events[checkPos].fd, g_Events[checkPos].last_active, now);
- EventDel(g_epollFd, &g_Events[checkPos]);
- }
- }
- // wait for events to happen
- int fds = epoll_wait(g_epollFd, events, MAX_EVENTS, 1000);
- if(fds < 0){
- printf("epoll_wait error, exit/n");
- break;
- }
- for(int i = 0; i < fds; i++){
- myevent_s *ev = (struct myevent_s*)events[i].data.ptr;
- if((events[i].events&EPOLLIN)&&(ev->events&EPOLLIN)) // read event
- {
- ev->call_back(ev->fd, events[i].events, ev->arg);
- }
- if((events[i].events&EPOLLOUT)&&(ev->events&EPOLLOUT)) // write event
- {
- ev->call_back(ev->fd, events[i].events, ev->arg);
- }
- }
- }
- // free resource
- return 0;
- }
3、epoll
3.1、poll(select)的限制
Poll函数起源于SVR3,最初局限于流设备,SVR4取消了这种限制。总是来说,poll比select要高效一些,但是,它有可移植性问题,例如,windows就只支持select。
一个poll的简单例子:
#include <unistd.h>
#include <sys/poll.h>
#define TIMEOUT 5 /* poll timeout, in seconds */
int main (void)
{
struct pollfd fds[2];
int ret;
/* watch stdin for input */
fds[0].fd = STDIN_FILENO;
fds[0].events = POLLIN;
/* watch stdout for ability to write (almost always true) */
fds[1].fd = STDOUT_FILENO;
fds[1].events = POLLOUT;
/* All set, block! */
ret = poll (fds, 2, TIMEOUT * 1000);
if (ret == -1) {
perror ("poll");
return 1;
}
if (!ret) {
printf ("%d seconds elapsed.\n", TIMEOUT);
return 0;
}
if (fds[0].revents & POLLIN)
printf ("stdin is readable\n");
if (fds[1].revents & POLLOUT)
printf ("stdout is writable\n");
return 0;
}
select模型与此类例。内核必须遍历所有监视的描述符,而应用程序也必须遍历所有描述符,检查哪些描述符已经准备好。当描述符成百上千时,会变得非常低效——这是select(poll)模型低效的根源所在。考虑这些情况,2.6以后的内核都引进了epoll模型。
3.2、核心数据结构与接口
Epoll模型由3个函数构成,epoll_create、epoll_ctl和epoll_wait。
3.2.1创建epoll实例(Creating a New Epoll Instance)
epoll环境通过epoll_create函数创建:
#include <sys/epoll.h>
int epoll_create (int size)
调用成功则返回与实例关联的文件描述符,该文件描述符与真实的文件没有任何关系,仅作为接下来调用的函数的句柄。size是给内核的一个提示,告诉内核将要监视的文件描述符的数量,它不是最大值;但是,传递合适的值能够提高系统性能。发生错误时,返回-1。
例子:
epfd = epoll_create (100); /* plan to watch ~100 fds */
if (epfd < 0)
perror ("epoll_create");
3.2.2、控制epoll(Controlling Epoll)
通过epoll_ctl,可以加入文件描述符到epoll环境或从epoll环境移除文件描述符。
int epoll_ctl (int epfd,
int op,
int fd,
struct epoll_event *event);
struct epoll_event {
_ _u32 events; /* events */
union {
void *ptr;
int fd;
_ _u32 u32;
_ _u64 u64;
} data;
};
epfd为epoll_create返回的描述符。op表示对描述符fd采取的操作,取值如下:
EPOLL_CTL_ADD
Add a monitor on the file associated with the file descriptor fd to the epoll instance associated with epfd, per the events defined in event.
EPOLL_CTL_DEL
Remove a monitor on the file associated with the file descriptor fd from the epollinstance associated with epfd.
EPOLL_CTL_MOD
Modify an existing monitor of fd with the updated events specified by event.
epoll_event结构中的events字段,表示对该文件描述符所关注的事件,它的取值如下:
EPOLLET
Enables edge-triggered behavior for the monitor of the file .The default behavior is level-
triggered.
EPOLLHUP
A hangup occurred on the file. This event is always monitored, even if it’s not specified.
EPOLLIN
The file is available to be read from without blocking.
EPOLLONESHOT
After an event is generated and read, the file is automatically no longer monitored.A new event mask must be specified via EPOLL_CTL_MOD to reenable the watch.
EPOLLOUT
The file is available to be written to without blocking.
EPOLLPRI
There is urgent out-of-band data available to read.
而epoll_event结构中的fd是epoll高效的根源所在,当描述符准备好。应用程序不用遍历所有描述符,而只用检查发生事件的描述符。
将一个描述符加入epoll环境:
int ret;
event.data.fd = fd; /* return the fd to us later */
event.events = EPOLLIN | EPOLLOUT;
ret = epoll_ctl (epfd, EPOLL_CTL_ADD, fd, &event);
if (ret)
perror ("epoll_ctl");
3.2.3、等待事件(Waiting for Events with Epoll)
#include <sys/epoll.h>
int epoll_wait (int epfd,
struct epoll_event *events,
int maxevents,
int timeout);
等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有 说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。
一个简单示例:
struct epoll_event *events;
int nr_events, i, epfd;
events = malloc (sizeof (struct epoll_event) * MAX_EVENTS);
if (!events) {
perror ("malloc");
return 1;
}
nr_events = epoll_wait (epfd, events, MAX_EVENTS, -1);
if (nr_events < 0) {
perror ("epoll_wait");
free (events);
return 1;
}
//只需要检查发生事件的文件描述符,而不需要遍历所有描述符
for (i = 0; i < nr_events; i++) {
printf ("event=%ld on fd=%d\n",
events[i].events,
events[i].data.fd);
/*
* We now can, per events[i].events, operate on
* events[i].data.fd without blocking.
*/
}
free (events);
3.2.4、epoll的典型用法
for(;;) {
nfds = epoll_wait(kdpfd, events, maxevents, -1);
for(n = 0; n < nfds; ++n) {
if(events[n].data.fd == listener) {
//新的连接
client = accept(listener, (struct sockaddr *) &local,
&addrlen);
if(client < 0){
perror("accept");
continue;
}
setnonblocking(client);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = client;
// 设置好event之后,将这个新的event通过epoll_ctl加入到epoll的监听队列里面
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) {
fprintf(stderr, "epoll set insertion error: fd=%d0,
client);
return -1;
}
}
else
do_use_fd(events[n].data.fd);
}
}
3.3、综合示例
#include "echo.h"
#include <sys/epoll.h>
#include <fcntl.h>
#define EVENT_ARR_SIZE 20
#define EPOLL_SIZE 20
void setnonblocking(
int sockfd
);
int
main(int argc, char **argv)
{
int i, listenfd, connfd, sockfd, epfd;
ssize_t n;
char buf[MAXLINE];
socklen_t clilen;
struct sockaddr_in cliaddr, servaddr;
struct epoll_event ev, evs[EVENT_ARR_SIZE];
int nfds;
if((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
err_sys("create socket error!\n");
setnonblocking(listenfd);
epfd = epoll_create(EPOLL_SIZE);
ev.data.fd = listenfd;
ev.events = EPOLLIN | EPOLLET;
if(epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev) < 0)
err_sys("epoll_ctl listenfd error!\n");
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
//servaddr.sin_addr.s_addr = INADDR_ANY;
servaddr.sin_addr.s_addr = inet_addr("211.67.28.128");
servaddr.sin_port = htons(SERV_PORT);
if(bind(listenfd, (struct sockaddr*) &servaddr, sizeof(servaddr)) < 0)
err_sys("bind error!\n");
if(listen(listenfd, LISTENQ) < 0)
err_sys("listen error!\n");
printf("server is listening....\n");
for ( ; ; ) {
if((nfds = epoll_wait(epfd, evs, EVENT_ARR_SIZE, -1)) < 0)
err_sys("epoll_wait error!\n");
for(i = 0; i < nfds; i++)
{
if(evs[i].data.fd == listenfd)
{
clilen = sizeof(cliaddr);
connfd = accept(listenfd, (struct sockaddr*) &cliaddr, &clilen);
if(connfd < 0)
continue;
setnonblocking(connfd);
ev.data.fd = connfd;
ev.events = EPOLLIN | EPOLLET;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev) < 0)
err_sys("epoll_ctl connfd error!\n");
}
else if(evs[i].events & EPOLLIN)
{
sockfd = evs[i].data.fd;
if (sockfd < 0)
continue;
if ( (n = read(sockfd, buf, MAXLINE)) == 0) {
epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, &ev);
close(sockfd);
evs[i].data.fd = -1;
}
else if(n < 0)
err_sys("read socket error!\n");
else
{
printf("write %d bytes\n", n);
write(sockfd, buf, n);
}
}
else
printf("other event!\n");
}
}
return 0;
}
void setnonblocking(
int sockfd
)
{
int flag;
flag = fcntl(sockfd, F_GETFL);
if(flag < 0)
err_sys("fcnt(F_GETFL) error!\n");
flag |= O_NONBLOCK;
if(fcntl(sockfd, F_SETFL, flag) < 0)
err_sys("fcon(F_SETFL) error!\n");
}
//echo.h
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#define SERV_PORT 9877
#define MAXLINE 4096
#define LISTENQ 5
void
err_sys(const char *fmt, ...);
ssize_t
readn(int fd, void *vptr, size_t n);
补充部分:
select()系统调用提供一个机制来实现同步多元I/O:
#include <sys/time.h> |
调用select()将阻塞,直到指定的文件描述符准备好执行I/O,或者可选参数timeout指定的时间已经过去。
监视的文件描述符分为三类set,每一种对应等待不同的事件。readfds中列出的文件描述符被监视是否有数据可供读取(如果读取操作完成则不会阻塞)。writefds中列出的文件描述符则被监视是否写入操作完成而不阻塞。最后,exceptfds中列出的文件描述符则被监视是否发生异常,或者无法控制的数据是否可用(这些状态仅仅应用于套接字)。这三类set可以是NULL,这种情况下select()不监视这一类事件。
select()成功返回时,每组set都被修改以使它只包含准备好I/O的文件描述符。例如,假设有两个文件描述符,值分别是7和9,被放在readfds中。当select()返回时,如果7仍然在set中,则这个文件描述符已经准备好被读取而不会阻塞。如果9已经不在set中,则读取它将可能会阻塞(我说可能是因为数据可能正好在select返回后就可用,这种情况下,下一次调用select()将返回文件描述符准备好读取)。
第一个参数n,等于所有set中最大的那个文件描述符的值加1。因此,select()的调用者负责检查哪个文件描述符拥有最大值,并且把这个值加1再传递给第一个参数。
timeout参数是一个指向timeval结构体的指针,timeval定义如下:
#include <sys/time.h> struct timeval { long tv_sec; /* seconds */ long tv_usec; /* 10E-6 second */ }; |
如果这个参数不是NULL,则即使没有文件描述符准备好I/O,select()也会在经过tv_sec秒和tv_usec微秒后返回。当select()返回时,timeout参数的状态在不同的系统中是未定义的,因此每次调用select()之前必须重新初始化timeout和文件描述符set。实际上,当前版本的Linux会自动修改timeout参数,设置它的值为剩余时间。因此,如果timeout被设置为5秒,然后在文件描述符准备好之前经过了3秒,则这一次调用select()返回时tv_sec将变为2。
如果timeout中的两个值都设置为0,则调用select()将立即返回,报告调用时所有未决的事件,但不等待任何随后的事件。
文件描述符set不会直接操作,一般使用几个助手宏来管理。这允许Unix系统以自己喜欢的方式来实现文件描述符set。但大多数系统都简单地实现set为位数组。FD_ZERO移除指定set中的所有文件描述符。每一次调用select()之前都应该先调用它。
fd_set writefds;
FD_ZERO(&writefds);
FD_SET添加一个文件描述符到指定的set中,FD_CLR则从指定的set中移除一个文件描述符:
FD_SET(fd, &writefds); /* add ‘fd‘ to the set */
FD_CLR(fd, &writefds); /* oops, remove ‘fd‘ from the set */
设计良好的代码应该永远不使用FD_CLR,而且实际情况中它也确实很少被使用。
FD_ISSET测试一个文件描述符是否指定set的一部分。如果文件描述符在set中则返回一个非0整数,不在则返回0。FD_ISSET在调用select()返回之后使用,测试指定的文件描述符是否准备好相关动作:
if (FD_ISSET(fd, &readfds))
/* ‘fd‘ is readable without blocking! */
因为文件描述符set是静态创建的,它们对文件描述符的最大数目强加了一个限制,能够放进set中的最大文件描述符的值由FD_SETSIZE指定。在Linux中,这个值是1024。本章后面我们还将看到这个限制的衍生物。
返回值和错误代码
select()成功时返回准备好I/O的文件描述符数目,包括所有三个set。如果提供了timeout,返回值可能是0;错误时返回-1,并且设置errno为下面几个值之一:
EBADF,给某个set提供了无效文件描述符。
EINTR,等待时捕获到信号,可以重新发起调用。
EINVAL,参数n为负数,或者指定的timeout非法。
ENOMEM,不够可用内存来完成请求。
--------------------------------------------------------------------------------------------------------------
poll()系统调用是System V的多元I/O解决方案。它解决了select()的几个不足,尽管select()仍然经常使用(多数还是出于习惯,或者打着可移植的名义):
#include <sys/poll.h> int poll (struct pollfd *fds, unsigned int nfds, int timeout); |
和select()不一样,poll()没有使用低效的三个基于位的文件描述符set,而是采用了一个单独的结构体pollfd数组,由fds指针指向这个组。pollfd结构体定义如下:
#include <sys/poll.h> struct pollfd { int fd; /* file descriptor */ short events; /* requested events to watch */ short revents; /* returned events witnessed */ }; |
每一个pollfd结构体指定了一个被监视的文件描述符,可以传递多个结构体,指示poll()监视多个文件描述符。每个结构体的events域是监视该文件描述符的事件掩码,由用户来设置这个域。revents域是文件描述符的操作结果事件掩码。内核在调用返回时设置这个域。events域中请求的任何事件都可能在revents域中返回。合法的事件如下:
POLLIN,有数据可读。
POLLRDNORM,有普通数据可读。
POLLRDBAND,有优先数据可读。
POLLPRI,有紧迫数据可读。
POLLOUT,写数据不会导致阻塞。
POLLWRNORM,写普通数据不会导致阻塞。
POLLWRBAND,写优先数据不会导致阻塞。
POLLMSG,SIGPOLL消息可用。
此外,revents域中还可能返回下列事件:
POLLER,指定的文件描述符发生错误。
POLLHUP,指定的文件描述符挂起事件。
POLLNVAL,指定的文件描述符非法。
这些事件在events域中无意义,因为它们在合适的时候总是会从revents中返回。使用poll()和select()不一样,你不需要显式地请求异常情况报告。POLLIN | POLLPRI等价于select()的读事件,POLLOUT | POLLWRBAND等价于select()的写事件。POLLIN等价于POLLRDNORM | POLLRDBAND,而POLLOUT则等价于POLLWRNORM。
例如,要同时监视一个文件描述符是否可读和可写,我们可以设置events为POLLIN | POLLOUT。在poll返回时,我们可以检查revents中的标志,对应于文件描述符请求的events结构体。如果POLLIN事件被设置,则文件描述符可以被读取而不阻塞。如果POLLOUT被设置,则文件描述符可以写入而不导致阻塞。这些标志并不是互斥的:它们可能被同时设置,表示这个文件描述符的读取和写入操作都会正常返回而不阻塞。timeout参数指定等待的毫秒数,无论I/O是否准备好,poll都会返回。timeout指定为负数值表示无限超时;timeout为0指示poll调用立即返回并列出准备好I/O的文件描述符,但并不等待其它的事件。这种情况下,poll()就像它的名字那样,一旦选举出来,立即返回。
返回值和错误代码
成功时,poll()返回结构体中revents域不为0的文件描述符个数;如果在超时前没有任何事件发生,poll()返回0;失败时,poll()返回-1,并设置errno为下列值之一:
EBADF,一个或多个结构体中指定的文件描述符无效。
EFAULT,fds指针指向的地址超出进程的地址空间。
EINTR,请求的事件之前产生一个信号,调用可以重新发起。
EINVAL,nfds参数超出PLIMIT_NOFILE值。
ENOMEM,可用内存不足,无法完成请求。
以上内容来自《OReilly.Linux.System.Programming - Talking.Directly.to.the.Kernel.and.C.Library.2007》
Epoll的优点:
1.支持一个进程打开大数目的socket描述符(FD)
select 最不能忍受的是一个进程所打开的FD是有一定限制的,由FD_SETSIZE设置,默认值是2048。对于那些需要支持的上万连接数目的IM服务器来说显然太少了。这时候你一是可以选择修改这个宏然后重新编译内核,不过资料也同时指出这样会带来网络效率的下降,二是可以选择多进程的解决方案(传统的 Apache方案),不过虽然linux上面创建进程的代价比较小,但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不是一种完美的方案。不过 epoll则没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。
2.IO效率不随FD数目增加而线性下降
传统的select/poll另一个致命弱点就是当你拥有一个很大的socket集合,不过由于网络延时,任一时间只有部分的socket是"活跃"的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。但是epoll不存在这个问题,它只会对"活跃"的socket进行操作---这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的。那么,只有"活跃"的socket才会主动的去调用 callback函数,其他idle状态socket则不会,在这点上,epoll实现了一个"伪"AIO,因为这时候推动力在os内核。在一些 benchmark中,如果所有的socket基本上都是活跃的---比如一个高速LAN环境,epoll并不比select/poll有什么效率,相反,如果过多使用epoll_ctl,效率相比还有稍微的下降。但是一旦使用idle connections模拟WAN环境,epoll的效率就远在select/poll之上了。
3.使用mmap加速内核与用户空间的消息传递。
这点实际上涉及到epoll的具体实现了。无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核于用户空间mmap同一块内存实现的。而如果你想我一样从2.5内核就关注epoll的话,一定不会忘记手工 mmap这一步的。
4.内核微调
这一点其实不算epoll的优点了,而是整个linux平台的优点。也许你可以怀疑linux平台,但是你无法回避linux平台赋予你微调内核的能力。比如,内核TCP/IP协议栈使用内存池管理sk_buff结构,那么可以在运行时期动态调整这个内存pool(skb_head_pool)的大小--- 通过echo XXXX>/proc/sys/net/core/hot_list_length完成。再比如listen函数的第2个参数(TCP完成3次握手的数据包队列长度),也可以根据你平台内存大小动态调整。更甚至在一个数据包面数目巨大但同时每个数据包本身大小却很小的特殊系统上尝试最新的NAPI网卡驱动架构。
########################################################
select/epoll的特点
select的特点:select 选择句柄的时候,是遍历所有句柄,也就是说句柄有事件响应时,select需要遍历所有句柄才能获取到哪些句柄有事件通知,因此效率是非常低。但是如果连接很少的情况下, select和epoll的LT触发模式相比, 性能上差别不大。
这 里要多说一句,select支持的句柄数是有限制的, 同时只支持1024个,这个是句柄集合限制的,如果超过这个限制,很可能导致溢出,而且非常不容易发现问题, TAF就出现过这个问题, 调试了n天,才发现:)当然可以通过修改linux的socket内核调整这个参数。
epoll的特点:epoll对于句柄事件的选择不是遍历的,是事件响应的,就是句柄上事件来就马上选择出来,不需要遍历整个句柄链表,因此效率非常高,内核将句柄用红黑树保存的。相比于select,epoll最大的好处在于它不会随着监听fd数目的增长而降低效率。因为在内核中的select实现中,它是采用轮询来处理的,轮询的fd数目越多,自然耗时越多。并且,在linux/posix_types.h头文件有这样的声明:
#define __FD_SETSIZE 1024
表示select最多同时监听1024个fd,当然,可以通过修改头文件再重编译内核来扩大这个数目,但这似乎并不治本。
对于epoll而言还有ET和LT的区别,LT表示水平触发,ET表示边缘触发,两者在性能以及代码实现上差别也是非常大的。
epoll的LT和ET的区别
LT:水平触发,效率会低于ET触发,尤其在大并发,大流量的情况下。但是LT对代码编写要求比较低,不容易出现问题。LT模式服务编写上的表现是:只要有数据没有被获取,内核就不断通知你,因此不用担心事件丢失的情况。
ET:边缘触发,效率非常高,在并发,大流量的情况下,会比LT少很多epoll的系统调用,因此效率高。但是对编程要求高,需要细致的处理每个请求,否则容易发生丢失事件的情况。
epoll相关API
epoll的接口非常简单,一共就三个函数:
1. int epoll_create(int size);
创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待事件的产生,类似于select()调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。
示例代码
epoll服务器
- #include <sys/epoll.h>
- #include <netinet/in.h>
- #include <sys/types.h> /* See NOTES */
- #include <sys/socket.h>
- #include <string.h>
- #include <stdio.h>
- #include <unistd.h>
- #include <fcntl.h>
- #include <errno.h>
- #include <stdlib.h>
- typedef struct sockaddr_in sockaddr_in ;
- typedef struct sockaddr sockaddr ;
- #define SER_PORT 8080
- int nonblock(int fd){
- int opt ;
- opt = fcntl(fd,F_GETFL);
- opt |= O_NONBLOCK ;
- return fcntl(fd,F_SETFL,opt);
- }
- int main(int argc,char**argv){
- sockaddr_in srv, cli ;
- int listen_fd ,con_fd ;
- socklen_t len;
- int res ,nsize,ws;
- char buf[255];
- int epfd,ers;
- struct epoll_event evn,events[50];
- int i;
- bzero(&srv,sizeof(srv));
- bzero(&cli,sizeof(cli));
- srv.sin_port= SER_PORT ;
- srv.sin_family = AF_INET ;
- listen_fd = socket(AF_INET,SOCK_STREAM,0);
- int yes = 1;
- setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int));
- if(bind(listen_fd,(sockaddr*)&srv,sizeof(sockaddr))<0) {
- perror("bind");
- exit(0);
- }
- listen(listen_fd,100);
- nonblock(listen_fd);
- epfd = epoll_create(200);
- evn.events = EPOLLIN|EPOLLET ;
- evn.data.fd = listen_fd;
- epoll_ctl(epfd,EPOLL_CTL_ADD,listen_fd,&evn);
- static int count ;
- while(1){
- ers = epoll_wait(epfd,events,100,5000);
- if(ers<0 ){
- perror("epoll_wait:");exit(0);
- }else if(ers==0){
- printf("time out:%d\n",count++);
- continue ;
- }
- for(i=0;i<ers;i++){
- if(events[i].data.fd == listen_fd){
- con_fd = accept(listen_fd,(sockaddr*)&cli ,&len);
- nonblock(con_fd);
- printf("connect from:%s\n",inet_ntoa(cli.sin_addr));
- evn.data.fd = con_fd;
- evn.events = EPOLLIN | EPOLLET ;
- epoll_ctl(epfd,EPOLL_CTL_ADD,con_fd,&evn);
- }else if(events[i].events & EPOLLIN){
- nsize = 0;
- while((res=read(events[i].data.fd,buf+nsize,sizeof(buf)-1))>0){
- nsize+= res;
- }
- if(res==0){
- epoll_ctl(epfd,EPOLL_CTL_DEL,events[i].data.fd,NULL);
- printf("a client over\n");
- close(con_fd);
- continue ;
- }else if(res<0 && errno!=EAGAIN){
- perror("read");
- continue ;
- }
- buf[nsize]=0;
- evn.data.fd = events[i].data.fd;
- evn.events=EPOLLOUT|EPOLLET ;
- epoll_ctl(epfd,EPOLL_CTL_MOD,events[i].data.fd,&evn);
- }else if(events[i].events & EPOLLOUT){
- nsize = strlen(buf);
- ws = 0;
- while(nsize>0){
- ws=write(events[i].data.fd,buf,nsize);
- nsize-=ws;
- }
- evn.data.fd = events[i].data.fd;
- evn.events=EPOLLIN|EPOLLET ;
- epoll_ctl(epfd,EPOLL_CTL_MOD,events[i].data.fd,&evn);
- }else{
- printf("others\n");
- }
- }
- }
- close(listen_fd);
- return 0;
- }
客户端测试代码:
- #include <sys/epoll.h>
- #include <netinet/in.h>
- #include <sys/types.h> /* See NOTES */
- #include <sys/socket.h>
- #include <strings.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <unistd.h>
- typedef struct sockaddr_in sockaddr_in ;
- typedef struct sockaddr sockaddr ;
- #define SER_PORT 8080
- #define IP_ADDR "10.33.28.230"
- int main(int argc,char**argv){
- sockaddr_in srv, cli ;
- int listen_fd ,con_fd ;
- socklen_t len;
- int res,ws ;
- char buf[255];
- bzero(&srv,sizeof(srv));
- bzero(&cli,sizeof(cli));
- srv.sin_port= SER_PORT ;
- srv.sin_family = AF_INET ;
- inet_pton(AF_INET,IP_ADDR,&srv.sin_addr);
- listen_fd = socket(AF_INET,SOCK_STREAM,0);
- if(connect(listen_fd,(sockaddr*)&srv,sizeof(sockaddr))<0){
- perror("connect");
- exit(0);
- }
- while(1){
- res = read(STDIN_FILENO,buf,sizeof(buf)-1);
- ws = write(listen_fd,buf,res);
- res = read(listen_fd,buf,sizeof(buf)-1);
- ws = write(STDOUT_FILENO,buf,res);
- }
- close(listen_fd);
- return 0;
- }
Linux下select, poll和epoll IO模型的详解(转)