首页 > 代码库 > I/O多路复用---epoll函数测试
I/O多路复用---epoll函数测试
参考文章来源:
epoll使用详解(精髓)
Epoll学习笔记
epoll是直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。
epoll的接口非常简单,一共就三个函数:
1. intepoll_create(int size);
创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
2. intepoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll的事件注册函数,它不同与select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll_create()的返回值,第二个参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
第三个参数是需要监听的fd;
第四个参数是告诉内核需要监听什么事,structepoll_event结构如下:
typedef unionepoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
events可以是以下几个宏的集合:
EPOLLIN:表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。
3. intepoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
函数功能:
返回值:该函数返回需要处理的事件数目,如返回0表示已超时。
参数含义:
参数events用来从内核得到事件的集合(储存所有的读写事件);
maxevents告之内核这个events有多大,这个maxevents(所有socket句柄数)的值不能大于创建epoll_create()时的size;
参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。
epoll还是poll的一种优化,返回后不需要对所有的fd进行遍历,在内核中维持了fd的列表。select和poll是将这个内核列表维持在用户态,然后传递到内核中。但是只有在2.6的内核才支持。
epoll更适合于处理大量的fd ,且活跃fd不是很多的情况,毕竟fd较多还是一个串行的操作。
在许多测试中我们会看到如果没有大量的idle-connection或者dead-connection,epoll的效率并不会比select/poll高很多,但是当我们遇到大量的idle-connection(例如WAN环境中存在大量的慢速连接),就会发现epoll的效率大大高于select/poll。
源代码:
epoll_server.cpp
#include <netinet/in.h> #include <arpa/inet.h> #include <memory.h> #include <string.h> #include <fcntl.h> #include <sys/epoll.h> #include <iostream> #include <stdio.h> #include <stdlib.h> #include <string> #include <errno.h> using namespace std; class CTCPServer { public: CTCPServer(int nServerPort, int nLengthOfQueueOfListen = 100, const char *strBoundIP = NULL) { m_nServerPort = nServerPort; m_nLengthOfQueueOfListen = nLengthOfQueueOfListen; if(NULL == strBoundIP) { m_strBoundIP = NULL; } else { int length = strlen(strBoundIP); m_strBoundIP = new char[length + 1]; memcpy(m_strBoundIP, strBoundIP, length + 1); } } virtual ~CTCPServer() { if(m_strBoundIP != NULL) { delete [] m_strBoundIP; } } public: int Run() { const int MAXEPOLLSIZE = 100; const int MAXEVENTSIZE = 50; int nListenSocket = socket(AF_INET, SOCK_STREAM, 0); if(-1 == nListenSocket) { cout << "socket error" << std::endl; return -1; } SetNonBlock(nListenSocket);//非阻塞 recv函数 没有数据就绪马上返回 sockaddr_in ServerAddress; memset(&ServerAddress, 0, sizeof(sockaddr_in)); ServerAddress.sin_family = AF_INET; if(NULL == m_strBoundIP) { ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY); } else { if(inet_pton(AF_INET, m_strBoundIP, &ServerAddress.sin_addr) != 1) { cout << "inet_pton error" << endl; close(nListenSocket); return -1; } } ServerAddress.sin_port = htons(m_nServerPort); int on = 1; setsockopt(nListenSocket,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on)); if(bind(nListenSocket, (sockaddr *)&ServerAddress, sizeof(sockaddr_in)) == -1) { cout << "bind error" << endl; close(nListenSocket); return -1; } if(listen(nListenSocket, m_nLengthOfQueueOfListen) == -1) { cout << "listen error" << endl; close(nListenSocket); return -1; } int efd; struct epoll_event ev;//告诉内核要监听的事件 struct epoll_event events[MAXEPOLLSIZE];//返回从内核得到的已经就绪的事件集合 efd = epoll_create(MAXEPOLLSIZE);//create epoll handler ev.events = EPOLLIN|EPOLLET; ev.data.fd = nListenSocket; string recv_buf;//---- if(epoll_ctl(efd,EPOLL_CTL_ADD,nListenSocket,&ev)<0) { cout<<"epoll_ctl() error"<<endl; return -1; } while(1) { int n,i; int len; int con_fd; char buf[256]; cout << "epoll_wait()..." << endl; // 返回需要处理的就绪事件的数目 n = epoll_wait(efd,events,MAXEVENTSIZE,-1); cout << "n=" << n << endl; for(i=0;i<n;i++) { /* cout << "i=" << i << endl; cout << "n=" << n << endl; cout << "events[i].data.fd: " << events[i].data.fd << endl;*/ /* if((events[i].events&EPOLLERR)|| (events[i].events&EPOLLHUP)|| (!(events[i].events&EPOLLIN))) /* An error has occured on this fd, or the socket is not ready for reading (why were we notified then?) */ /*{ cout<<"epoll error"<<endl; close(events[i].data.fd); continue; }*/ //就绪事件的文件描述符为 监听套接字 /*else*/ if(nListenSocket == events[i].data.fd) { sockaddr_in ClientAddress; socklen_t LengthOfClientAddress = sizeof(sockaddr_in); int nConnectedSocket = accept(nListenSocket, (sockaddr *)&ClientAddress, &LengthOfClientAddress); if(-1 == nConnectedSocket) { cout << "accept error" << std::endl; close(nListenSocket); return -1; } cout << "Connection from :" << inet_ntoa(ClientAddress.sin_addr)<< ":" << ntohs(ClientAddress.sin_port) << endl; SetNonBlock(nConnectedSocket);//**设置连接套接字为非阻塞状态 cout << "nConnectedSocked: " << nConnectedSocket << endl; //ev.events = EPOLLIN|EPOLLOUT|EPOLLET;//read write edge_triggered ev.events = EPOLLIN|EPOLLOUT|EPOLLET;// ev.data.fd = nConnectedSocket; //注册新的fd到efd句柄中 if(epoll_ctl(efd,EPOLL_CTL_ADD,nConnectedSocket,&ev)<0) { cout<<"epoll_ctl() error"<<endl; return -1; } } else if(events[i].events&EPOLLIN)//readable { int res = 1; recv_buf = ""; cout << i << ":epollin..." << endl; con_fd = events[i].data.fd; if(con_fd < 0) { cout << "con_fd < 0" << endl; break; } else { //cout << "con_fd = " << con_fd << endl; while(((len = recv(con_fd,buf,sizeof(buf)-1,0))!=-1)&&(errno != EAGAIN)) { buf[len] = '\0'; cout <<"len = " << len << ", buf: " << buf; recv_buf += buf; if(len == sizeof(buf) - 1)//has more data to read continue; else if((len < sizeof(buf) - 1)&& (len > 0))//the last data segment break; else if(len == 0)//the peer has closed the socket { cout << "the peer has closed the socket..." << endl; close(con_fd); ev.data.fd = con_fd; if(epoll_ctl(efd,EPOLL_CTL_DEL,con_fd,&ev) < 0) { cout<<"epoll_ctl() error"<<endl; return -1; } break; } } if(recv_buf != "") cout << "Recv:" << recv_buf << endl; else if(errno == EAGAIN) cout << "no data in the buffer to read..." << endl; } } else if(events[i].events&EPOLLOUT)//writeable { cout << i<< ":epollout..." << endl; con_fd = events[i].data.fd; if(con_fd < 0) { cout << "con_fd < 0" << endl; break; } else { //cout << "con_fd = " << con_fd << endl; strcpy(buf,"hello client..."); if((len=send(con_fd,buf,sizeof(buf),0)) == -1) { perror("send"); exit(1); } } } else if(events[i].events&EPOLLHUP) { cout << i << ":epollhup..." << endl; } else if(events[i].events&EPOLLERR) { cout << i << ":epollerr..." << endl; } else { cout << i << ":other events..." << endl; } }//end of "for" }//end of "while" close(nListenSocket); return 0; }//end of int Run() private: virtual void ServerFunction(int nConnectedSocket, int nListenSocket) { } static int SetNonBlock(int fd) { int flags = fcntl(fd,F_GETFL,0); if(flags == -1) { cout<<"fcntl error"<<endl; return -1; } flags |= O_NONBLOCK; if(fcntl(fd,F_SETFL,flags) == -1) { cout<<"fcntl error"<<endl; return -1; } return 0; } private: int m_nServerPort; char* m_strBoundIP; int m_nLengthOfQueueOfListen; }; class CMyTCPServer : public CTCPServer { public: CMyTCPServer(int nServerPort, int nLengthOfQueueOfListen = 100, const char *strBoundIP = NULL) : CTCPServer(nServerPort, nLengthOfQueueOfListen, strBoundIP) { } virtual ~CMyTCPServer() { } private: virtual void ServerFunction(int nConnectedSocket, int nListenSocket) { char buf[14]; write(nConnectedSocket, "Hello World\n", 13); read(nConnectedSocket,buf,14); cout<<buf<<endl; close(nConnectedSocket); } }; int main() { // CTCPServer(int nServerPort, int nLengthOfQueueOfListen = 100, const char *strBoundIP = NULL) CMyTCPServer myserver(4002); myserver.Run(); return 0; }
test_client.cpp
#include <stdio.h> #include <stdlib.h> #include <errno.h> #include <string.h> #include <netdb.h> #include <sys/types.h> #include <netinet/in.h> #include <sys/socket.h> #include <unistd.h> /* 服务器程序监听的端口号 */ //#define PORT 1240 /* 我们一次所能够接收的最大字节数 */ #define MAXDATASIZE 100 int main(int argc, char *argv[]) { /* 套接字描述符 */ int sockfd, numbytes; char buf[MAXDATASIZE]; int port; struct hostent *he; /* 连接者的主机信息 */ struct sockaddr_in their_addr; /* 检查参数信息 */ if(argc!= 3) { /* 如果没有参数,则给出使用方法后退出 */ fprintf(stderr,"usage: server_host server_port\n"); exit(1); } /* 取得主机信息 */ if ((he=gethostbyname(argv[1])) == NULL) { /* 如果 gethostbyname()发生错误,则显示错误信息并退出 */ herror("gethostbyname"); exit(1); } if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { /* 如果 socket()调用出现错误则显示错误信息并退出 */ perror("socket"); exit(1); } port = atoi(argv[2]);// /* 主机字节顺序 */ their_addr.sin_family = AF_INET; /* 网络字节顺序,短整型 */ their_addr.sin_port = htons(port); their_addr.sin_addr = *((struct in_addr *)he->h_addr); /* 将结构剩下的部分清零*/ bzero(&(their_addr.sin_zero), 8); if(connect(sockfd, (struct sockaddr *)&their_addr, sizeof(struct sockaddr)) == -1) { /* 如果 connect()建立连接错误,则显示出错误信息,退出 */ perror("connect"); exit(1); } if((numbytes=recv(sockfd, buf, MAXDATASIZE, 0)) == -1) { // 如果接收数据错误,则显示错误信息并退出 perror("recv"); exit(1); } buf[numbytes] = '\0'; printf("Received: %s\n",buf); int count; for(count = 0;count < 2;count++) { strcpy(buf,"hello server,i'm client!\n"); send(sockfd,buf,strlen(buf),0); } sleep(10); strcpy(buf,"hello server,10 s has passed, i've come back now\n"); send(sockfd,buf,strlen(buf),0); /* sleep(100); strcpy(buf,"Received your message2!\n"); send(sockfd,buf,strlen(buf),0); */ sleep(10); close(sockfd); return 0; }
运行结果:
Server端:
Client端: