首页 > 代码库 > 一次select多个socket的测试结果

一次select多个socket的测试结果

接收端程序:1024个socket建立连接后,创建8个线程接收数据,每个线程负责128个socket。每个线程先对这128个socket调用一次select,然后用FD_ISSET逐个检查这128个socket,并从可读的socket中读出数据。

用脚本bw.sh统计得到的接收端总带宽为0.114 Gb/s;用iftop查看,接收端带宽在前几秒为3 Gb/s,随后下降到112 Mb/s。

接收队列的长度为3.7MB。由此可见接收端很慢。

这种一次select多个socket的工作方式效率很低, 不应该采用。

发送端程序:server1bak.c(共部署4个发送端)

 1 #include <stdio.h>
  2 #include <stdlib.h>
  3 
  4 #include<pthread.h>
  5 #include <unistd.h>
  6 
  7 #include <sys/socket.h>
  8 #include <arpa/inet.h>
  9 #include <netinet/in.h>
 10 #include <netinet/tcp.h>
 11 #include<sys/time.h>
 12 #include<errno.h>
 13 #include<string.h>
 14 
 /* Benchmark configuration for the sending program. */
 #define IP "192.168.250.147"  /* only referenced by a commented-out bind address in main() */
 #define PORT  33333           /* TCP port the sender listens on */
 #define SOCKNUM  256          /* connections accepted; one sender thread is started per connection */
 #define PACKETSIZE 2048       /* size in bytes of the message buffer built by senddata() */
 19 
 /* Argument passed to each sender thread. */
 typedef struct {
     int sock;  /* descriptor of the accepted connection this thread writes to */
 } ARG;
 23 
 24 pthread_t tid[SOCKNUM];
 25 
 26 
 /*
  * Apply the benchmark's socket options to fd: SO_REUSEADDR plus 256 KiB
  * send and receive buffer sizes.
  *
  * Returns 0 when all three options were set, or -1 (after printing a
  * diagnostic with perror()) as soon as one setsockopt() call fails.
  */
 int SetSocketOptions(int fd)
 {
     enum { SOCKET_BUFFER_BYTES = 256 * 1024 };
     int sockopt = 1;

     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &sockopt, sizeof sockopt) == -1) {
         perror("set reuseaddr error");
         return -1;
     }

     sockopt = SOCKET_BUFFER_BYTES;
     if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sockopt, sizeof sockopt) == -1) {
         perror("set so_sndbuf error");
         return -1;
     }

     sockopt = SOCKET_BUFFER_BYTES;
     if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sockopt, sizeof sockopt) == -1) {
         perror("set so_rcvbuf error");
         return -1;
     }

     /* The success path previously fell off the end without a return value. */
     return 0;
 }
 55 
 56 
 57 
 58 
 59 int senddata(int sock)
 60 {
61     int ret;
 62     char temp[PACKETSIZE+1] = "I want to know!";
 63     int size_left=2048;
 64     while( size_left > 0)
 65     {
 66         ret = send(sock, temp, size_left, 0);
 67         if (ret < 0)
 68         {
 69             perror("send fail");
 70             exit(1);
 71         }
 72         size_left -= ret;
 73     }
 74 
 75     return size_left;
 76 }
 77 
 78 
 79 int read_cwnd(int tcp_work_socket)
 80 {
 81     struct tcp_info info;
 82     socklen_t  tcp_info_length = sizeof(tcp_info);
 83     if ( getsockopt(tcp_work_socket, SOL_TCP, TCP_INFO, (void *)&info, &tcp_info_length) == 0 ) {
 84         printf(" cwnd:%u, snd_ssthresh:%u, rtt:%u, rtt_d:%u\n",
 85                 info.tcpi_snd_cwnd,
 86                 info.tcpi_snd_ssthresh,
 87                 info.tcpi_rtt,
 88                 info.tcpi_rttvar
 89            );
90     }
 91     return 0;
 92 }
 93 
 94 
 95 void *sendData(void *arg)
 96 {
 97 #if 1 
 98     ARG *a = (ARG *)arg;
 99     int accept_sock = a->sock;
100 
101     long count = 0;
102     struct  timeval  start;
103     struct  timeval  end;
104     unsigned long timer=0;
105     gettimeofday(&start,NULL);
106 
107 
108     while(1){
109         senddata(accept_sock);
110         usleep(900);
111         if(pthread_self()== tid[0])
112         {
113 //      FILE* fpointer = fopen("result.out", "a+");
114         count++;
115         gettimeofday(&end,NULL);
116         timer = 1000000 * (end.tv_sec-start.tv_sec)+ end.tv_usec-start.tv_usec;
117         if(timer%2000== 0)
118         {
119             printf("count: %ld, socket: %ld,  %lf s, %lf Gb/s, ", count, accept_sock,  timer/1000000.0, count*2048.0/timer/1024*8);
120             read_cwnd(accept_sock);
121         }
122 
123 #if 1 
124         if(timer > 3600000000)
125         {
126             printf("before close: timer: %ld\n", timer);
127             close(accept_sock);
128 //          fclose(fpointer);
129             break;
130         }
131 #endif
132         }
133     }
134     return 0;
135 #endif
136 
137 }
138 
139 
140 
141 int main()
142 {
143     int accept_sock[SOCKNUM];
144     struct sockaddr_in addr_ser;
145 
146 
147     int sock = socket(AF_INET, SOCK_STREAM, 0);
148     if (sock < 0)
149     {
150         perror("create sock fail");
151         exit(1);
152     }
153 
154     addr_ser.sin_family = AF_INET;
155     addr_ser.sin_port = htons(PORT);
156 //  addr_ser.sin_addr.s_addr = inet_addr(IP);
157     addr_ser.sin_addr.s_addr = htonl(INADDR_ANY);
158 
159 
160     int sockopt = 1;
161     if (setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, (char *)&sockopt, sizeof(int)) < 0)
162     {
163         perror("setsockopt fail");
164         exit(1);
165     }
166 
167     if (bind(sock, (struct sockaddr*)&addr_ser, sizeof(struct sockaddr)) < 0)
168     {
169         perror("bind fail");
170         exit(1);
171     }
172 
173 
174 #if 0 
175     if ( SetSocketOptions(sock) == -1)
176     {
177         perror("set socket options error");
178         exit(1);
179     }
180 #endif
181 
182     if (listen(sock, 2000) < 0)
183     {
184         perror("listen fail");
185         exit(1);
186     }
187 
188 
189    for(int i=0; i<SOCKNUM; i++)
190    {
191 
192     accept_sock[i] = accept(sock, 0, 0);
193     if (accept_sock[i] < 0)
194     {
195         perror("accept fail");
196         exit(1);
197     }
198     printf("accept ok!\n");
199    }
200 
201 
202 #if 1 
203 
204    //static extern  pthread_t tid[SOCKNUM];
205    ARG a[SOCKNUM];
206    for(int i=0; i<SOCKNUM; i++){
207    a[i].sock = accept_sock[i];
208    pthread_create(&tid[i], 0, sendData, (void *)&a[i]);
209    }
210 #endif
211 
212 #if 1 
213    for(int i=0; i<SOCKNUM; i++)
214    {
215        pthread_join(tid[i], 0);
216    }
217 #endif
218 
219    return 0;
220 }

 

