首页 > 代码库 > 3高并发服务器:多路IO之epoll
3高并发服务器:多路IO之epoll
1 epoll
epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并、发连接中只有少量活跃的情况下的系统CPU利用率,因为它会复用文件描述符集合来传递结果而不用迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。
目前epell是linux大规模并发网络程序中的热门首选模型。
epoll除了提供select/ poll那种IO事件的电平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
一个进程打开大数目的socket描述符
cat/proc/sys/fs/file-max
设置最大打开文件描述符限制
sudo vi /etc/security/limits.conf
epollAPI
1.创建一个epoll句柄,参数size用来告诉内核监听的文件描述符个数,跟内存大小有关。
依赖的头文件
#include <sys/epoll.h>
函数声明
int epoll_create(int size);
函数说明:
size:告诉内核监听的数目
2控制某个epoll监听的文件描述符上的事件:注册、修改、删除
依赖的头文件
#include <sys/epoll.h>
函数声明
int epoll_ctl(int epfd,int op,int fd,structepoll_event);
函数说明:
epfd:为epoll_create的句柄
op:表示动作,用3个宏来表示
EPOLL_CTL_ADD(注册新的fd到epfd)
EPOLL_CTL_MOD(修改已经注册的fd的监听事件)
EPOLL_CTL_DEL(从epfd删除一个fd)
event:告诉内核需要监听的事件
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
EPOLLIN:表示对应的文件描述符可以读(包括对端SOCKET正常关闭)
EPOLLOUT:表示对应的文件描述符可以写
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)
EPOLLERR:表示对应的文件描述符发生错误
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
3等待所监控文件描述符上有事件的产生,类似select()调用。
依赖的头文件
#include <sys/epoll.h>
函数声明:
int epoll_wait(int epfd, struct epoll_event*events, int maxevents, int timeout);
参数介绍:
events:用来从内核得到事件的集合。
maxevents:告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,timeout:是超时时间
-1:阻塞
0:立即返回,非阻塞
>0:指定微秒
返回值:成功返回有多少文件描述符就绪,时间到时返回0,出错返回-1
案例说明:
server.c
#include<stdio.h> #include<stdlib.h> #include<string.h> #include<netinet/in.h> #include<arpa/inet.h> #include<sys/epoll.h> #include<errno.h> #include<unistd.h> #include<ctype.h> #include"wrap.h"
#define MAXLINE 80 #define SERV_PORT 8000 #define OPEN_MAX 1024
int main(void) { int i,j,maxi,listenfd,connfd,sockfd; int nready,efd,res; ssize_t n; char buf[MAXLINE],str[INET_ADDRSTRLEN]; socklen_t clilen; int client[OPEN_MAX]; struct sockaddr_in cliaddr,servaddr; //ep[OPEN_MAX]保存就绪的文件描述符 struct epoll_event tep,ep[OPEN_MAX];
listenfd = Socket(AF_INET,SOCK_STREAM,0);
bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(SERV_PORT);
Bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr)); Listen(listenfd,20);
for(i = 0;i < OPEN_MAX;i++) { client[i] = -1; } maxi = -1;
efd = epoll_create(OPEN_MAX);
if(efd == -1) perr_exit("epoll_create");
//监听读属性 tep.events = EPOLLIN; //data里面保存了就绪的文件的文件描述符。 tep.data.fd = listenfd; res = epoll_ctl(efd,EPOLL_CTL_ADD,listenfd,&tep); if(res == -1) perr_exit("epoll_ctl");
for(;;) { /*阻塞监听*/ nready = epoll_wait(efd,ep,OPEN_MAX,-1); if(nready == -1) { perr_exit("epoll_wait"); } for(i = 0;i< nready;i++) { if(!ep[i].events & EPOLLIN) continue; if(ep[i].data.fd == listenfd) { clilen = sizeof(cliaddr); connfd = Accept(listenfd,(struct sockaddr *)&cliaddr,&clilen); printf("received from %s at PORT %d\n", inet_ntop(AF_INET,&cliaddr.sin_addr,str,sizeof(str)), ntohs(cliaddr.sin_port));
for(j = 0; j < OPEN_MAX; j++) { if(client[j] < 0) { client[j] = connfd; /*save descriptor*/ break; } }
if(j==OPEN_MAX) perr_exit("too many clients");
if(j > maxi) maxi = j; /*max index in client[] array*/ tep.events = EPOLLIN; tep.data.fd = connfd; res = epoll_ctl(efd,EPOLL_CTL_ADD,connfd,&tep); if(res == -1) { perr_exit("epoll_ctl"); } else { sockfd = ep[i].data.fd; n = Read(sockfd,buf,MAXLINE); if(n==0) { for(j = 0; j <= maxi;j++) { if(client[j] == sockfd) { client[j] = -1; break; } } res = epoll_ctl(efd,EPOLL_CTL_DEL,sockfd,NULL); if(res == -1) { perr_exit("epoll_ctl"); } Close(sockfd); printf("client[%d] closed connection",j); } else { for(j = 0;j < n;j++) buf[j] = toupper(buf[j]); Writen(sockfd,buf,n); } } } } } Close(listenfd); Close(efd); return 0; } |
client.c
#include<stdio.h> #include<string.h> #include<unistd.h> #include<arpa/inet.h> #include<netinet/in.h> #include"wrap.h"
#define MAXLINE 80 #define SERV_PORT 8000
int main(void) { struct sockaddr_in servaddr; char buf[MAXLINE]; int sockfd,n;
sockfd = Socket(AF_INET,SOCK_STREAM,0);
bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family = AF_INET; inet_pton(AF_INET,"127.0.0.1",&servaddr.sin_addr); servaddr.sin_port = htons(SERV_PORT);
Connect(sockfd,(struct sockaddr *) &servaddr,sizeof(servaddr));
while(fgets(buf,MAXLINE,stdin) != NULL) { Write(sockfd,buf,strlen(buf)); n = Read(sockfd,buf,MAXLINE); if(n == 0) printf("the other side has been closed.\n"); else Write(STDOUT_FILENO,buf,n); }
Close(sockfd); return 0; } |
wrap.h
#ifndef __WRAP_H_ #define __WRAP_H_
void perr_exit(const char *s); int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr); void Bind(int fd, const struct sockaddr *sa, socklen_t salen); void Connect(int fd, const struct sockaddr *sa, socklen_t salen); void Listen(int fd, int backlog); int Socket(int family, int type, int protocol); ssize_t Read(int fd, void *ptr, size_t nbytes); ssize_t Write(int fd, const void *ptr, size_t nbytes); void Close(int fd); ssize_t Readn(int fd, void *vptr, size_t n); ssize_t Writen(int fd, const void *vptr, size_t n); static ssize_t my_read(int fd, char *ptr); ssize_t Readline(int fd, void *vptr, size_t maxlen);
#endif |
wrap.c
#include <stdlib.h> #include <stdio.h> #include <errno.h> #include <unistd.h> #include <sys/socket.h>
void perr_exit(const char *s) { perror(s); exit(1); }
int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr) { int n;
again: if ( (n = accept(fd, sa, salenptr)) < 0) { if ((errno == ECONNABORTED) || (errno == EINTR)) goto again; else perr_exit("accept error"); } return n; }
void Bind(int fd, const struct sockaddr *sa, socklen_t salen) { if (bind(fd, sa, salen) < 0) perr_exit("bind error"); }
void Connect(int fd, const struct sockaddr *sa, socklen_t salen) { if (connect(fd, sa, salen) < 0) perr_exit("connect error"); }
void Listen(int fd, int backlog) { if (listen(fd, backlog) < 0) perr_exit("listen error"); }
int Socket(int family, int type, int protocol) { int n;
if ( (n = socket(family, type, protocol)) < 0) perr_exit("socket error"); return n; }
ssize_t Read(int fd, void *ptr, size_t nbytes) { ssize_t n;
again: if ( (n = read(fd, ptr, nbytes)) == -1) { if (errno == EINTR) goto again; else return -1; } return n; }
ssize_t Write(int fd, const void *ptr, size_t nbytes) { ssize_t n;
again: if ( (n = write(fd, ptr, nbytes)) == -1) { if (errno == EINTR) goto again; else return -1; } return n; }
void Close(int fd) { if (close(fd) == -1) perr_exit("close error"); } ssize_t Readn(int fd, void *vptr, size_t n) { size_t nleft; ssize_t nread; char *ptr;
ptr = vptr; nleft = n; while (nleft > 0) { if ( (nread = read(fd, ptr, nleft)) < 0) { if (errno == EINTR) nread = 0; else return -1; } else if (nread == 0) break;
nleft -= nread; ptr += nread; } return n - nleft; }
ssize_t Writen(int fd,const void *vptr, size_t n) { size_t nleft; ssize_t nwritten; const char *ptr;
ptr = vptr; nleft = n; while (nleft > 0) { if ( (nwritten = write(fd, ptr, nleft)) <= 0) { if (nwritten < 0 && errno == EINTR) nwritten = 0; else return -1; }
nleft -= nwritten; ptr += nwritten; } return n; } static ssize_t my_read(int fd, char *ptr) { static int read_cnt; static char *read_ptr; static char read_buf[100];
if (read_cnt <= 0) { again: if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0) { if (errno == EINTR) goto again; return -1; } else if (read_cnt == 0) return 0; read_ptr = read_buf; } read_cnt--; *ptr = *read_ptr++; return 1; }
ssize_t Readline(int fd, void *vptr, size_t maxlen) { ssize_t n, rc; char c, *ptr;
ptr = vptr; for (n = 1; n < maxlen; n++) { if ( (rc = my_read(fd, &c)) == 1) { *ptr++ = c; if (c == ‘\n‘) break; } else if (rc == 0) { *ptr = 0; return n - 1; } else return -1; } *ptr = 0; return n; } |
3高并发服务器:多路IO之epoll