首页 > 代码库 > 2高并发服务器:多路IO之poll

2高并发服务器:多路IO之poll



1 poll

A依赖的头文件

#include <poll.h>

B函数声明

int poll(struct pollfd *fds, nfds_t nfds,int timeout);

 

/* One entry of the array passed to poll() as its fds argument. */
struct pollfd {

int fd; /* file descriptor */

short events; /* events to monitor */

short revents; /* those monitored events whose condition was met, filled in on return */

};

POLLIN:普通或带外优先数据可读,即 POLLRDNORM | POLLRDBAND

POLLRDNORM-数据可读

POLLRDBAND-优先级带数据可读

POLLPRI高优先级可读数据

 

POLLOUT普通或带外数据可写

POLLWRNORM-数据可写

POLLWRBAND-优先级带数据可写

 

POLLERR发生错误

POLLHUP发生挂起

POLLNVAL描述字不是一个打开的文件

 

nfds:监控数组中有多少个文件描述符需要被监控

timeout毫秒级等待

-1:阻塞等待,#define INFTIM -1(Linux中没有定义此宏)

0:立即返回,不阻塞进程

>0:等待指定毫秒数,如当前系统时间精度不够毫秒,向上取值

 

如果不再监控某个文件描述符,可以把pollfd中的fd设置为-1,poll就不再监控此pollfd,并且下次返回时会把它的revents设置为0

   ppoll:GNU定义了ppoll(POSIX标准),可以支持设置信号屏蔽字。大家可参考poll模型自行实现C/S

#define _GNU_SOURCE /* See feature_test_macros(7) */

#include <poll.h>

int ppoll(struct pollfd *fds, nfds_t nfds,

const struct timespec *timeout_ts, const sigset_t *sigmask);

 

案例说明:

Server.c

#include<stdio.h>

#include<stdlib.h>

#include<string.h>

#include<netinet/in.h>

#include<arpa/inet.h>

#include<poll.h>

#include<errno.h>

#include<ctype.h>

#include"wrap.h"

 

#define MAXLINE 80

#define SERV_PORT 8000

#define OPEN_MAX 1024

 

int main(void)

{

   int i, j, maxi, listenfd, connfd, sockfd;

   int nready;

   ssize_t n;

   char buf[MAXLINE], str[INET_ADDRSTRLEN];

   socklen_t clilen;

   struct pollfd client[OPEN_MAX];

   struct sockaddr_in cliaddr, servaddr;

   listenfd = Socket(AF_INET, SOCK_STREAM, 0);

   bzero(&servaddr, sizeof(servaddr));

   servaddr.sin_family = AF_INET;

   servaddr.sin_addr.s_addr = htonl(INADDR_ANY);

   servaddr.sin_port = htons(SERV_PORT);

   Bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));

   Listen(listenfd, 20);

   client[0].fd = listenfd;

   client[0].events = POLLRDNORM; /* listenfd监听普通读事件*/

   for (i = 1; i < OPEN_MAX; i++)

       client[i].fd = -1; /* -1初始化client[]里剩下元素*/

   maxi = 0; /* client[]数组有效元素中最大元素下标*/

   for ( ; ; ) {

       nready = poll(client, maxi+1, -1); /* 阻塞*/

       if (client[0].revents & POLLRDNORM) { /* 有客户端链接请求*/

           clilen = sizeof(cliaddr);

           connfd = Accept(listenfd, (struct sockaddr *)&cliaddr, &clilen);

           printf("received from %s at PORT %d\n",

                   inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)),

                   ntohs(cliaddr.sin_port));

           for (i = 1; i < OPEN_MAX; i++)

               if (client[i].fd < 0) {

                   client[i].fd = connfd; /* 找到client[]中空闲的位置,存放accept返回的connfd */

                   break;

               }

           if (i == OPEN_MAX)

               perr_exit("too many clients");

           client[i].events = POLLRDNORM; /* 设置刚刚返回的connfd,监控读事件*/

           if (i > maxi)

               maxi = i; /* 更新client[]中最大元素下标*/

           if (--nready <= 0)

               continue; /* 没有更多就绪事件时,继续回到poll阻塞*/

       }

       for (i = 1; i <= maxi; i++) { /* 检测client[] */

           if ( (sockfd = client[i].fd) < 0)

               continue;

           if (client[i].revents & (POLLRDNORM | POLLERR)) {

               if ( (n = Read(sockfd, buf, MAXLINE)) < 0) {

                   if (errno == ECONNRESET) { /* 当收到RST标志时*/

                       /* connection reset by client */

                       printf("client[%d] aborted connection\n", i);

                       Close(sockfd);

                       client[i].fd = -1;

                   } else

                       perr_exit("read error");

               } else if (n == 0) {

                   /* connection closed by client */

                   printf("client[%d] closed connection\n", i);

                   Close(sockfd);

                   client[i].fd = -1;

               } else {

                   for (j = 0; j < n; j++)

                       buf[j] = toupper(buf[j]);

                   Writen(sockfd, buf, n);

               }

               if (--nready <= 0)

                   break; /* no more readable descriptors */

           }

       }

   }

   return 0;

}

Client.c

#include<stdio.h>

#include<string.h>

#include<unistd.h>

#include<arpa/inet.h>

#include<netinet/in.h>

#include"wrap.h"

 

#define MAXLINE 80

#define SERV_PORT 8000

 

int main(void)

{

   struct sockaddr_in servaddr;

   char buf[MAXLINE];

   int sockfd,n;

 

   sockfd = Socket(AF_INET,SOCK_STREAM,0);

 

   bzero(&servaddr,sizeof(servaddr));

   servaddr.sin_family = AF_INET;

   inet_pton(AF_INET,"127.0.0.1",&servaddr.sin_addr);

   servaddr.sin_port = htons(SERV_PORT);

 

   Connect(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr));

 

   while(fgets(buf,MAXLINE,stdin) != NULL) {

       Write(sockfd,buf,strlen(buf));

       n = Read(sockfd,buf,MAXLINE);

       if(n==0) {

           printf("the other side has been closed\n");

       } else {

           Write(STDOUT_FILENO,buf,n);

       }

   }

 

   Close(sockfd);

   return 0;

}

wrap.h

#ifndef WRAP_H_
#define WRAP_H_

/*
 * Error-checking wrappers around the socket and I/O system calls.
 *
 * The header pulls in the types its prototypes use so that it can be
 * included first.  The buffered single-byte reader behind Readline() is a
 * private (static) helper of the implementation file and is deliberately not
 * declared here.
 */
#include <sys/types.h>   /* ssize_t, size_t */
#include <sys/socket.h>  /* struct sockaddr, socklen_t */

/* Print s followed by the current errno message, then exit with status 1. */
void perr_exit(const char *s);

/* accept() that retries on ECONNABORTED/EINTR and exits on any other error. */
int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr);

/* bind(), connect() and listen() that exit the process on failure. */
void Bind(int fd, const struct sockaddr *sa, socklen_t salen);
void Connect(int fd, const struct sockaddr *sa, socklen_t salen);
void Listen(int fd, int backlog);

/* socket() that exits on failure; returns the new descriptor. */
int Socket(int family, int type, int protocol);

/* read()/write() restarted after EINTR; return -1 on any other error. */
ssize_t Read(int fd, void *ptr, size_t nbytes);
ssize_t Write(int fd, const void *ptr, size_t nbytes);

/* close() that exits the process on failure. */
void Close(int fd);

/* Read up to n bytes, stopping early only at end-of-file; -1 on error. */
ssize_t Readn(int fd, void *vptr, size_t n);

/* Write exactly n bytes; returns n, or -1 on error. */
ssize_t Writen(int fd, const void *vptr, size_t n);

/* Read one NUL-terminated line of at most maxlen-1 bytes; -1 on error. */
ssize_t Readline(int fd, void *vptr, size_t maxlen);

#endif /* WRAP_H_ */

Wrap.c

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

 

/*
 * Report a fatal error and terminate the process.
 *
 * s is handed to perror(), which prints it together with the message for
 * the current errno; the process then exits with status 1.
 */
void perr_exit(const char *s)
{
    perror(s);
    exit(1);
}

 

/*
 * accept() wrapper.
 *
 * Retries while accept() fails with ECONNABORTED or EINTR; any other failure
 * is reported through perr_exit("accept error").
 *
 * Returns the value returned by accept().
 */
int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr)
{
    int connfd;

    while ((connfd = accept(fd, sa, salenptr)) < 0) {
        if (errno == ECONNABORTED || errno == EINTR)
            continue; /* transient failure: try again */

        perr_exit("accept error");
        break;
    }

    return connfd;
}

 

/*
 * bind() wrapper: a negative return from bind() is reported through
 * perr_exit("bind error").
 */
void Bind(int fd, const struct sockaddr *sa, socklen_t salen)
{
    int rc = bind(fd, sa, salen);

    if (rc < 0) {
        perr_exit("bind error");
    }
}

 

/*
 * connect() wrapper: a negative return from connect() is reported through
 * perr_exit("connect error").
 */
void Connect(int fd, const struct sockaddr *sa, socklen_t salen)
{
    int rc = connect(fd, sa, salen);

    if (rc < 0) {
        perr_exit("connect error");
    }
}

 

/*
 * listen() wrapper: a negative return from listen() is reported through
 * perr_exit("listen error").
 */
void Listen(int fd, int backlog)
{
    int rc = listen(fd, backlog);

    if (rc < 0) {
        perr_exit("listen error");
    }
}

 

/*
 * socket() wrapper: a negative return from socket() is reported through
 * perr_exit("socket error").
 *
 * Returns the value returned by socket().
 */
int Socket(int family, int type, int protocol)
{
    int sockfd = socket(family, type, protocol);

    if (sockfd < 0) {
        perr_exit("socket error");
    }
    return sockfd;
}

 

/*
 * read() wrapper that restarts the call whenever it fails with EINTR.
 *
 * Returns whatever read() returns: the number of bytes read, 0 at
 * end-of-file, or -1 (errno left as set by read()) for any error other
 * than EINTR.
 */