接收端代码:client1_multi_select2.c 

 

1 #include <stdio.h>
  2 #include <stdlib.h>
  3 
  4 #include<pthread.h>
  5 #include <unistd.h>
  6 #include <time.h>
  7 
  8 #include<sys/ioctl.h>
  9 #include <sys/socket.h>
 10 #include <arpa/inet.h>
 11 #include <netinet/in.h>
 12 #include<sys/time.h>
 13 #include<string>
 14 
 /* Benchmark configuration for the receiving program. */
 #define PORT 33333             /* TCP port of every sender */

 #define SOCKNUM 1024           /* total connections opened by the receiver */
 #define THREAD_NUM 8           /* receiver threads */
 #define SOCKET_PER_THREAD 128  /* connections handled by each thread */
 #define SERVER_NUM 4           /* sender hosts; connections are split evenly among them */
 #define MSGSIZE 2048           /* bytes read per message by recvdata() */

 /* main() distributes SOCKNUM sockets over servers and threads with plain
  * index arithmetic, so the counts have to divide exactly. */
 #if SOCKNUM != THREAD_NUM * SOCKET_PER_THREAD
 #error "SOCKNUM must equal THREAD_NUM * SOCKET_PER_THREAD"
 #endif
 #if SOCKNUM % SERVER_NUM != 0
 #error "SOCKNUM must be a multiple of SERVER_NUM"
 #endif
 22 
 23 typedef struct{
 24     int sock[SOCKET_PER_THREAD];
 25 }ARG;
 26 
 /*
  * Apply the benchmark's socket options to fd: SO_REUSEADDR plus 256 KiB
  * send and receive buffer sizes.
  *
  * Returns 0 when all three options were set, or -1 (after printing a
  * diagnostic with perror()) as soon as one setsockopt() call fails.
  */
 int SetSocketOptions(int fd)
 {
     enum { SOCKET_BUFFER_BYTES = 256 * 1024 };
     int sockopt = 1;

     if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &sockopt, sizeof sockopt) == -1) {
         perror("set reuseaddr error");
         return -1;
     }

     sockopt = SOCKET_BUFFER_BYTES;
     if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sockopt, sizeof sockopt) == -1) {
         perror("set so_sndbuf error");
         return -1;
     }

     sockopt = SOCKET_BUFFER_BYTES;
     if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &sockopt, sizeof sockopt) == -1) {
         perror("set so_rcvbuf error");
         return -1;
     }

     /* The success path previously fell off the end without a return value. */
     return 0;
 }
 55 
 56 int recvdata(int sock, char *buffer)
 57 {
 58     int msgsize = MSGSIZE;
 59     int ret;
 60     int nrecv=0;
61     while (nrecv < msgsize)
 62     {
 63         ret = recv(sock, buffer, msgsize-nrecv, 0);
 64         if (ret < 0)
 65         {
 66             perror("recv fail");
 67             exit(1);
 68         }
 69         else
 70         {
 71             nrecv += ret;
 72         }
 73     }
 74     return nrecv;
 75 }
 76 
 /* Return the larger of a and b (a when they are equal). */
 int max(int a, int b)
 {
     return (a >= b) ? a : b;
 }
 84 
 85 
 86 
 87 
 88 int  poll_and_recv(fd_set recv_fds, int maxfd, int* socket)
