首页 > 代码库 > Linux客户/服务器程序设计范式2——并发服务器(进程池)

Linux客户/服务器程序设计范式2——并发服务器(进程池)

引言

让服务器在启动阶段调用fork创建一个子进程池,通过子进程来处理客户端请求。子进程与父进程之间使用socketpair进行通信(为了方便使用sendmsg与recvmsg,如果使用匿名管道,则无法使用以上两个函数)。以下针对TCP进行分析。

server端使用select轮询用于监听客户端请求的被动套接字fd_listen以及用于父子之间通信的socketpair。每当客户端有请求时,server端会将由accept返回的用于与客户端通信的socket描述符通过socketpair发送给一个空闲的子进程,由子进程与客户端进行通信(处理请求)。因此服务器端需要维护一个子进程队列,队列中的每个元素存放着与子进程通信的socketpair以及标记子进程是否空闲的标志位,如下:

 

1 typedef struct tag_chd2 {3     int s_sfd ;     //与子进程通信的socketpair描述符4     int s_state ;   //标记子进程是否空闲5 }NODE, *pNODE;

每当子进程处理完客户端请求时,会通过socketpair向server端发送消息,server端select到该socketpair后,会将对应子进程标志位设置为空闲。

 

注意

1. 由于父进程是先创建子进程,之后才accept用于与客户端通信的socket描述符fd_client,因此子进程的pcb中并没有fd_client的信息。server端需要将fd_client发送子进程。如果只是用send来发送fd_client信息的话,子进程只会将其当成一个整型数。我们需要用sendmsg将fd_client连同其辅助(控制)信息一并发送,这样子进程才会将其当成一个socket描述符。

 

2. 父进程预先创建子进程池,该子进程如同server端一样是永远不会退出的。子进程中使用while死循环,如下:

1 while(1)2     {3         readn = read(sfd, &flag, 4);                // 服务器分配的子进程在子进程队列中的下标4         printf("readn: %d \n", readn);             5         printf("read from father: %d \n", flag);6         recv_fd(sfd, &fd_client);                   // recv_fd中封装了recvmsg,接收与客户端通信的socket描述符7         handle_request(fd_client);                  // 处理客户端请求8         write(sfd, &pid, sizeof(pid));              // 处理完请求后通过socketpair通知服务器,服务器将该子进程状态设置为空闲9     }

每当子进程处理完一个客户端请求后(也就是客户端退出了),子进程会阻塞在 read 处,等待接收下一个客户端请求。

由于是while死循环,且死循环中没有break语句,因此子进程不可能跳出这个while循环,也就不会执行while循环以下的内容了,这样可以保证子进程结尾没有exit也不会执行之后的内容。

 

函数原型

 

 1 #include <sys/types.h> 2  #include <sys/socket.h> 3  4  ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags); 5  ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags); 6  7  struct msghdr { 8                void         *msg_name;       /* optional address */ 9                socklen_t     msg_namelen;    /* size of address */10                struct iovec *msg_iov;        /* scatter/gather array */11                size_t        msg_iovlen;     /* # elements in msg_iov */12                void         *msg_control;    /* ancillary data, see below */13                socklen_t     msg_controllen; /* ancillary data buffer len */14                int           msg_flags;      /* flags on received message */15            };16 17  18  19  fields:20  21  struct iovec {                    /* Scatter/gather array items */22                void  *iov_base;              /* Starting address */23                size_t iov_len;               /* Number of bytes to transfer */24            };25            26  struct cmsghdr {27            socklen_t cmsg_len;    /* data byte count, including header */28            int       cmsg_level;  /* originating protocol */ /* 如果是文件描述符,填SOL_SOCKET */29            int       cmsg_type;   /* protocol-specific type */ /* 如果是文件描述符,填SCM_RIGHTS */30            /* followed by unsigned char cmsg_data[]; */31        };32  33  /* 返回cmsghdr结构的cmsg_len成员的值,考虑到对齐,使用数据部分的长度作为参数。*/      34  size_t CMSG_LEN(size_t length);  35  /* 返回cmsghdr的数据部分指针。*/36  unsigned char *CMSG_DATA(struct cmsghdr *cmsg); 37 38  CMSG_DATA() returns a pointer to the data portion of a cmsghdr.39  CMSG_LEN()  returns  the  value  to store in the cmsg_len member of the cmsghdr structure, taking into40              account any necessary alignment.  It takes the data length as an argument. 41              This is a constant expression.
 1 NAME 2        socketpair - create a pair of connected sockets 3  4 SYNOPSIS 5        #include <sys/types.h>          /* See NOTES */ 6        #include <sys/socket.h> 7  8        int socketpair(int domain, int type, int protocol, int sv[2]); 9 RETURN VALUE10        On  success,  zero is returned.  On error, -1 is returned, and errno is11        set appropriately.

 

代码

server.h

 1 #ifndef __SERVER_H__ 2 #define __SERVER_H__ 3 #include "my_socket.h" 4 #include <sys/stat.h> 5 #include <sys/types.h> 6 #include <fcntl.h> 7 #include <sys/time.h> 8 #include <sys/select.h> 9 #include <sys/uio.h>10 #include <sys/wait.h>11 #include <errno.h>12 #define SER_IP "127.0.0.1"13 #define SER_PORT 888814 #define ST_BUSY 115 #define ST_IDLE 216 #define SIZE 819217 #define MSG_SIZE (SIZE - 4)18 19 typedef struct tag_mag20 {21     int msg_len ;22     char msg_buf[MSG_SIZE];//818823 }MSG, *pMSG;24 25 typedef struct tag_chd26 {27     int s_sfd ;28     int s_state ;29 }NODE, *pNODE;30 31 extern int errno ;32 void make_child(pNODE arr, int cnt);33 void child_main(int sfd) ;34 void handle_request(int sfd);35 void send_fd(int sfd, int fd_file) ;36 void recv_fd(int sfd, int* fd_file) ;37 void dispatch(pNODE arr, int cnt, int fd_client);38 #endif

main.c

 1 #include "server.h" 2 int main(int argc, char* argv[])//exe chld_cnt 3 { 4     if(argc != 2) 5     { 6         printf("Usage: exe , child_cnt! \n"); 7         exit(1); 8     } 9     int child_cnt = atoi(argv[1]);10     pNODE arr_child = (pNODE)calloc(child_cnt, sizeof(NODE)) ; /* 动态数组维护子进程池 */11     make_child(arr_child, child_cnt);12     13     int fd_listen, fd_client ;14     my_socket(&fd_listen, MY_TCP, SER_IP, SER_PORT);15     my_listen(fd_listen, 10);16     17     fd_set readset, readyset ;18     FD_ZERO(&readset);19     FD_ZERO(&readyset);20     FD_SET(fd_listen, &readset);21     int index ;22     for(index = 0; index < child_cnt; index ++)23     {24         FD_SET(arr_child[index].s_sfd, &readset);25     }26     27     int select_ret ;28     struct timeval tm ;29     while(1)30     {31         tm.tv_sec = 0 ;32         tm.tv_usec = 1000 ;33         readyset = readset ;34         select_ret = select(1024, &readyset, NULL, NULL, &tm);35         if(select_ret == 0)        /* 轮询时间内,所有描述符均没有活动,返回0,继续轮询 */36         {37             continue ;38         }else if(select_ret == -1) /* 信号 */39         {40             if(errno == EINTR)41             {42                 continue ;43             }else 44             {45                 exit(1);46             }47         }else 48         {49             if(FD_ISSET(fd_listen, &readyset))50             {51             fd_client = accept(fd_listen, NULL, NULL) ;    52             dispatch(arr_child, child_cnt ,fd_client);53             close(fd_client);54             }55             for(index = 0; index < child_cnt; index ++)56             {57                 if(FD_ISSET(arr_child[index].s_sfd, &readyset))58                 {59                     int val ;60                     read(arr_child[index].s_sfd, &val, 4);61                     arr_child[index].s_state = ST_IDLE ;62                 }63             }64             65         }66         67     }   68 }

server.c

 

  1 #include "server.h"  2 void make_child(pNODE arr, int cnt)  3 {  4     int index ;   5     for(index = 0; index < cnt; index ++)  6     {  7         pid_t pid ;  8         int fds[2] ;//fds[0] - c  fds[1] - p  9         socketpair(AF_LOCAL, SOCK_STREAM, 0, fds); 10         pid = fork() ; 11         if(pid == 0)// child 12         { 13             close(fds[1]);         /* 子进程用fds[0],关闭fds[1] */ 14             child_main(fds[0]) ;   /* 每创建一个子进程,子进程就进入该函数中(死循环),接收请求,处理请求,如此循环。*/ 15  16         }else  17         { 18             /* 初始化进程池队列中的每一个子进程 */ 19             arr[index].s_sfd = fds[1] ; 20             arr[index].s_state = ST_IDLE ; 21             close(fds[0]);         /* 父进程用fds[1], 关闭fds[0] */ 22         } 23  24     } 25  26 } 27 void child_main(int sfd) 28 { 29     int fd_client ; 30     int flag ; 31     int readn ; 32     pid_t pid = getpid(); 33     while(1) 34     { 35         readn = read(sfd, &flag, 4); 36         printf("readn: %d \n", readn); 37         printf("read from father: %d \n", flag); 38         recv_fd(sfd, &fd_client); 39         handle_request(fd_client); 40         write(sfd, &pid, sizeof(pid)); 41     } 42 } 43 void handle_request(int sfd) 44 {     45  46     MSG my_msg ; 47     int recvn ; 48     while(1) 49     { 50         memset(&my_msg, 0, sizeof(MSG)); 51         my_recv(&recvn, sfd, &my_msg, 4); 52         if(my_msg.msg_len  == 0) 53         { 54             break ; 55         } 56         my_recv(NULL, sfd, my_msg.msg_buf, my_msg.msg_len); 57         my_send(NULL, sfd, &my_msg, my_msg.msg_len + 4); 58  59     } 60  61 } 62 void send_fd(int sfd, int fd_file)  63 { 64     struct msghdr my_msg ; 65     memset(&my_msg, 0, sizeof(my_msg)); 66      67     struct iovec bufs[1] ; 68     char buf[32] = "hello world ! \n"; 69     bufs[0].iov_base = buf ; 70     bufs[0].iov_len = strlen(buf) ; 71      72     my_msg.msg_name = NULL ; 73     my_msg.msg_namelen = 0 ; 74     my_msg.msg_iov = bufs ; 75     my_msg.msg_iovlen = 1 ; 76     my_msg.msg_flags = 0 ; 77  78     struct cmsghdr *p  ; 79     int cmsg_len = CMSG_LEN(sizeof(int)) ;     /* 所传为文件描述符,因此sizeof(int) */ 80     p = (struct cmsghdr*)calloc(1, cmsg_len) ; 81     p -> cmsg_len = cmsg_len ; 82     p -> cmsg_level = SOL_SOCKET ; 83     p -> cmsg_type = SCM_RIGHTS ; 84     *(int*)CMSG_DATA(p) = fd_file ; 85      86     my_msg.msg_control = p ; 87     my_msg.msg_controllen = cmsg_len ; 88      89     int sendn ; 90     sendn = sendmsg(sfd, &my_msg, 0); 91     printf("send masg len : %d \n", sendn); 92 } 93 void recv_fd(int sfd, int* fd_file)  94 { 95     struct msghdr my_msg ; 96      97     struct iovec bufs[1] ; 98     char buf1[32]="" ; 99     bufs[0].iov_base = buf1 ;100     bufs[0].iov_len = 31 ;101 102     my_msg.msg_name = NULL ;103     my_msg.msg_namelen = 0 ;104     my_msg.msg_iov = bufs ;105     my_msg.msg_iovlen = 2 ;106     my_msg.msg_flags = 0 ;107     108     struct cmsghdr *p  ;109     int cmsg_len = CMSG_LEN(sizeof(int)) ;110     p = (struct cmsghdr*)calloc(1, cmsg_len) ;111     my_msg.msg_control = p ;112     my_msg.msg_controllen = cmsg_len ;113     114     int recvn ;115     recvn = recvmsg(sfd, &my_msg, 0);116     117     *fd_file = *(int*)CMSG_DATA((struct cmsghdr*)my_msg.msg_control); //写成*(int*)CMSG_DATA(P)也可118     119     printf("buf1: %s, recv msg len : %d   \n", buf1, recvn);120 121 }122 void dispatch(pNODE arr, int cnt, int fd_client)123 {124     int index ;125     for(index = 0 ; index < cnt; index ++)126     {127         if(arr[index].s_state == ST_IDLE)128         {129             write(arr[index].s_sfd, &index, 4);130             send_fd(arr[index].s_sfd, fd_client); /* 向空闲的子进程分配任务,将服务器accept返回的socket描述符发送给子进程*/131             arr[index].s_state = ST_BUSY ;132             break ;133         }134     }135 }

client.c

 1 #include "my_socket.h" 2 #define MY_IP "127.0.0.1" 3 #define MY_PORT 6666 4 #define SER_IP "127.0.0.1" 5 #define SER_PORT 8888 6 #define SIZE 8192 7 #define MSG_SIZE (SIZE - 4) 8 typedef struct tag_mag//  9 {10     int msg_len ;11     char msg_buf[MSG_SIZE];//818812 }MSG, *pMSG;13 int main(int argc, char* argv[])14 {15     int sfd ;16     my_socket(&sfd, MY_TCP, MY_IP, atoi(argv[1]));17     my_connect(sfd, SER_IP, SER_PORT);18     MSG my_msg ;19     while(memset(&my_msg, 0, sizeof(MSG)), fgets(my_msg.msg_buf, MSG_SIZE, stdin)!= NULL)20     {21         my_msg.msg_len = strlen(my_msg.msg_buf);22         my_send(NULL, sfd, &my_msg, 4 + my_msg.msg_len );23         memset(&my_msg, 0, sizeof(MSG));24         my_recv(NULL, sfd, &my_msg, 4);25         my_recv(NULL, sfd, &my_msg.msg_buf, my_msg.msg_len);26         printf("recv from server : %s \n", my_msg.msg_buf);27     28     }29     /* 客户端退出时,向服务器发送一个长度为0的消息 ,用于通知服务器退出 */30     memset(&my_msg, 0, sizeof(MSG));31     my_send(NULL, sfd, &my_msg, 4 + my_msg.msg_len);32     close(sfd);33 34 }

 

Linux客户/服务器程序设计范式2——并发服务器(进程池)