首页 > 代码库 > 端口复用

端口复用

多个进程绑定(bind)同一个端口,当客户断发起连接(connect)时,内核会通过一个hash算法决定分配到那个进程上。

Linux 4.5之前的reuseport查找实现(4.3内核)

以下是未优化前的Linux 4.3内核的实现,可见是多么地不直观。它采用了遍历HASH冲突链表的方式进行reuseport套接字的精确定位:
result = NULL;  
badness = 0;  
udp_portaddr_for_each_entry_rcu(sk, node, &hslot2->head) {  
    score = compute_score2(sk, net, saddr, sport,  
                  daddr, hnum, dif);  
    if (score > badness) { // 冒泡排序  
        // 找到了更加合适的socket,需要重新hash  
        result = sk;  
        badness = score;  
        reuseport = sk->sk_reuseport;  
        if (reuseport) {  
            hash = udp_ehashfn(net, daddr, hnum,  
                       saddr, sport);  
            matches = 1;  
        }  
    } else if (score == badness && reuseport) { // reuseport套接字散列定位  
        // 找到了同样reuseport的socket,进行定位  
        matches++;  
        if (reciprocal_scale(hash, matches) == 0)  
            result = sk;  
        hash = next_pseudo_random32(hash);  
    }  
} 

  

Linux 4.5(针对UDP)/4.6(针对TCP)的reuseport查找实现

我们来看看在4.5和4.6内核中对于reuseport的查找增加了一些什么神奇的新东西:
result = NULL;  
badness = 0;  
udp_portaddr_for_each_entry_rcu(sk, node, &hslot2->head) {  
    score = compute_score2(sk, net, saddr, sport,  
                  daddr, hnum, dif);  
    if (score > badness) {  
        // 在reuseport情形下,意味着找到了更加合适的socket组,需要重新hash  
        result = sk;  
        badness = score;  
        reuseport = sk->sk_reuseport;  
        if (reuseport) {  
            hash = udp_ehashfn(net, daddr, hnum,  
                       saddr, sport);  
            if (select_ok) {  
                struct sock *sk2;  
                // 找到了一个组,接着进行组内hash。  
                sk2 = reuseport_select_sock(sk, hash, skb,  
                        sizeof(struct udphdr));  
                if (sk2) {  
                    result = sk2;  
                    select_ok = false;  
                    goto found;  
                }  
            }  
            matches = 1;  
        }  
    } else if (score == badness && reuseport) {  
    // 这个else if分支的期待是,在分层查找不适用的时候,寻找更加匹配的reuseport组,注意4.5/4.6以后直接寻找的是一个reuseport组。  
    // 在某种意义上,这回退到了4.5之前的算法。  
        matches++;  
        if (reciprocal_scale(hash, matches) == 0)  
            result = sk;  
        hash = next_pseudo_random32(hash);  
    }  
} 

  

struct sock *reuseport_select_sock(struct sock *sk,  
                   u32 hash,  
                   struct sk_buff *skb,  
                   int hdr_len)  
{  
    ...  
    prog = rcu_dereference(reuse->prog);  
    socks = READ_ONCE(reuse->num_socks);  
    if (likely(socks)) {  
        /* paired with smp_wmb() in reuseport_add_sock() */  
        smp_rmb();  
  
        if (prog && skb) // 可以用BPF来从用户态注入自己的定位逻辑,更好实现基于策略的负载均衡  
            sk2 = run_bpf(reuse, socks, prog, skb, hdr_len);  
        else  
            // reciprocal_scale简单地将结果限制在了[0,socks)这个区间内  
            sk2 = reuse->socks[reciprocal_scale(hash, socks)];  
    }  
    ...  
}  

  单机上的 连接服务器 则可以用端口复用的方式实现负载均衡;也完美解决了nginx之前的惊群现象,也不需要像nginx后来的做法去避免惊群。

下面给出测试用的demo

#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>  
#include <netinet/in.h>  
#include <arpa/inet.h>  
#include <sys/types.h>  
#include <unistd.h>  
#include <pthread.h>
#define MAXLINE 100

void* thread_(void* agr)
{
	int listenfd,connfd;
	struct sockaddr_in servaddr;
	char buff[MAXLINE+1];
	time_t ticks;
	unsigned short port;
	int flag=1,len=sizeof(int);

	port=10013;
	if( (listenfd=socket(AF_INET,SOCK_STREAM,0)) == -1)
	{
		perror("socket");
		exit(1);
	}
	bzero(&servaddr,sizeof(servaddr));
	servaddr.sin_family=AF_INET;
	servaddr.sin_addr.s_addr=htonl(INADDR_ANY);
	servaddr.sin_port=htons(port);
	//SO_REUSEPORT
	if( setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, len) == -1)
	{
		perror("SO_REUSEADDR");
		exit(1);
	}
	if( setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, &flag, len) == -1)
	{
		perror("SO_REUSEPORT");
		exit(1);
	}
	if( bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) ==-1)
	{
		perror("bind");
		exit(1);
	}
	else
		printf("bind call OK!\n");
	if( listen(listenfd,5) == -1)
	{
		perror("listen");
		exit(1);
	}
	char buf[] = "hello world.";
	for(;;)	{
		if( (connfd=accept(listenfd,(struct sockaddr*)NULL,NULL)) == -1)
		{
			perror("accept");
			exit(1);
		}
		send(connfd,buf,sizeof(buf),0);
		close(connfd);
		printf("pid %d : once\n",pthread_self());
	}
}
int main(int argc, char** argv)
{

	pthread_t pt;
	if( 0!=pthread_create(&pt,NULL,thread_,(void*)0) )
	{
		perror("pthread_create");
	}
	thread_((void*)1);
	return 0;
}

  

端口复用