首页 > 代码库 > 基于多进程的网络聊天程序
基于多进程的网络聊天程序
参考:linux高性能服务器编程,作者:游双
程序简介:该程序用了共享内存来实现进程间的同步,由于只是同时读取共享内存,所以没有用到锁。该程序的功能是服务器监听网络连接,当有一个客户端连接时,服务器创建一个子进程处理该连接。每个子进程只负责自己的客户端以及和父进程通信。当子进程从客户端读取数据后,把数据放到共享内存上,每个子进程在共享内存上有自己的一段空间,因此不会出现同时写。放上去后通知父进程,说:共享内存上有新数据到达了,然后父进程通知其他子进程,去到该位置读取数据,把数据发送到自己的客户端,实现了群聊的效果。该程序对于多进程编程的初学者是个不错的例子,写下来是为了让自己熟悉一下。
服务器代码:编译的时候需要加上 -lrt选项
#include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <signal.h> #include <sys/wait.h> #include <sys/mman.h> #include <sys/stat.h> #include <fcntl.h> #define USER_LIMIT 5 #define BUFFER_SIZE 1024 #define FD_LIMIT 65535 #define MAX_EVENT_NUMBER 1024 #define PROCESS_LIMIT 65536 /* 处理一个客户端连接的必要数据 */ struct client_data { sockaddr_in address; int connfd; /* 客户端的fd */ pid_t pid; /* 处理这个连接的子进程的pid */ int pipefd[2]; /* 和父进程通信用的管道 */ }; int sig_pipefd[2];//当有信号发生时,用于父进程自己的通信 char* share_mem; int user_count = 0; //当前客户的数量 client_data* users = 0 ; int* sub_process = 0; static const char* shm_name = "/my_share_memory"; int maxevents = 100; bool stop_child = false; void setnonblock(int fd) { int flag = fcntl(fd,F_GETFL); assert(flag != -1); fcntl(fd,F_SETFL,flag | O_NONBLOCK); } void addfd(int epollfd,int fd) { epoll_event ee; ee.data.fd = fd; ee.events = EPOLLIN | EPOLLET; epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ee); setnonblock(fd); } void sig_handler(int sig) { int save_errno = errno; int msg = sig; send(sig_pipefd[1],(char*)&msg,1,0); errno = save_errno;//恢复错误值 } void child_sig_handler(int sig) { stop_child = true; } void addsig(int sig,void (*handler)(int),bool restart = true) { struct sigaction sa; memset(&sa,'\0',sizeof(sa)); sa.sa_handler = handler; if(restart)sa.sa_flags |= SA_RESTART; sigfillset(&sa.sa_mask);//作用? assert(sigaction(sig,&sa,NULL) != -1); } /* 子进程的处理函数,idx表示该子进程处理的客户端连接的编号,users表示所有客户端连接数据的数组,share_mem表示共享内存的起始地址 */ int run_child(int idx,client_data* users,char* share_mem) { int connfd = users[idx].connfd; int pipefd = users[idx].pipefd[1]; int epollfd = epoll_create(100);//子进程的事件处理函数 assert(epollfd != -1); addfd(epollfd,connfd);//与客户端通信 addfd(epollfd,pipefd);//与父进程通信 addsig(SIGTERM,child_sig_handler,false); epoll_event events[maxevents]; int ret; while(!stop_child) { int number = epoll_wait(epollfd,events,maxevents,-1); if(number < 0 && errno != EINTR) { printf("epoll error\n"); break; } int i; for(i = 0;i < number;i++) { int sockfd = events[i].data.fd; if(sockfd == connfd && (events[i].events & EPOLLIN))//客户端发来数据 { memset(share_mem+idx*BUFFER_SIZE,'\0',BUFFER_SIZE); /* 将客户端数据读取到对应的读缓存中,该读缓存是共享内存的一段 */ ret = recv(sockfd,share_mem+idx*BUFFER_SIZE,BUFFER_SIZE-1,0); if(ret < 0 && errno != EAGAIN) { printf("recv error\n"); stop_child = true; } else if(ret == 0) { printf("client close\n"); stop_child = true; } else { send(pipefd,(char*)&idx,sizeof(idx),0);//告诉父进程,“我”收到数据了 } } /* 父进程通知"我"将第client个客户端的数据发送到我负责的客户端 */ else if(sockfd == pipefd && (events[i].events & EPOLLIN)) { int client = 0; ret = recv(sockfd,(char*)&client,sizeof(client),0); if(ret < 0 && errno != EAGAIN)stop_child = true; else if(ret == 0) stop_child = true; else { send(connfd,share_mem+client*BUFFER_SIZE,BUFFER_SIZE,0); } } } } close(connfd); close(pipefd); close(epollfd); return 0; } int main(int argc,char* argv[]) { if(argc != 3) { printf("usage %s server_ip server_port \n",basename(argv[0])); return -1; } sockaddr_in server; server.sin_family = AF_INET; inet_pton(AF_INET,argv[1],&server.sin_addr); server.sin_port = htons(atoi(argv[2])); int listenfd = socket(AF_INET,SOCK_STREAM,0); assert(listenfd != -1); int opt = 1; int ret = setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt)); assert(ret == 0); ret = bind(listenfd,(const sockaddr*)&server,sizeof(server)); assert(ret != -1); ret = listen(listenfd,100); assert(ret != -1); /* 初始化连接池 */ user_count = 0; users = new client_data[USER_LIMIT+1]; sub_process = new int[PROCESS_LIMIT]; int i; for(i = 0; i < PROCESS_LIMIT;++i) { sub_process[i] = -1; } /* epoll的初始化 */ int epollfd = epoll_create(100); assert(epollfd != -1); addfd(epollfd,listenfd);//监听网络连接端口 ret = socketpair(AF_UNIX,SOCK_STREAM,0,sig_pipefd);//当有信号发生时,用于父进程自己的通信 assert(ret != -1); setnonblock(sig_pipefd[1]);//UNIX域套接字的0号端口用于信号处理函数 addfd(epollfd,sig_pipefd[0]);//主进程监听UNIX域套接字的1号端口 /* 设置信号处理函数 */ addsig(SIGCHLD,sig_handler); addsig(SIGPIPE,SIG_IGN); addsig(SIGINT,sig_handler); addsig(SIGTERM,sig_handler); /* 创建共享内存,用于所有客户socket连接的读缓存 */ int shmfd = shm_open(shm_name,O_CREAT|O_RDWR,0666); assert(shmfd != -1); ret = ftruncate(shmfd,USER_LIMIT*BUFFER_SIZE);//设置shmfd的大小 assert(ret != -1); share_mem = (char*)mmap(NULL,USER_LIMIT*BUFFER_SIZE,PROT_READ|PROT_WRITE,MAP_SHARED,shmfd,0); assert(share_mem != MAP_FAILED); close(shmfd); /* 进入epoll事件循环 */ bool stop_server = false; bool terminate = false; epoll_event events[maxevents]; while(!stop_server) { int number = epoll_wait(epollfd,events,maxevents,-1); if(number < 0 && errno != EINTR) { printf("epoll error\n"); break; } for(i = 0;i < number;i++) { int sockfd = events[i].data.fd; /* 新的客户连接 */ if(sockfd == listenfd) { sockaddr_in client; socklen_t clilen = sizeof(client); int connfd = accept(listenfd,(struct sockaddr*)&client,&clilen); if(connfd < 0) { printf("accept error\n"); continue; } if(user_count >= USER_LIMIT) { const char* info = "to many users\n"; printf("%s\n",info); send(connfd,info,strlen(info),0); close(connfd); continue; } /* 保存第user_count 个客户连接的相关数据 */ users[user_count].address = client; users[user_count].connfd = connfd; ret = socketpair(AF_UNIX,SOCK_STREAM,0,users[user_count].pipefd); assert( ret != -1); pid_t pid = fork(); if(pid < 0) { close(connfd); continue; } if(pid == 0)//子进程 { close(sig_pipefd[0]); close(sig_pipefd[1]); close(users[user_count].pipefd[0]);//子进程关闭0端口 close(listenfd); close(epollfd); run_child(user_count,users,share_mem);//子进程的处理函数 munmap((void*)share_mem,USER_LIMIT*BUFFER_SIZE); exit(0); } else //父进程 { close(users[user_count].pipefd[1]);//父进程关闭1端口 close(connfd); addfd(epollfd,users[user_count].pipefd[0]); users[user_count].pid = pid; sub_process[pid] = user_count; user_count ++; } } /* 处理信号事件 */ else if(sockfd ==sig_pipefd[0] && events[i].events & EPOLLIN) { int sig; char signals[1024]; ret = recv(sockfd,signals,sizeof(signals),0); if(ret < 0 && ret != EAGAIN) { printf("recv error\n"); continue; } if(ret == 0)continue; for(i = 0; i < ret; ++ i) { switch(signals[i]) { case SIGCHLD : //子进程关闭 { pid_t pid; int status; while((pid = waitpid(-1,&status,WNOHANG)) > 0) { /* 用子进程的pid获取被关闭的客户端连接的编号 */ int del_user = sub_process[pid]; sub_process[del_user] = -1; /* 清楚第del_user个客户连接使用的相关数据 */ epoll_ctl(epollfd,EPOLL_CTL_DEL,users[del_user].pipefd[0],0); close(users[del_user].pipefd[0]); /* 把最后一个客户连接的信息移动到该位置,用于保证0~user_count-1直接的连接都是活着的 */ users[del_user] = users[--user_count]; sub_process[users[del_user].pid] = del_user; } if(terminate && user_count == 0) stop_server = true; break; } case SIGINT : case SIGTERM : //结束服务器进程 { printf("kill all the child now\n"); for(i = 0 ;i < user_count;++i) { pid_t pid = users[i].pid; kill(pid,SIGTERM); } terminate = true;//此处不是stop_sever是为了等待所有子进程结束后再结束 break; } default : break; } } } /* 某个子进程向父进程写入了数据 */ else if(events[i].events & EPOLLIN) { int child; ret = recv(sockfd,(char*)&child,sizeof(child),0); if(ret < 0 && errno != EAGAIN) continue; else if(ret == 0)continue; printf("read data from child accross pipe\n"); for(i = 0 ;i < user_count;i++) { if(i != child) { printf("send data to child accross pipe\n"); send(users[i].pipefd[0],(char*)&child,sizeof(child),0); } } } } } close(listenfd); close(epollfd); close(sig_pipefd[0]); close(sig_pipefd[1]); shm_unlink(shm_name); delete[] users; delete[] sub_process; return 0; }
#define _GNU_SOURCE 1 #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <string.h> #include <stdlib.h> #include <poll.h> #include <fcntl.h> #define BUFFER_SIZE 64 int main( int argc, char* argv[] ) { if( argc <= 2 ) { printf( "usage: %s ip_address port_number\n", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); struct sockaddr_in server_address; bzero( &server_address, sizeof( server_address ) ); server_address.sin_family = AF_INET; inet_pton( AF_INET, ip, &server_address.sin_addr ); server_address.sin_port = htons( port ); int sockfd = socket( PF_INET, SOCK_STREAM, 0 ); assert( sockfd >= 0 ); if ( connect( sockfd, ( struct sockaddr* )&server_address, sizeof( server_address ) ) < 0 ) { printf( "connection failed\n" ); close( sockfd ); return 1; } pollfd fds[2]; fds[0].fd = 0; fds[0].events = POLLIN; fds[0].revents = 0; fds[1].fd = sockfd; fds[1].events = POLLIN | POLLRDHUP; fds[1].revents = 0; char read_buf[BUFFER_SIZE]; int pipefd[2]; int ret = pipe( pipefd ); assert( ret != -1 ); while( 1 ) { ret = poll( fds, 2, -1 ); if( ret < 0 ) { printf( "poll failure\n" ); break; } if( fds[1].revents & POLLRDHUP ) { printf( "server close the connection\n" ); break; } else if( fds[1].revents & POLLIN ) { memset( read_buf, '\0', BUFFER_SIZE ); int len = recv( fds[1].fd, read_buf, BUFFER_SIZE-1, 0 ); int i; for(i = 0;i<len;i++)printf("%c",read_buf[i]); } if( fds[0].revents & POLLIN ) { ret = splice( 0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE ); ret = splice( pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE ); } } close( sockfd ); return 0; }
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。