首页 > 代码库 > kqueue演示样例

kqueue演示样例

网络服务器通常都使用epoll进行异步IO处理,而开发人员通常使用mac。为了方便开发,我把自己的handy库移植到了mac平台上。

移植过程中,网上竟然没有搜到kqueue的使用示例,让我吃惊不已。为了让大家不用像我一样再次花费大力气搞定kqueue,我整理了一个简单清晰、可以运行的kqueue示例,供大家参考。


kqueue一共有以下几个函数、宏和结构体:

int kqueue(void); //类似epoll_create
int kevent(int kq, const struct kevent *changelist, int nchanges, struct kevent *eventlist, int nevents, const struct timespec *timeout); //兼具epoll_ctl及epoll_wait功能
EV_SET(&kev, ident, filter, flags, fflags, data, udata); //设定kevent参数的宏
struct kevent {
     uintptr_t       ident;          /* identifier for this event */
     int16_t         filter;         /* filter for event */
     uint16_t        flags;          /* general flags */
     uint32_t        fflags;         /* filter-specific flags */
     intptr_t        data;           /* filter-specific data */
     void            *udata;         /* opaque user data identifier */
};

函数调用演示样例:

    //创建kqueue
    int epollfd = kqueue();
    //添加或者修改fd
    struct kevent ev[2];
    int n = 0;
    if (events & kReadEvent) {
        EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, (void*)(intptr_t)fd);
    } else if (modify){
        EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, (void*)(intptr_t)fd);
    }
    if (events & kWriteEvent) {
        EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD|EV_ENABLE, 0, 0, (void*)(intptr_t)fd);
    } else if (modify){
        EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, (void*)(intptr_t)fd);
    }
    printf("%s fd %d events read %d write %d\n",
           modify ? "mod" : "add", fd, events & kReadEvent, events & kWriteEvent);
    int r = kevent(efd, ev, n, NULL, 0, NULL);

    //获取ready的fd
    struct timespec timeout;
    timeout.tv_sec = waitms / 1000;
    timeout.tv_nsec = (waitms % 1000) * 1000 * 1000;
    const int kMaxEvents = 20;
    struct kevent activeEvs[kMaxEvents];
    int n = kevent(efd, NULL, 0, activeEvs, kMaxEvents, &timeout);

    //处理IO事件
    for (int i = 0; i < n; i ++) {
        int fd = (int)(intptr_t)activeEvs[i].udata;
        int events = activeEvs[i].filter;
        if (events == EVFILT_READ) {
            handleRead(efd, fd);
        } else if (events == EVFILT_WRITE) {
            handleWrite(efd, fd);
        }
    }

注意kevent与epoll最大的不同在于READ/WRITE事件是分开注册并且分开返回的,而epoll则是一个fd一次返回读和写事件,用标志位来判断。
可以运行的代码如下:kqueue-example(handy对kqueue提供了封装版本)

#include <sys/socket.h>
#include <sys/event.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>

/*
 * exit_if(cond, fmt, ...): when cond is true, print the printf-style message,
 * then the errno number and its strerror() text, and terminate with exit(1).
 *
 * The body is wrapped in do { } while (0) so the macro expands to exactly one
 * statement and is safe inside an unbraced if/else. errno is captured before
 * the first printf so the reported value is the one present at the call site
 * rather than one possibly overwritten by the output call.
 */
#define exit_if(r, ...) \
    do { \
        if (r) { \
            int exit_if_errno_ = errno; \
            printf(__VA_ARGS__); \
            printf("error no: %d error msg %s\n", exit_if_errno_, strerror(exit_if_errno_)); \
            exit(1); \
        } \
    } while (0)

/* Bit flags naming the kinds of readiness a caller can ask updateEvents for. */
const int kReadEvent = 1 << 0;   /* readability */
const int kWriteEvent = 1 << 1;  /* writability */

/*
 * Put fd into non-blocking mode by OR-ing O_NONBLOCK into its current file
 * status flags. Terminates the process through exit_if when either fcntl
 * call reports a negative result.
 */