89 {
 90 #if 1 
 91     char buffer[MSGSIZE] = "0";
 92     struct timeval c_select_timeout = {0, 0};
 93     int sel = select(maxfd+1, &recv_fds, NULL, NULL, &c_select_timeout);
 94     if(sel == 0)
 95     {
 96         return 0;
 97     }
 98     else if(sel > 0)
 99     {
100         for(int i=0; i<SOCKET_PER_THREAD; i++)
101         {
102             if( FD_ISSET(socket[i], &recv_fds) ) // socket is readable
103             {
104                 int length;
105                 int status = ioctl(socket[i], FIONREAD, &length);
106                 if(status == -1)
107                 {
108                     printf("Error reading input size\n");
109                 }
110                 if(length)
111                 {
112                     //printf("data in socket %d : %d\n", socket[i], length);
113 //                  for(int num=0; num<300; num++)
114 //                  {
115                         recvdata(socket[i], buffer);
116 //                  }
117                 }
118                 else // length==0
119                 {
120                     printf("Nothing to read, eof??\n");
121                     if(socket[i] != -1)
122                     {
123                         close(socket[i]);
124                         socket[i] = -1;
125                     }
126                     perror("socket flagged but no data available probable EOF");
127                 }
128 
129             } // FD_ISSET != 0
130         } // i < SOCKET_PER_THREAD 
131     } // sel > 0
132     else // sel < 0
133     {
134         for (int i=0; i<SOCKET_PER_THREAD; i++)
135         {
136             if(socket[i] != -1)
137             {
138                 close(socket[i]);
139                 socket[i] = -1;
140             }
141         }
142         perror("select<0 error");
143         return 0;
144     }
145 146 #endif
147     return 1;
148 
149 }
150 
151 
152 
153 void *recvData(void *arg)
154 {
155     ARG* a = (ARG*)arg;
156     int *socket = a->sock;
157     char buffer[MSGSIZE] = "0";
158     int count = 0;
159 
160     int maxfd = 0;
161     for(int i=0; i<SOCKET_PER_THREAD; i++)
162     {
163         maxfd = max(socket[i], maxfd);
164     }
165 
166     while(1)
167     {
168         fd_set recv_fds;
169         for(int i=0; i<SOCKET_PER_THREAD; i++)
170         {
171             FD_ZERO(&recv_fds);
172             FD_SET(socket[i], &recv_fds);
173         }
174         poll_and_recv(recv_fds, maxfd, socket);
175     }
176     return 0;
177 }
178 
179 
180 int main()
181 {
182     int sock[SERVER_NUM][SOCKNUM/SERVER_NUM];
183     struct sockaddr_in addr_ser[SERVER_NUM];
184     struct sockaddr_in addr_cli[SERVER_NUM][SOCKNUM/SERVER_NUM];
185 
186     std::string local_ip("192.168.250.141");
187 
188     std::string server_ip[SERVER_NUM] = {"192.168.250.146", "192.168.250.147", "192.168.250.142", "192.168.250.143"};
189     //std::string server_ip[SERVER_NUM] = {"192.168.251.166", "192.168.251.167", "192.168.251.162", "192.168.251.163"}; 
190 //  std::string server_ip[SERVER_NUM] = {"192.168.251.163"}; 
191     for(int ser=0; ser < SERVER_NUM; ser++)
192     {
193     for(int i=0; i<SOCKNUM/SERVER_NUM; i++)
194     {
195         sock[ser][i] = socket(AF_INET, SOCK_STREAM, 0);
196         if(sock[ser][i] < 0)
197         {
198             printf("%d ", i);
199             perror("create socket fail");
200         }
201 
202         addr_ser[ser].sin_family = AF_INET;
203         addr_ser[ser].sin_port = htons(PORT);
204         addr_ser[ser].sin_addr.s_addr = inet_addr(server_ip[ser].c_str());
205 
206         addr_cli[ser][i].sin_family = AF_INET;
207         addr_cli[ser][i].sin_port = 0;
208         addr_cli[ser][i].sin_addr.s_addr = inet_addr(local_ip.c_str());
209 
210 
211         int sockopt = 1;
212         if ( setsockopt(sock[ser][i], SOL_SOCKET, SO_REUSEADDR, (char*)&sockopt, sizeof(sockopt)) == -1 )
213         {
214             perror("set reuseaddr error");
215             exit(1);
216         }
217 
218 
219 #if 0 
220 
221         if ( SetSocketOptions(sock[ser][i]) == -1)
222         {
223             perror("set socket options error");
224             exit(1);
225         }
226 #endif
227 
228         if( bind(sock[ser][i], (struct sockaddr*)&addr_cli[ser][i], sizeof(addr_cli[ser][i]) ) < 0 )
229         {
230             perror("TCP bind: ");
231             exit(1);
232         }
233         printf("bind ok!\n");
234 
235         if(connect(sock[ser][i], (struct sockaddr*)&addr_ser[ser], sizeof(struct sockaddr)) < 0)
236         {
237             perror("connect fail:");
238             exit(1);
239         }
240         printf("connect ok!\n");
241 
242     }
243 
244     }
245 
246 
247     int socket[SOCKNUM] ;
248     int count=0;
249     for(int i=0; i< SERVER_NUM; i++)
250     {
251         for(int j=0; j<SOCKNUM/SERVER_NUM; j++)
252         {
253             socket[count++] = sock[i][j];
254         }
255     }
256 
257     pthread_t tid[THREAD_NUM];
258     ARG a[THREAD_NUM];
259     for(int i=0; i<THREAD_NUM; i++)
260     {
261         for(int j=0; j<SOCKET_PER_THREAD; j++)
262         {
263         a[i].sock[j] = socket[i*SOCKET_PER_THREAD+j];
264         }
265         pthread_create(&tid[i], 0, recvData, (void *)&a[i]);
266     }
267 
268     for(int i=0; i<SOCKNUM; i++)
269     {
270         pthread_join(tid[i], 0);
271     }
272 
273     return 0;
274 }

 

计算带宽代码:bw.sh

#!/bin/bash
#
# Print the average receive bandwidth of a network interface in Gb/s.
#
# Usage: bw.sh [interface] [seconds]      (defaults: eth4, 60)
#
# The received-bytes counter is sampled twice, `seconds` apart, and the
# difference is converted with bytes / 1024^3 * 8 / seconds.
# The counter is read from sysfs rather than parsed out of ifconfig, whose
# "RX bytes:" output format differs between net-tools versions.

iface="${1:-eth4}"
interval="${2:-60}"
counter="/sys/class/net/${iface}/statistics/rx_bytes"

if [ ! -r "$counter" ]; then
    echo "cannot read $counter" >&2
    exit 1
fi

bw1=$(cat "$counter")
sleep "$interval"
bw2=$(cat "$counter")

echo "($bw2-$bw1)/1024.0/1024.0/1024.0*8/$interval" | bc -l

 

一次select多个socket的测试结果