首页 > 代码库 > 基于管道通知的百万并发长连接server模型
基于管道通知的百万并发长连接server模型
0、前言
最近突然想了解怎样设计一个支持百万连接的后台server架构。
要设计一个支持百万连接的后台server,我们首先要知道会有哪些因素限制后台server的高并发连接,这里想到的因素有以下几点:
1、操作系统的参数设置能否支持百万并发连接;
2、操作系统维持百万并发长连接需要多少内存;
3、应用层面上维持百万并发长连接需要多少内存;
4、百万并发长连接的吞吐量是否超过了硬件网卡的限制。
在学习的过程中,主要针对的是1、2、4,第3点一般跟业务相关,这里暂时没有考虑。
本篇文章估计需要多次才能完成,现在初步的想法是先写一个demo程序,然后后面再慢慢测试优化。
1、后台设计
1.1 后台设计图
如下为后台的设计结构:
1、首先主进程根据机器CPU个数,创建对应数量的管道;
2、创建完对应的管道之后,再创建一样数量的线程,每个线程绑定一个CPU;
3、主进程开始初始化socket,然后accept,当接收到一个客户端连接时,就把conn_fd写到某个pipe中;
3、每个线程创建epoll,然后监听对应pipe的写端fd,当监听到pipe中有数据时,就读取该数据,格式化为fd,将该fd加入epoll进行监听。
1.2 编码实现
根据1.1的设计,我们编写代码,包括server模块和worker模块。server模块负责创建pipe、线程、和监听客户端连接;worker模块负责处理每个客户端的连接。代码如下所示:
1.2.0 common
1 #ifndef _SERV_COMMON_H 2 #define _SERV_COMMON_H 3 4 typedef struct { 5 int id; 6 int fd; 7 } thread_arg; 8 9 #define SERV_PORT 987610 #define MAX_LINE 102411 12 #endif
1.2.1 worker
worker.h
1 #ifndef _SERV_WORKER_H2 #define _SERV_WORKER_H3 4 void *worker(void *arg);5 6 #endif
worker.cc
1 #include <errno.h> 2 #include <fcntl.h> 3 #include <stdio.h> 4 #include <stdlib.h> 5 #include <string.h> 6 #include <unistd.h> 7 #include <sched.h> 8 #include <pthread.h> 9 #include <sys/epoll.h> 10 #include <sys/types.h> 11 #include <sys/socket.h> 12 13 #include "common.h" 14 15 #define MAXFDS 1000000 16 #define EVENTSIZE 1000 17 18 int taskset_thread_core(int core_id) 19 { 20 cpu_set_t cpuset; 21 CPU_ZERO(&cpuset); 22 CPU_SET(core_id, &cpuset); 23 24 pthread_t curr_tid = pthread_self(); 25 return pthread_setaffinity_np(curr_tid, sizeof(cpu_set_t), &cpuset); 26 } 27 28 int setnonblocking(int fd) 29 { 30 if (fcntl(fd, F_SETFL, fcntl(fd, F_GETFD, 0) | O_NONBLOCK) == -1) { 31 printf("fd %d set non blocking failed\n", fd); 32 return -1; 33 } 34 35 return 0; 36 } 37 38 void handle_req(int cli_fd) 39 { 40 char in_buff[MAX_LINE]; 41 int ret, rs = 1; 42 43 while (rs) { 44 ret = recv(cli_fd, in_buff, 1024, 0); 45 46 if (ret < 0) { 47 if (errno == EAGAIN) { 48 printf("EAGAIN\n"); 49 break; 50 } else { 51 printf("recv error: %d\n", errno); 52 close(cli_fd); 53 break; 54 } 55 } else if (ret == 0) { 56 rs = 0; 57 } 58 59 if (ret == sizeof(in_buff)) 60 rs = 1; 61 else 62 rs = 0; 63 } 64 65 if (ret > 0) { 66 send(cli_fd, in_buff, strlen(in_buff), 0); 67 } 68 } 69 70 void run_epoll(int epfd, int pipe_fd) 71 { 72 int i, cli_fd, nfds; 73 struct epoll_event ev, events[EVENTSIZE]; 74 char buff[16]; 75 76 ev.events = EPOLLIN | EPOLLET; 77 78 while (1) { 79 nfds = epoll_wait(epfd, events, EVENTSIZE , -1); 80 for (i = 0; i < nfds; i++) { 81 // pipe msg, add connected fd to epoll 82 if (events[i].data.fd == pipe_fd) { 83 read(pipe_fd, buff, 16); 84 cli_fd = atoi(buff); 85 setnonblocking(cli_fd); 86 ev.data.fd = cli_fd; 87 88 if (epoll_ctl(epfd, EPOLL_CTL_ADD, cli_fd, &ev) < 0) { 89 printf("epoll add fd %d failed\n", cli_fd); 90 } 91 } else { // socket msg 92 cli_fd = events[i].data.fd; 93 handle_req(cli_fd); 94 } 95 } 96 } 97 } 98 99 void *worker(void *arg)100 {101 int epfd, pipe_fd;102 struct epoll_event ev;103 104 taskset_thread_core(((thread_arg*) arg)->id);105 106 pipe_fd = ((thread_arg*) arg)->fd;107 epfd = epoll_create(MAXFDS);108 setnonblocking(pipe_fd);109 ev.data.fd = pipe_fd;110 ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;111 if (epoll_ctl(epfd, EPOLL_CTL_ADD, pipe_fd, &ev) < 0) {112 printf("epoll add mq fail\n");113 }114 115 run_epoll(epfd, pipe_fd);116 117 return 0;118 }
1.2.2 server
View Code
写完后台代码之后,开始测试能支持多少连接,但测试过程中一直有问题,会报如下的错误:error: Cannot assign requested address。
google了一下,说是因为短时间内大量短连接造成TIME_WAIT耗尽端口问题,不明白我的测试代码怎么是短连接,而不是长连接。
我的客户端代码如下,不知道是哪里出问题了。
#include <unistd.h>#include <arpa/inet.h>#include <sys/socket.h>#include <stdio.h>#include <stdlib.h>#include <string.h>#include <errno.h>void process_conn_svr(const char *svr_ip, int svr_port);int connections = 0;#define MAX_CONN 1005000int fd[MAX_CONN]; int main(int argc, char **argv) { if (argc <= 2) { printf("usage: %s ip port\n", argv[0]); exit(0); } const char *ip = argv[1]; int port = atoi(argv[2]); pid_t pid = fork(); if (pid == 0) { process_conn_svr(ip, port); } const char buf[] = "keepalive!"; for (;;) { usleep(1*1000); for (int i = 0; i < MAX_CONN; ++i) { if (fd[i] != 0) { send(fd[i], buf, sizeof(buf), 0); } } } return 0; }void process_conn_svr(const char *svr_ip, int svr_port){ int conn_idx = 0; for (;;) { struct sockaddr_in serv_addr; bzero(&serv_addr, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; inet_pton(AF_INET, svr_ip, &serv_addr.sin_addr); serv_addr.sin_port = htons(svr_port); int cli_fd = socket(AF_INET, SOCK_STREAM, 0); if (cli_fd == -1) { goto sock_err; } if (connect(cli_fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -1) { goto sock_err; } fd[conn_idx] = cli_fd; conn_idx++; connections++; printf("connections: %d, fd: %d\n", connections, cli_fd); if (connections % 10000 == 9999) { printf("press Enter to continue: "); getchar(); } usleep(1*1000); }sock_err: printf("error: %s\n", strerror(errno));}
基于管道通知的百万并发长连接server模型