void setNonBlock(int fd) {
    const int current = fcntl(fd, F_GETFL, 0);
    exit_if(current<0, "fcntl failed");

    const int rc = fcntl(fd, F_SETFL, current | O_NONBLOCK);
    exit_if(rc<0, "fcntl failed");
}

/*
 * Submit filter changes for one descriptor to a kqueue.
 *
 * efd    - kqueue descriptor the change list is sent to
 * fd     - descriptor whose filters change; it is also stored in udata so
 *          the event loop can recover it from a returned event
 * events - bitmask of kReadEvent / kWriteEvent naming the filters to enable
 * modify - when true, a filter whose bit is missing from `events` is
 *          deleted; when false, a missing bit produces no change entry
 *
 * Prints the requested state, then terminates through exit_if when kevent
 * returns non-zero.
 */
void updateEvents(int efd, int fd, int events, bool modify) {
    struct kevent changes[2];
    int count = 0;
    void *tag = (void*)(intptr_t)fd;
    const int readBit = events & kReadEvent;
    const int writeBit = events & kWriteEvent;

    // Read filter first, then write filter: same order the change list had before.
    if (readBit) {
        EV_SET(&changes[count], fd, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, tag);
        count++;
    } else if (modify) {
        EV_SET(&changes[count], fd, EVFILT_READ, EV_DELETE, 0, 0, tag);
        count++;
    }

    if (writeBit) {
        EV_SET(&changes[count], fd, EVFILT_WRITE, EV_ADD|EV_ENABLE, 0, 0, tag);
        count++;
    } else if (modify) {
        EV_SET(&changes[count], fd, EVFILT_WRITE, EV_DELETE, 0, 0, tag);
        count++;
    }

    const char *action = modify ? "mod" : "add";
    printf("%s fd %d events read %d write %d\n", action, fd, readBit, writeBit);

    // No event list is supplied, so this call only applies the changes.
    int rc = kevent(efd, changes, count, NULL, 0, NULL);
    exit_if(rc, "kevent failed ");
}

/*
 * Accept one pending connection on the listening socket and start watching it.
 *
 * efd - kqueue descriptor the new connection is registered with
 * fd  - listening socket that reported readability
 *
 * The listening socket is non-blocking, so accept can fail for reasons that
 * do not indicate a broken server: nothing is pending any more (EAGAIN /
 * EWOULDBLOCK), the peer aborted the connection before it was accepted
 * (ECONNABORTED), or a signal interrupted the call (EINTR). Those cases
 * simply return; any other accept failure terminates through exit_if.
 *
 * The peer address printed comes straight from accept, so no separate
 * getpeername call is needed.
 */
void handleAccept(int efd, int fd) {
    struct sockaddr_in raddr;
    socklen_t rsz = sizeof(raddr);
    int cfd = accept(fd, (struct sockaddr *)&raddr, &rsz);
    if (cfd < 0 && (errno == EAGAIN || errno == EWOULDBLOCK ||
                    errno == ECONNABORTED || errno == EINTR)) {
        return; // transient condition: keep serving, the next event retries
    }
    exit_if(cfd<0, "accept failed");
    printf("accept a connection from %s\n", inet_ntoa(raddr.sin_addr));
    setNonBlock(cfd);
    // Watch both directions; handleWrite drops the write filter once it fires.
    updateEvents(efd, cfd, kReadEvent|kWriteEvent, false);
}

void handleRead(int efd, int fd) {
    char buf[4096];
    int n = 0;
    while ((n=::read(fd, buf, sizeof buf)) > 0) {
        printf("read %d bytes\n", n);
        int r = ::write(fd, buf, n); //写出读取的数据
        //实际应用中。写出数据可能会返回EAGAIN,此时应当监听可写事件。当可写时再把数据写出
        exit_if(r<=0, "write error");
    }
    if (n<0 && (errno == EAGAIN || errno == EWOULDBLOCK))
        return;
    exit_if(n<0, "read error"); //实际应用中,n<0应当检查各类错误,如EINTR
    printf("fd %d closed\n", fd);
    close(fd);
}

