首页 > 代码库 > 高级I/O复用技术:Epoll的使用及一个完整的C实例
高级I/O复用技术:Epoll的使用及一个完整的C实例
高性能的网络服务器需要同时并发处理大量的客户端,而采用以前的那种对每个连接使用一个分开的线程或进程方法效率不高,因为处理大量客户端的时候,资源的使用及进程上下文的切换将会影响服务器的性能。一个可替代的方法是在一个单一的线程中使用非阻塞的I/O(non-blocking I/O)。
这篇文章主要介绍linux下的epoll(7)方法,其有着良好的就绪事件通知机制。我们将会使用C来展现一个完整的TCP服务器实现代码。Epoll是被linux2.6开始引进的,但是不被其他的类UNIX系统支持,它提供了一种类似select或poll函数的机制:
1.
Select(2)只能够同时管理FD_SETSIZE
数目的文件描述符
2. poll(2)没有固定的描述符上限这一限制,但是每次必须遍历所有的描述符来检查就绪的描述符,这个过程的时间复杂度为O(N)。
epoll没有select这样对文件描述符上限的限制,也不会像poll那样进行线性的遍历。因此epoll处理大并发连接有着更高的性能。
Epoll相关操作函数介绍:
1. epoll_create(2) or epoll_create1(2)(有着不同的参数值)用来创建epoll实例。
/usr/include/sys/epoll.hextern int epoll_create (int __size) ;RETURN:>0, 成功;-1, 出错
函数描述:
(1) epoll_create返回的是一个文件描述符,也就是说epoll是以特殊文件的方式体现给用户
(2) __size提示操作系统,用户可能要使用多少个文件描述符,该参数已经废弃,填写一个大于0的正整数
2. epoll_ctl(2)用来增加或移除被epoll所监听的文件描述符。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);RETURN:0,成功;-1,出错
函数描述:
(1) epfd为epoll_create创建的epoll描述符
(2) epoll_ctl函数对epoll进行op类型的操作,op选项为
EPOLL_CTL_ADD,对fd描述符注册event事件
EPOLL_CTL_MOD,对fd描述符的event事件进行修改
EPOLL_CTL_DEL,删除已注册的event事件
3. epoll_wait(2)用来等待发生在监听描述符上的事件。它会一直阻塞直到事件发生。
#include <sys/epoll.h>int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);RETURN:>0,发生事件个数;=0,时间到;-1,出错
函数描述:
epoll_wait与select函数类似,同步地等待事件发生
(1) epfd,标识epoll的文件描述符
(2) events,指向传入操作系统的一个epoll_event数组
(3) maxevents,表示传入数组的大小,必须大于0
当有事件发生,Linux会填写events结构,返回给应用程序。由于epoll_wait同步等待,有可能被信号中断,返回EINTR错误
更多的函数介绍请参照man。
Epoll的两种模式:
1. 水平触发(LT):使用此种模式,当数据可读的时候,epoll_wait()将会一直返回就绪事件。如果你没有处理完全部数据,并且再次在该epoll实例上调用epoll_wait()才监听描述符的时候,它将会再次返回就绪事件,因为有数据可读。ET只支持非阻塞socket。
2. 边缘触发(ET):使用此种模式,只能获取一次就绪通知,如果没有处理完全部数据,并且再次调用epoll_wait()的时候,它将会阻塞,因为就绪事件已经释放出来了。
ET的效能更高,但是对程序员的要求也更高。在ET模式下,我们必须一次干净而彻底地处理完所有事件。LT两种模式的socket都支持。
传递给epoll_ctl(2)的Epoll事件结构体如下所示:
typedefunionepoll_data{void*ptr;intfd; __uint32_t u32; __uint64_t u64;}epoll_data_t;structepoll_event{ __uint32_t events;/* Epoll events */epoll_data_t data;/* User data variable */};
对于每一个监听的描述符,能够关联一个整形数据或指向用户数据的指针。
epoll的事件类型:
enum EPOLL_EVENTS { EPOLLIN = 0x001,#define EPOLLIN EPOLLIN EPOLLPRI = 0x002,#define EPOLLPRI EPOLLPRI EPOLLOUT = 0x004,#define EPOLLOUT EPOLLOUT EPOLLRDNORM = 0x040,#define EPOLLRDNORM EPOLLRDNORM EPOLLRDBAND = 0x080,#define EPOLLRDBAND EPOLLRDBAND EPOLLWRNORM = 0x100,#define EPOLLWRNORM EPOLLWRNORM EPOLLWRBAND = 0x200,#define EPOLLWRBAND EPOLLWRBAND EPOLLMSG = 0x400,#define EPOLLMSG EPOLLMSG EPOLLERR = 0x008,#define EPOLLERR EPOLLERR EPOLLHUP = 0x010,#define EPOLLHUP EPOLLHUP EPOLLRDHUP = 0x2000,#define EPOLLRDHUP EPOLLRDHUP EPOLLONESHOT = (1 << 30),#define EPOLLONESHOT EPOLLONESHOT EPOLLET = (1 << 31)#define EPOLLET EPOLLET };
– EPOLLIN,读事件
– EPOLLOUT,写事件
– EPOLLPRI,带外数据,与select的异常事件集合对应
– EPOLLRDHUP,TCP连接对端至少写写半关闭
– EPOLLERR,错误事件
– EPOLLET,设置事件为边沿触发
– EPOLLONESHOT,只触发一次,事件自动被删除
epoll在一个文件描述符上只能有一个事件,在一个描述符上添加多个事件,会产生EEXIST的错误。同样,删除epoll的事件,只需描述符就够了
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
这里有一个比较重要的问题:从epoll_wait返回的events中,该如何知道是哪个描述符上的事件:在注册epoll事件的时候,一定要填写epoll_data,否则我们将分不清触发的是哪个描述符上的事件。
下面我们将实现一个轻型TCP服务器,功能是在标准输出中打印发送给套接字的一切数据。
/* * ===================================================================================== * * Filename: EpollServer.c * * Description: this is a epoll server example * * Version: 1.0 * Created: 2012年03月15日 20时24分26秒 * Revision: none * Compiler: gcc * * Author: LGP (), lgp8819@gmail.com * Company: * * ===================================================================================== */#include <stdio.h>#include <stdlib.h>#include <string.h>#include <sys/epoll.h>#include <sys/types.h>#include <sys/socket.h>#include <errno.h>#include <fcntl.h>#include <unistd.h>#include <netdb.h>/*struct addrinfo { int ai_flags; int ai_family; int ai_socktype; int ai_protocol; size_t ai_addrlen; struct sockaddr *ai_addr; char *ai_canonname; struct addrinfo *ai_next;}; */static int create_and_bind(char* port){ struct addrinfo hints; struct addrinfo*result,*rp; int s,sfd; memset(&hints,0,sizeof(struct addrinfo)); hints.ai_family= AF_UNSPEC;/* Return IPv4 and IPv6 */ hints.ai_socktype= SOCK_STREAM;/* TCP socket */ hints.ai_flags= AI_PASSIVE;/* All interfaces */ s = getaddrinfo(NULL, port,&hints,&result); //more info about getaddrinfo() please see:man getaddrinfo! if(s != 0) { fprintf(stderr,"getaddrinfo: %s\n",gai_strerror(s)); return -1; } for(rp= result;rp!= NULL;rp=rp->ai_next) { sfd = socket(rp->ai_family,rp->ai_socktype,rp->ai_protocol); if(sfd==-1) continue; s =bind(sfd,rp->ai_addr,rp->ai_addrlen); if(s ==0) { /* We managed to bind successfully! */ break; } close(sfd); } if(rp== NULL) { fprintf(stderr,"Could not bind\n"); return-1; } freeaddrinfo(result); return sfd;}static int make_socket_non_blocking(int sfd){ int flags, s; flags = fcntl(sfd, F_GETFL,0); if(flags == -1) { perror("fcntl"); return-1; } flags|= O_NONBLOCK; s =fcntl(sfd, F_SETFL, flags); if(s ==-1) { perror("fcntl"); return-1; } return 0;}#define MAXEVENTS 64int main(int argc,char*argv[]){ int sfd, s; int efd; struct epoll_event event; struct epoll_event* events; if(argc!=2) { fprintf(stderr,"Usage: %s [port]\n",argv[0]); exit(EXIT_FAILURE); } sfd = create_and_bind(argv[1]); if( sfd == -1 ) abort(); s = make_socket_non_blocking(sfd); if(s ==-1) abort(); s = listen(sfd, SOMAXCONN); if(s ==-1) { perror("listen"); abort(); } efd = epoll_create1(0); if(efd==-1) { perror("epoll_create"); abort(); } event.data.fd=sfd; event.events= EPOLLIN | EPOLLET; s =epoll_ctl(efd, EPOLL_CTL_ADD,sfd,&event); if(s ==-1) { perror("epoll_ctl"); abort(); } /* Buffer where events are returned */ events=calloc(MAXEVENTS,sizeof event); /* The event loop */ while(1) { int n,i; n =epoll_wait(efd, events, MAXEVENTS,-1); for(i=0;i< n;i++) { if((events[i].events & EPOLLERR)|| (events[i].events & EPOLLHUP)|| (!(events[i].events & EPOLLIN))) { /* An error has occured on this fd, or the socket is not ready for reading (why were we notified then?) */ fprintf(stderr,"epoll error\n"); close(events[i].data.fd); continue; } else if(sfd == events[i].data.fd) { /* We have a notification on the listening socket, which means one or more incoming connections. */ while(1) { struct sockaddr in_addr; socklen_t in_len; int infd; char hbuf[NI_MAXHOST],sbuf[NI_MAXSERV]; in_len = sizeof in_addr; infd = accept(sfd,&in_addr,&in_len); if(infd==-1) { if((errno== EAGAIN)|| (errno== EWOULDBLOCK)) { /* We have processed all incoming connections. */ break; } else { perror("accept"); break; } } s =getnameinfo(&in_addr,in_len, hbuf,sizeof hbuf, sbuf,sizeof sbuf, NI_NUMERICHOST | NI_NUMERICSERV); if(s ==0) { printf("Accepted connection on descriptor %d " "(host=%s, port=%s)\n",infd,hbuf,sbuf); } /* Make the incoming socket non-blocking and add it to the list of fds to monitor. */ s = make_socket_non_blocking(infd); if(s ==-1) abort(); event.data.fd=infd; event.events= EPOLLIN | EPOLLET; s = epoll_ctl(efd, EPOLL_CTL_ADD,infd,&event); if(s ==-1) { perror("epoll_ctl"); abort(); } } continue; } else { /* We have data on the fd waiting to be read. Read and display it. We must read whatever data is available completely, as we are running in edge-triggered mode and won‘t get a notification again for the same data. */ int done =0; while(1) { ssize_t count; char buf[512]; count = read(events[i].data.fd,buf,sizeof buf); if(count == -1) { /* If errno == EAGAIN, that means we have read all data. So go back to the main loop. */ if(errno!= EAGAIN) { perror("read"); done=1; } break; } else if(count ==0) { /* End of file. The remote has closed the connection. */ done=1; break; } /* Write the buffer to standard output */ s = write(1,buf, count); if(s ==-1) { perror("write"); abort(); } } if(done) { printf("Closed connection on descriptor %d\n",events[i].data.fd); /* Closing the descriptor will make epoll remove it from the set of descriptors which are monitored. */ close(events[i].data.fd); } } } } free(events); close(sfd); return EXIT_SUCCESS;}
以下是使用c++对epoll简单的封装类:
/** * @file file.h * @comment * wrap of file descriptor * * @author niexw */#ifndef _XCOM_FILE_H_#define _XCOM_FILE_H_#include <stdio.h>#include <fcntl.h>#include <sys/uio.h>#include <fcntl.h>#include "exception.h"#include "buffer.h"namespace unp{/** * @class File * @comment * wrap of file descriptor */class File{protected: int fd_;public: // // construtor and destructor // File() : fd_(-1) {} explicit File(FILE *stream) : fd_(fileno(stream)) {} ~File() { close(); } int getFd() { return fd_; } int getFd() const { return fd_; } size_t read(char *buf, size_t count) const { int ret; RETRY: if ((ret = ::read(fd_, buf, count)) == -1) { if (errno == EAGAIN) goto RETRY; else throw EXCEPTION(); } return ret; } size_t write(char *buf, size_t count) const { int ret; RETRY: if ((ret = ::write(fd_, buf, count)) == -1) { if (errno == EAGAIN) goto RETRY; else throw EXCEPTION(); } return ret; } void close() { if (fd_ != -1) { ::close(fd_); fd_ = -1; } } void setNonblock() { int flags = fcntl(fd_, F_GETFL); if (flags == -1) throw EXCEPTION(); flags |= O_NONBLOCK; flags = fcntl(fd_, F_SETFL, flags); if (flags == -1) throw EXCEPTION(); } void clrNonblock() { int flags = fcntl(fd_, F_GETFL); if (flags == -1) throw EXCEPTION(); flags &= ~O_NONBLOCK; flags = fcntl(fd_, F_SETFL, flags); if (flags == -1) throw EXCEPTION(); } size_t readv(CircleBuffer &buf) { int ret; RETRY: if ((ret = ::readv(fd_, buf.idle_, buf.idlenum_)) == -1) { if (errno == EAGAIN) goto RETRY; else throw EXCEPTION(); } buf.afterRead(ret); return ret; } size_t writev(CircleBuffer &buf) { int ret; RETRY: if ((ret = ::writev(fd_, buf.data_, buf.datanum_)) == -1) { if (errno == EAGAIN) goto RETRY; else throw EXCEPTION(); } buf.afterWrite(ret); return ret; } void setFlag(int option) { int flags; RETRY: flags = fcntl(fd_, F_GETFL); if (flags == -1) { if (errno == EINTR) goto RETRY; else throw EXCEPTION(); } flags |= option; RETRY1: int ret = fcntl(fd_, F_SETFL, flags); if (ret == -1) { if (errno == EINTR) goto RETRY1; else throw EXCEPTION(); } } void clrFlag(int option) { int flags; RETRY: flags = fcntl(fd_, F_GETFL); if (flags == -1) { if (errno == EINTR) goto RETRY; else throw EXCEPTION(); } flags &= ~option; RETRY1: int ret = fcntl(fd_, F_SETFL, flags); if (ret == -1) { if (errno == EINTR) goto RETRY1; else throw EXCEPTION(); } }};/** * @class File2 * @comment * wrap of file descriptor */class File2{protected: int descriptor_;public: File2() : descriptor_(-1) { } explicit File2(FILE *stream) : descriptor_(fileno(stream)) { } explicit File2(File2 &f) : descriptor_(f.descriptor_) { f.descriptor_ = -1; } ~File2() { close(); } int descriptor() { return descriptor_; } size_t read(char *buf, size_t count) { int ret; RETRY: if ((ret = ::read(descriptor_, buf, count)) == -1) { if (errno == EAGAIN) goto RETRY; else throw EXCEPTION(); } return ret; } size_t write(char *buf, size_t count) const { int ret; RETRY: if ((ret = ::write(descriptor_, buf, count)) == -1) { if (errno == EAGAIN) goto RETRY; else throw EXCEPTION(); } return ret; } void close() { if (descriptor_ != -1) { ::close(descriptor_); descriptor_ = -1; } } size_t readv(const struct iovec *iov, int cnt) { int ret; RETRY: if ((ret = ::readv(descriptor_, iov, cnt)) == -1) { if (errno == EAGAIN) goto RETRY; else throw EXCEPTION(); } return ret; } size_t writev(const struct iovec *iov, int cnt) { int ret; RETRY: if ((ret = ::writev(descriptor_, iov, cnt)) == -1) { if (errno == EAGAIN) goto RETRY; else throw EXCEPTION(); } return ret; } void setControlOption(int option) { int flags; RETRY: flags = fcntl(descriptor_, F_GETFL); if (flags == -1) { if (errno == EINTR) goto RETRY; else throw EXCEPTION(); } flags |= option; RETRY1: int ret = fcntl(descriptor_, F_SETFL, flags); if (ret == -1) { if (errno == EINTR) goto RETRY1; else throw EXCEPTION(); } } void clearControlOption(int option) { int flags; RETRY: flags = fcntl(descriptor_, F_GETFL); if (flags == -1) { if (errno == EINTR) goto RETRY; else throw EXCEPTION(); } flags &= ~option; RETRY1: int ret = fcntl(descriptor_, F_SETFL, flags); if (ret == -1) { if (errno == EINTR) goto RETRY1; else throw EXCEPTION(); } } void setNonblock() { int flags = fcntl(descriptor_, F_GETFL); if (flags == -1) throw EXCEPTION(); flags |= O_NONBLOCK; flags = fcntl(descriptor_, F_SETFL, flags); if (flags == -1) throw EXCEPTION(); } void clrNonblock() { int flags = fcntl(descriptor_, F_GETFL); if (flags == -1) throw EXCEPTION(); flags &= ~O_NONBLOCK; flags = fcntl(descriptor_, F_SETFL, flags); if (flags == -1) throw EXCEPTION(); }};}; // namespace unp#endif // _XCOM_FILE_H_
/** * @file epoll.h * @comment * wrap of epoll * * @author niexw */#ifndef _UNP_EPOLL_H_#define _UNP_EPOLL_H_#include <sys/epoll.h>#include <assert.h>#include <map>#include <strings.h>#include "file.h"namespace unp{/** * @class Epoll * @comment * wrap of epoll */class Epoll : public File{public: Epoll() {} ~Epoll() {} struct Event : public epoll_event { Event() { events = EPOLLERR; data.u64 = 0; } Event(unsigned int type, void *magic) { events = type; data.ptr = magic; } }; int create() { if ((fd_ = epoll_create(1)) == -1) throw EXCEPTION(); return fd_; } void registerEvent(int fd, Event &event) { if (epoll_ctl(fd_, EPOLL_CTL_ADD, fd, &event) == -1) throw EXCEPTION(); } void modifyEvent(int fd, Event &event) { if (epoll_ctl(fd_, EPOLL_CTL_MOD, fd, &event) == -1) throw EXCEPTION(); } void unregisterEvent(int fd) { if (epoll_ctl(fd_, EPOLL_CTL_DEL, fd, NULL) == -1) throw EXCEPTION(); } int waitEvent(Event *events, int size, int msec) { int ret; assert(events != NULL); RETRY: if ((ret = epoll_wait(fd_, events, size, msec == -1 ? NULL : msec)) == -1) { if (errno == EINTR) goto RETRY; else throw EXCEPTION(); } return ret; }};#include <iostream>using std::cout;using std::endl;class Epoll2 : public File{public: typedef void* (*Callback)(epoll_event &event, void *);protected: struct Event : public epoll_event { Callback func_; void *param_; Event() : func_(NULL), param_(NULL) { events = EPOLLERR; data.u64 = 0; } Event(unsigned int type) : func_(NULL), param_(NULL) { events = EPOLLERR | type; data.u64 = 0; } Event(unsigned int type, Callback func, void *param) : func_(func), param_(param) { events = EPOLLERR | type; data.u64 = 0; } }; typedef std::map<int, Event> Events; Events events_; epoll_event happens_[10]; int timeout_; Callback func_; void *param_;public: Epoll2() : timeout_(-1), func_(NULL), param_(NULL) { assert(sizeof(Events::iterator) == sizeof(void*)); fd_ = epoll_create(10); } Epoll2(int msec, Callback func) : timeout_(msec), func_(NULL), param_(NULL) { assert(sizeof(Events::iterator) == sizeof(void*)); fd_ = epoll_create(10); } ~Epoll2() { } void registerEvent(int fd, int option, Callback func, void *param) { Event event(option, func, param); std::pair<Events::iterator, bool> ret = events_.insert(std::pair<int, Event>(fd, event)); //ret.first->second.data.ptr = (void *)ret.first._M_node; bcopy(&ret.first, &ret.first->second.data.ptr, sizeof(void*)); if (epoll_ctl(fd_, EPOLL_CTL_ADD, fd, &ret.first->second) == -1) throw EXCEPTION(); } void setEventOption(int fd, int option) { Event *p = &events_[fd]; p->events = option | EPOLLERR; if (epoll_ctl(fd_, EPOLL_CTL_MOD, fd, p) == -1) throw EXCEPTION(); } void setEventOption(int fd, int option, Callback func, void *param) { Event *p = &events_[fd]; p->events = option | EPOLLERR; p->func_ = func; p->param_ = param; if (epoll_ctl(fd_, EPOLL_CTL_MOD, fd, p) == -1) throw EXCEPTION(); } void addEventOption(int fd, int option) { Event *p = &events_[fd]; p->events |= option; if (epoll_ctl(fd_, EPOLL_CTL_MOD, fd, p) == -1) throw EXCEPTION(); } void addEventOption(int fd, int option, Callback func, void *param) { Event *p = &events_[fd]; p->events |= option; p->func_ = func; p->param_ = param; if (epoll_ctl(fd_, EPOLL_CTL_MOD, fd, p) == -1) throw EXCEPTION(); } void clrEventOption(int fd, int option, Callback func, void *param) { Event *p = &events_[fd]; p->events &= ~option; p->func_ = func; p->param_ = param; if (epoll_ctl(fd_, EPOLL_CTL_MOD, fd, p) == -1) throw EXCEPTION(); } void clrEventOption(int fd, int option) { Event *p = &events_[fd]; p->events &= ~option; if (epoll_ctl(fd_, EPOLL_CTL_MOD, fd, p) == -1) throw EXCEPTION(); } void unregisterEvent(int fd) { events_.erase(fd); if (epoll_ctl(fd_, EPOLL_CTL_DEL, fd, NULL) == -1) throw EXCEPTION(); } void setTimeout(int msec, Callback func, void *param) { timeout_ = msec; func_ = func; param_ = param; } bool run() { int ret; RETRY: if ((ret = epoll_wait(fd_, happens_, 10, timeout_)) == -1) { if (errno == EINTR) goto RETRY; else throw EXCEPTION(); } for (int i = 0; i < ret; ++i) { Events::iterator it; bcopy(&happens_[i].data.ptr, &it, sizeof(void *)); //it._M_node = (std::_Rb_tree_node_base*)happens_[i].data.ptr; if (it->second.func_ != NULL) it->second.func_(happens_[i], it->second.param_); if (happens_[i].events & EPOLLERR) throw EXCEPTION(); } if (ret == 0 && func_ != NULL) func_(happens_[0], param_); return !events_.empty(); }};}; // namespace unp#endif /* _UNP_EPOLL_H_ */