ssize_t Read(int fd, void *ptr, size_t nbytes)
{
    ssize_t got;

    do {
        got = read(fd, ptr, nbytes);
    } while (got == -1 && errno == EINTR);

    return got;
}

 

/*
 * write() wrapper that restarts the call whenever it fails with EINTR.
 *
 * Returns whatever write() returns: the number of bytes written (which may
 * be fewer than nbytes), or -1 (errno left as set by write()) for any error
 * other than EINTR.
 */
ssize_t Write(int fd, const void *ptr, size_t nbytes)
{
    ssize_t put;

    do {
        put = write(fd, ptr, nbytes);
    } while (put == -1 && errno == EINTR);

    return put;
}

 

/*
 * close() wrapper: a return of -1 from close() is reported through
 * perr_exit("close error").
 */
void Close(int fd)
{
    int rc = close(fd);

    if (rc == -1) {
        perr_exit("close error");
    }
}

/*
 * Read up to n bytes from fd into vptr, calling read() repeatedly until the
 * request is satisfied or end-of-file is reached.
 *
 * A read() interrupted by a signal (EINTR) is simply retried.
 *
 * Returns the number of bytes stored (less than n only when end-of-file was
 * hit first), or -1 if read() fails with any error other than EINTR.
 */
ssize_t Readn(int fd, void *vptr, size_t n)
{
    char *cursor = vptr;
    size_t remaining = n;

    while (remaining > 0) {
        ssize_t got = read(fd, cursor, remaining);

        if (got < 0) {
            if (errno != EINTR)
                return -1;
            continue; /* interrupted before any data arrived: retry */
        }
        if (got == 0)
            break; /* end-of-file */

        remaining -= (size_t)got;
        cursor += got;
    }

    return (ssize_t)(n - remaining);
}

 

/*
 * Write exactly n bytes from vptr to fd, calling write() repeatedly until
 * everything has been written.
 *
 * A write() interrupted by a signal (EINTR) is retried.  A write() that
 * returns 0, or fails with any other error, aborts the transfer.
 *
 * Returns n on success, -1 on failure.
 */
ssize_t Writen(int fd, const void *vptr, size_t n)
{
    const char *cursor = vptr;
    size_t remaining = n;

    while (remaining > 0) {
        ssize_t put = write(fd, cursor, remaining);

        if (put <= 0) {
            if (put < 0 && errno == EINTR)
                continue; /* interrupted before anything was written: retry */
            return -1;
        }

        remaining -= (size_t)put;
        cursor += put;
    }

    return (ssize_t)n;
}

/*
 * Hand out one byte at a time from a private 100-byte buffer that is
 * refilled with read() whenever it runs empty.
 *
 * The buffer and its bookkeeping are function-static, so buffered bytes are
 * shared by every caller regardless of the fd passed in.
 *
 * Returns 1 and stores the byte in *ptr, 0 at end-of-file, or -1 if read()
 * fails with an error other than EINTR (EINTR is retried).
 */
static ssize_t my_read(int fd, char *ptr)
{
    static int read_cnt;       /* bytes still unread in read_buf */
    static char *read_ptr;     /* next unread byte in read_buf */
    static char read_buf[100];

    if (read_cnt <= 0) {
        do {
            read_cnt = read(fd, read_buf, sizeof(read_buf));
        } while (read_cnt < 0 && errno == EINTR);

        if (read_cnt < 0)
            return -1;
        if (read_cnt == 0)
            return 0;
        read_ptr = read_buf;
    }

    read_cnt--;
    *ptr = *read_ptr++;
    return 1;
}

/*
 * Read one line from fd into vptr.
 *
 * At most maxlen - 1 bytes are stored, followed by a terminating NUL.
 * Reading stops after a '\n' (which is stored), at end-of-file, or when the
 * buffer is full.
 *
 * Returns the number of bytes stored, not counting the NUL; 0 means
 * end-of-file with no data (or maxlen == 1, where nothing can be stored).
 * Returns -1 on a read error, or with errno set to EINVAL when maxlen is 0,
 * because not even the terminator would fit.
 */
ssize_t Readline(int fd, void *vptr, size_t maxlen)
{
    char *start = vptr;
    char *ptr = start;
    ssize_t rc;
    size_t n;
    char c;

    if (maxlen == 0) {
        errno = EINVAL;
        return -1;
    }

    for (n = 1; n < maxlen; n++) {
        rc = my_read(fd, &c);
        if (rc == 1) {
            *ptr++ = c;
            if (c == '\n')
                break; /* the newline is kept in the result */
        } else if (rc == 0) {
            break;     /* end-of-file: return what was collected so far */
        } else {
            return -1;
        }
    }

    *ptr = 0;

    /*
     * Count the bytes actually stored.  Using the loop counter would report
     * maxlen for a line that filled the buffer, one more than was stored.
     */
    return (ssize_t)(ptr - start);
}

 

2高并发服务器:多路IO之poll