/*
 * React to fd becoming writable by switching it back to read-only interest.
 *
 * A real application would write out pending data here and only drop the
 * writable interest once nothing is left to send.
 */
void handleWrite(int efd, int fd) {
    const int keepEvents = kReadEvent;
    updateEvents(efd, fd, keepEvents, true);
}

/*
 * Wait once for ready events on the kqueue and dispatch each of them.
 *
 * efd    - kqueue descriptor to wait on
 * lfd    - listening socket; a read event on it means a pending connection
 * waitms - maximum wait in milliseconds, split into seconds and nanoseconds
 *          for the timespec passed to kevent
 *
 * A kevent failure is no longer ignored: EINTR returns so the caller can
 * loop again, and any other error terminates through exit_if instead of
 * letting the caller spin on a call that keeps failing.
 */
void loop_once(int efd, int lfd, int waitms) {
    struct timespec timeout;
    timeout.tv_sec = waitms / 1000;
    timeout.tv_nsec = (waitms % 1000) * 1000 * 1000;
    const int kMaxEvents = 20;
    struct kevent activeEvs[kMaxEvents];
    int n = kevent(efd, NULL, 0, activeEvs, kMaxEvents, &timeout);
    const int waitErrno = errno; // only meaningful when n < 0; saved before printf can change it
    printf("epoll_wait return %d\n", n);
    if (n < 0) {
        if (waitErrno == EINTR) {
            return; // interrupted by a signal: nothing to dispatch this round
        }
        errno = waitErrno;
        exit_if(1, "kevent wait failed ");
    }
    for (int i = 0; i < n; i ++) {
        // updateEvents stored the descriptor in udata when registering it.
        int fd = (int)(intptr_t)activeEvs[i].udata;
        int events = activeEvs[i].filter;
        if (events == EVFILT_READ) {
            if (fd == lfd) {
                handleAccept(efd, fd);
            } else {
                handleRead(efd, fd);
            }
        } else if (events == EVFILT_WRITE) {
            handleWrite(efd, fd);
        } else {
            exit_if(1, "unknown event");
        }
    }
}

/*
 * Entry point: create a kqueue, open a TCP listener on port 99 bound to
 * INADDR_ANY, register it for read events and run the event loop forever
 * with a 10000 ms wait per iteration. Every setup failure terminates
 * through exit_if.
 */
int main() {
    short port = 99;

    int kq = kqueue();
    exit_if(kq < 0, "epoll_create failed");

    int lfd = socket(AF_INET, SOCK_STREAM, 0);
    exit_if(lfd < 0, "socket failed");

    struct sockaddr_in bindAddr;
    memset(&bindAddr, 0, sizeof bindAddr);
    bindAddr.sin_family = AF_INET;
    bindAddr.sin_addr.s_addr = INADDR_ANY;
    bindAddr.sin_port = htons(port);

    int rc = ::bind(lfd, (struct sockaddr *)&bindAddr, sizeof(struct sockaddr));
    exit_if(rc, "bind to 0.0.0.0:%d failed %d %s", port, errno, strerror(errno));

    rc = listen(lfd, 20);
    exit_if(rc, "listen failed %d %s", errno, strerror(errno));
    printf("fd %d listening at %d\n", lfd, port);

    setNonBlock(lfd);
    updateEvents(kq, lfd, kReadEvent, false);

    // A real application should install signal handlers and release its
    // resources on exit.
    while (1) {
        loop_once(kq, lfd, 10000);
    }
    return 0;
}
<script type="text/javascript"> $(function () { $('pre.prettyprint code').each(function () { var lines = $(this).text().split('\n').length; var $numbering = $('<ul/>').addClass('pre-numbering').hide(); $(this).addClass('has-numbering').parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($('<li/>').text(i)); }; $numbering.fadeIn(1700); }); }); </script>

kqueue演示样例