首页 > 代码库 > 3高并发server:多路IO之epoll
3高并发server:多路IO之epoll
1 epoll
epoll是Linux下多路复用IO接口select/poll的增强版本号,它能显著提高程序在大量并、发连接中仅仅有少量活跃的情况下的系统CPU利用率,由于它会复用文件描写叙述符集合来传递结果而不用迫使开发人员每次等待事件之前都必须又一次准备要被侦听的文件描写叙述符集合,还有一点原因就是获取事件的时候,它无须遍历整个被侦听的描写叙述符集,仅仅要遍历那些被内核IO事件异步唤醒而增加Ready队列的描写叙述符集合即可了。
眼下epell是linux大规模并发网络程序中的热门首选模型。
epoll除了提供select/ poll那种IO事件的电平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,降低epoll_wait/epoll_pwait的调用,提高应用程序效率。
一个进程打开大数目的socket描写叙述符
cat/proc/sys/fs/file-max
设置最大打开文件描写叙述符限制
sudo vi /etc/security/limits.conf
epollAPI
1.创建一个epoll句柄,參数size用来告诉内核监听的文件描写叙述符个数,跟内存大小有关。
依赖的头文件
#include <sys/epoll.h>
函数声明
int epoll_create(int size);
函数说明:
size:告诉内核监听的数目
2控制某个epoll监听的文件描写叙述符上的事件:注冊、改动、删除
依赖的头文件
#include <sys/epoll.h>
函数声明
int epoll_ctl(int epfd,int op,int fd,structepoll_event);
函数说明:
epfd:为epoll_create的句柄
op:表示动作,用3个宏来表示
EPOLL_CTL_ADD(注冊新的fd到epfd)
EPOLL_CTL_MOD(改动已经注冊的fd的监听事件)
EPOLL_CTL_DEL(从epfd删除一个fd)
event:告诉内核须要监听的事件
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
EPOLLIN:表示相应的文件描写叙述符能够读(包含对端SOCKET正常关闭)
EPOLLOUT:表示相应的文件描写叙述符能够写
EPOLLPRI:表示相应的文件描写叙述符有紧急的数据可读(这里应该表示有带外数据到来)
EPOLLERR:表示相应的文件描写叙述符错误发生
EPOLLHUP:表示相应的文件描写叙述符被挂断;
EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的
EPOLLONESHOT:仅仅监听一次事件,当监听完这次事件之后,假设还须要继续监听这个socket的话,须要再次把这个socket增加到EPOLL队列里
3等待所监控文件描写叙述符上有事件的产生,类似select()调用。
依赖的头文件
#include <sys/epoll.h>
函数声明:
int epoll_wait(int epfd, struct epoll_event*events, int maxevents, int timeout);
參数介绍:
events:用来从内核得到事件的集合。
maxevents:告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,timeout:是超时时间
-1:堵塞
0:马上返回,非堵塞
>0:指定微秒
返回值:成功返回有多少文件描写叙述符就绪,时间到时返回0,出错返回-1
案例说明:
server.c
#include<stdio.h> #include<stdlib.h> #include<string.h> #include<netinet/in.h> #include<arpa/inet.h> #include<sys/epoll.h> #include<errno.h> #include<unistd.h> #include<ctype.h> #include"wrap.h"
#define MAXLINE 80 #define SERV_PORT 8000 #define OPEN_MAX 1024
int main(void) { int i,j,maxi,listenfd,connfd,sockfd; int nready,efd,res; ssize_t n; char buf[MAXLINE],str[INET_ADDRSTRLEN]; socklen_t clilen; int client[OPEN_MAX]; struct sockaddr_in cliaddr,servaddr; //ep[OPEN_MAX]保存就绪的文件描写叙述符 struct epoll_event tep,ep[OPEN_MAX];
listenfd = Socket(AF_INET,SOCK_STREAM,0);
bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(SERV_PORT);
Bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr)); Listen(listenfd,20);
for(i = 0;i < OPEN_MAX;i++) { client[i] = -1; } maxi = -1;
efd = epoll_create(OPEN_MAX);
if(efd == -1) perr_exit("epoll_create");
//监听读属性 tep.events = EPOLLIN; //data里面保存了就绪的文件的文件描写叙述符。 tep.data.fd = listenfd; res = epoll_ctl(efd,EPOLL_CTL_ADD,listenfd,&tep); if(res == -1) perr_exit("epoll_ctl");
for(;;) { /*堵塞监听*/ nready = epoll_wait(efd,ep,OPEN_MAX,-1); if(nready == -1) { perr_exit("epoll_wait"); } for(i = 0;i< nready;i++) { if(!ep[i].events & EPOLLIN) continue; if(ep[i].data.fd == listenfd) { clilen = sizeof(cliaddr); connfd = Accept(listenfd,(struct sockaddr *)&cliaddr,&clilen); printf("received from %s at PORT %d\n", inet_ntop(AF_INET,&cliaddr.sin_addr,str,sizeof(str)), ntohs(cliaddr.sin_port));
for(j = 0; j < OPEN_MAX; j++) { if(client[j] < 0) { client[j] = connfd; /*save descriptor*/ break; } }
if(j==OPEN_MAX) perr_exit("too many clients");
if(j > maxi) maxi = j; /*max index in client[] array*/ tep.events = EPOLLIN; tep.data.fd = connfd; res = epoll_ctl(efd,EPOLL_CTL_ADD,connfd,&tep); if(res == -1) { perr_exit("epoll_ctl"); } else { sockfd = ep[i].data.fd; n = Read(sockfd,buf,MAXLINE); if(n==0) { for(j = 0; j <= maxi;j++) { if(client[j] == sockfd) { client[j] = -1; break; } } res = epoll_ctl(efd,EPOLL_CTL_DEL,sockfd,NULL); if(res == -1) { perr_exit("epoll_ctl"); } Close(sockfd); printf("client[%d] closed connection",j); } else { for(j = 0;j < n;j++) buf[j] = toupper(buf[j]); Writen(sockfd,buf,n); } } } } } Close(listenfd); Close(efd); return 0; } |
client.c
#include<stdio.h> #include<string.h> #include<unistd.h> #include<arpa/inet.h> #include<netinet/in.h> #include"wrap.h"
#define MAXLINE 80 #define SERV_PORT 8000
int main(void) { struct sockaddr_in servaddr; char buf[MAXLINE]; int sockfd,n;
sockfd = Socket(AF_INET,SOCK_STREAM,0);
bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family = AF_INET; inet_pton(AF_INET,"127.0.0.1",&servaddr.sin_addr); servaddr.sin_port = htons(SERV_PORT);
Connect(sockfd,(struct sockaddr *) &servaddr,sizeof(servaddr));
while(fgets(buf,MAXLINE,stdin) != NULL) { Write(sockfd,buf,strlen(buf)); n = Read(sockfd,buf,MAXLINE); if(n == 0) printf("the other side has been closed.\n"); else Write(STDOUT_FILENO,buf,n); }
Close(sockfd); return 0; } |
wrap.h
#ifndef __WRAP_H_ #define __WRAP_H_
void perr_exit(const char *s); int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr); void Bind(int fd, const struct sockaddr *sa, socklen_t salen); void Connect(int fd, const struct sockaddr *sa, socklen_t salen); void Listen(int fd, int backlog); int Socket(int family, int type, int protocol); ssize_t Read(int fd, void *ptr, size_t nbytes); ssize_t Write(int fd, const void *ptr, size_t nbytes); void Close(int fd); ssize_t Readn(int fd, void *vptr, size_t n); ssize_t Writen(int fd, const void *vptr, size_t n); static ssize_t my_read(int fd, char *ptr); ssize_t Readline(int fd, void *vptr, size_t maxlen);
#endif |
wrap.c
#include <stdlib.h> #include <stdio.h> #include <errno.h> #include <unistd.h> #include <sys/socket.h>
void perr_exit(const char *s) { perror(s); exit(1); }
int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr) { int n;
again: if ( (n = accept(fd, sa, salenptr)) < 0) { if ((errno == ECONNABORTED) || (errno == EINTR)) goto again; else perr_exit("accept error"); } return n; }
void Bind(int fd, const struct sockaddr *sa, socklen_t salen) { if (bind(fd, sa, salen) < 0) perr_exit("bind error"); }
void Connect(int fd, const struct sockaddr *sa, socklen_t salen) { if (connect(fd, sa, salen) < 0) perr_exit("connect error"); }
void Listen(int fd, int backlog) { if (listen(fd, backlog) < 0) perr_exit("listen error"); }
int Socket(int family, int type, int protocol) { int n;
if ( (n = socket(family, type, protocol)) < 0) perr_exit("socket error"); return n; }
ssize_t Read(int fd, void *ptr, size_t nbytes) { ssize_t n;
again: if ( (n = read(fd, ptr, nbytes)) == -1) { if (errno == EINTR) goto again; else return -1; } return n; }
ssize_t Write(int fd, const void *ptr, size_t nbytes) { ssize_t n;
again: if ( (n = write(fd, ptr, nbytes)) == -1) { if (errno == EINTR) goto again; else return -1; } return n; }
void Close(int fd) { if (close(fd) == -1) perr_exit("close error"); } ssize_t Readn(int fd, void *vptr, size_t n) { size_t nleft; ssize_t nread; char *ptr;
ptr = vptr; nleft = n; while (nleft > 0) { if ( (nread = read(fd, ptr, nleft)) < 0) { if (errno == EINTR) nread = 0; else return -1; } else if (nread == 0) break;
nleft -= nread; ptr += nread; } return n - nleft; }
ssize_t Writen(int fd,const void *vptr, size_t n) { size_t nleft; ssize_t nwritten; const char *ptr;
ptr = vptr; nleft = n; while (nleft > 0) { if ( (nwritten = write(fd, ptr, nleft)) <= 0) { if (nwritten < 0 && errno == EINTR) nwritten = 0; else return -1; }
nleft -= nwritten; ptr += nwritten; } return n; } static ssize_t my_read(int fd, char *ptr) { static int read_cnt; static char *read_ptr; static char read_buf[100];
if (read_cnt <= 0) { again: if ( (read_cnt = read(fd, read_buf, sizeof(read_buf))) < 0) { if (errno == EINTR) goto again; return -1; } else if (read_cnt == 0) return 0; read_ptr = read_buf; } read_cnt--; *ptr = *read_ptr++; return 1; }
ssize_t Readline(int fd, void *vptr, size_t maxlen) { ssize_t n, rc; char c, *ptr;
ptr = vptr; for (n = 1; n < maxlen; n++) { if ( (rc = my_read(fd, &c)) == 1) { *ptr++ = c; if (c == ‘\n‘) break; } else if (rc == 0) { *ptr = 0; return n - 1; } else return -1; } *ptr = 0; return n; } |
3高并发server:多路IO之epoll