首页 > 代码库 > Nginx学习——Nginx进程间的通信

Nginx学习——Nginx进程间的通信

nginx进程间的通信

进程间消息传递

共享内存

共享内存还是Linux下提供的最基本的进程间通信方式,它通过mmap和shmget系统调用在内存中创建了一块连续的线性地址空间,而通过munmap或者shmdt系统调用可以释放这块内存。使用共享内存的好处是当多个进程使用同一块共享内存时,在任何一个进程修改了共享内存中的内容后,其他进程通过访问这段共享内存都能够得到修改后的内容。
Nginx定义了ngx_shm_t结构体,用于描述一块共享内存,
typedef struct{
	//指向共享内存的其实地址
	u_char* addr;
	//共享内存的长度
	size_t size;
	//这块共享内存的名称
	ngx_str_t name;
	//记录日志的ngx_log_t对象
	ngx_lot_t* log;
	//表示共享内存是否已经分配过的标志位,为1时表示已经存在
	ngx_uint_t exists;
} ngx_shm_t;

操作ngx_shm_t结构体的方法有两个:ngx_shm_alloc(基于mmap实现)用于分配新的共享内存,而ngx_shm_free(基于munmap实现)用于释放已经存在的共享内存。
Nginx各进程间共享数据的主要方式就是使用共享内存。一般是由master进程创建,在master进程fork出子进程后,所有的进程开始使用这块内存中的数据。

Nginx频道

ngx_channel_t频道是Nginx master进程与worker进程之间通信的常用工具,它是使用本机套接字实现的。socketpair方法,用于创建父子进程间使用的套接字。
int socketpair ( int d, int type, int protocol, int sv[2] );
通常在父子进程之间通信前,会先调用socketpair创建一组套接字,在调用fork方法创建出子进程后,将会在父进程中关闭sv[1]套接字,子进程关闭sv[0]套接字。
ngx_channel_t频道结构体是Nginx定义的master父进程和worker子进程间通信的消息格式。如下所示:
typedef struct{
		//传递的TCP消息中的命令
		ngx_uint_t command;
		//进程ID,一般是发送命令方的进程ID
		ngx_pid_t pid;
		//表示发送命令方在ngx_processes进程数组间的序号
		ngx_int_t slot;
		//通信的套接字句柄
		ngx_fd_t fd;
} ngx_channel_t;

这个消息的格式之所以如此简单,是因为Nginx仅用这个频道同步master进程与work进程间的状态,这针对command成员已经定义的命令就可以快拿出来,如下所示:
//打开频道,使用频道这种方式通信前必须发送的命令
#define NGX_CMD_OPEN_CHANNEL 1
//关闭已经打开的频道,实际上也就是关闭套接字
#define NGX_CMD_CLOSE_CHANNEL 2
//要求接收方正常地退出进程
#define NGX_CMD_QUIT 3
//要求接收方强制结束进程
#define NGX_CMD_TERMINATE 4
//要求接收方重新打开进程已经打开过的文件
#define NGX_CMD_REOPEN 5

master进程正是通过socketpair产生的套接字发送命令的,即每次要派生一个进程之前都会调用socketpair方法。在Nginx派生子进程的ngx_spawn_proces方法中,会首先派生基于TCP的套接字。
Nginx封装了4个方法: ngx_write_channel,ngx_write_channel, ngx_write_channel和ngx_close_channel。

用于发送消息的ngx_write_channel方法。
ngx_int_t ngx_write_channel(ngx_socket_t s, ngx_channel_t* ch, size_t size, ngx_log_t*log);
这里的s参数是要使用的TCP套接字,ch参数是ngx_channel_t类型的消息,size参数是ngx_channel_t结构体的大小,log参数是日志对象。
读取消息的方法ngx_read_channel
ngx_int_t ngx_read_channel(ngx_socket_t s, ngx_channel_t* ch, size_t size, ngx_log_t* log);
worker进程使用ngx_add_channel_event方法把接受频道消息的套接字添加到epoll中,当接收到父进程消息时子进程会通过epoll的事件回调相应的handler方法来处理这个频道消息。
ngx_int_t ngx_add_channel_event(ngx_cycle_t* cycle, ngx_fd_t fd, ngx_int_t event,ngx_event_handler_pt handler);
cycle参数是每个nginx进程必须具备的ngx_cycle_t核心结构体;fd参数是上面说过的需要接受消息的套接字。event参数是需要检测的事件类型,这里必然是EPOLLIN;handler参数指向的方法就是用于读取消息的方法。

void ngx_close_channel(ngx_fd_t* fd, ngx_lot_t* log);
参数fd就是上面说过的套接字数组。

信号

Nginx定义了一个ngx_signal_t结构体用于描述接收到信号的行为:
typedef struct{
	//需要处理的信号
	int signo;
	//信号对应的字符串名称
	char* siname;
	//这个信号对应着的Nginx命令
	char* name;
	//收到signo信号后就会回调handler方法
	void (*handler)(int signo);
} ngx_signal_t;

还定义了一个数组signals用来定义进程将会处理的所有信号,例如:
ngx_signal_t signals[] = {
	{	
		ngx_signal_value(NGX_RECOFIGURE_SIGNAL),
		“SIG” ngx_value(NGX_RECONFIGURE_SIGNAL),
		“reload”,
		ngx_signal_handler
},
…
}


在定义了signals数组后,ngx_init_signals方法会初始化signals数组中所有的信号,ngx_init_signals其实是调用了sigaction方法注册信号的回调方法。
ngx_int_t ngx_init_signals(nx_log_t* log)
{
	ngx_signal_t* sig;
	struct signaction sa;
	//遍历signals数组,处理每一个ngx_signal_t类型的结构体
	for(sig = signals; sig->signo != 0; sig++){
		ngx_memzero(&sa, sizeof(struct, sigaction));
		//设置信号的处理方法为handler方法
		sa.sa_handler = sig->handler;
		//将sa中的为全部设置为0
		sigemptyset(&sa.sa_mask);
		//注册信号的回调方法
		if(sigaction(sig->signo, &sa, NULL) == -1){
			ngx_log_error(NGX_LOG_EMERG, log, ngx_errno,
“sigaction(%s) failed”, sig->signame);
				return NGX_ERROR;
}
}
	return NGX_OK;
}

这样进程就可以处理信号了。对信号设置并生是在fork()函数调用之前进行的,所以工作金曾等都能受此作用。当然,一般情况下,我们不会向工作进程等子进程发送控制信息,而主要想监控进程父进程发送,父进程收到信号做相应处理后,在根据情况看是否把信号再通知到其他所有子进程。

进程同步

进程同步主要使用了原子操作,信号量和文件锁实现。其中基于原子操作可以实现自旋锁。基于原子操作、信号量以及文件锁,Nginx在更高层次上封装了一个互斥锁,,是用来方便。

原子操作

能够执行原子操作的原子变量只有整型,包括无符号整型ngx_atomic_uint_t和有符号整型ngx_atomic_t,这两种类型都使用了volatile关键字告诉C编译器不要做优化。
Nginx提供两个方法来使用原子操作来修改、获取整型变量:
ngx_atomic_cmp_set和ngx_atomic_fetch_add。这两个方法都可以用来修改原子变量的值,而ngx_atomic_cmp_set方法同时还可以比较原子变量的值。
static ngx_inline ngx_atomic_uint ngx_atomic_cmp_set(ngx_atomic_t* lock, ngx_atomic_uint_t olc, ngx_atomic_uint_t set)
ngx_atomic_cmp_set方法会将old参数与原子变量lock的值做比较,如果他们相等,则将lock设为参数set,同时方法返回1;如果它们不相等,则不作任何修改,返回0。
static ngx_inline ngx_atomic_int_t ngx_atomic_fetch_add(ngx_atomic_t* value,ngx_atomic_int_t add)
ngx_atomic_fetch_add方法会把原子变量value的值加上参数add,同时翻译value的值。
自旋锁
基于原子操作,Nginx实现了一个自旋锁。自旋锁是一种非睡眠锁,也就是说,某进程如果试图获取自旋锁,当发现锁已经被其他进程获取时,那么不会使得当前进程进入睡眠状态,而是始终保持在可执行状态,每当内核调度到这个进程执行时就持续检查是否可以获取锁。在拿不到锁时,这个进程的代码将会一直在自旋锁代码出执行,知道其他进程释放了锁且当前进程获取到了锁后,代码才会继续向下执行。
可见自旋锁主要是为了多处理器操作系统而设置的,它要解决的共享资源保护场景就是进程使用锁的时间非常短。大部分Nginx的worker进程最好都不要进入睡眠状态,因为它非常繁忙,在这个进程的epoll上可能会有十万甚至百万的TCP连接等等待着处理,进程一旦睡眠后必须等待其他时间的唤醒,这中间及其频繁的进程切换带来的负载消耗可能无法让用户接受。
下面介绍基于原子操作的自旋锁方法ngx_spinlock是如何实现的。
它有3个参数,其中lock参数就是原子变量表达的锁,当lock值为0时,表示锁是被释放的,而lock值不为0时则表示锁已经被某个进程持有了;value参数表示希望当锁没有被任何进程持有时,把lock值设为value表示当前进程持有了锁;第三个参数spin表示在多处理器系统内,当ngx_spinlock方法没有拿到锁时,当前进程在内核的一次调度中,该方法等待其他处理器释放锁的时间。下面看一下它的源码:

/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) Nginx, Inc.
 */


#include <ngx_config.h>
#include <ngx_core.h>

//函数:基于原子操作的自旋锁方法ngx_spinlock的实现
//参数解释:
//lock:原子变量表达的锁
//value:标志位,锁是否被某一进程占用
//spin:在多处理器系统内,当ngx_spinlock方法没有拿到锁时,当前进程在内核的一次调度中该方法等待其他处理器释放锁的时间
void
ngx_spinlock(ngx_atomic_t *lock, ngx_atomic_int_t value, ngx_uint_t spin)
{

#if (NGX_HAVE_ATOMIC_OPS)//支持原子操作

    ngx_uint_t  i, n;

    //一直处于循环中,直到获取到锁
    for ( ;; ) {

        //lock为0表示没有其他进程持有锁,这时将lock值设置为value参数表示当前进程持有了锁
        if (*lock == 0 && ngx_atomic_cmp_set(lock, 0, value)) {
            return;
        }

        //如果是多处理器系统
        if (ngx_ncpu > 1) {
			/*
在多处理器下,当发现锁被其他进程占用时,当前进程并不是立刻让出正在使用的CPU处理器,而是等待一段时间,看看其他处理器上的进程是否会释放锁,这会减少进程间切换的次数。
*/
            for (n = 1; n < spin; n <<= 1) {
                //随着等待的次数越来越多,实际去检查锁的间隔时间越来越大
                for (i = 0; i < n; i++) {
					/*
ngx_cpu_pause是许多架构体系中专门为了自旋锁而提供的指令,它会告诉CPU现在处于自旋锁等待状态,通常一个CPU会将自己置于节能状态,降低功耗。但是当前进程并没有让出正在使用的处理器。
*/
                    ngx_cpu_pause();//
                }

                /*
检查锁是否被释放了,如果lock值为0且释放了锁后,就把它的值设为value,当前进程持有锁成功并返回
				*/
                if (*lock == 0 && ngx_atomic_cmp_set(lock, 0, value)) {
                    return;
                }
            }
        }

        /*
`		当前进程让出处理器,但仍然处于可执行状态,使得处理器优先调度其他可执行状态的进程,这样,在进程被内核再次调度时,在for循环代码中可以期望其他进程释放锁。
		*/
        ngx_sched_yield();
    }

#else

#if (NGX_THREADS)

#error ngx_spinlock() or ngx_atomic_cmp_set() are not defined !

#endif

#endif

}

释放锁时需要Nginx模块通过ngx_atomic_cmp_set方法将原子变量设为0。

信号量

Nginx仅把信号量作为简单的互斥锁来使用,使用信号量作为互斥锁有可能导致进程睡眠。不做详解。

文件锁

文件锁是一种文件读写机制,在任何特定的时间只允许一个进程访问一个文件。利用这种机制能够使读写单个文件的过程变得更安全。不做详解。

Nginx实现的互斥锁

基于原子操作、信号量以及文件锁,Nginx在更高层次封装了一个互斥锁,使用起来很方便,许多Nginx模块也只接受使用它。下面介绍的是操作这个互斥锁的5中方法:
ngx_shmtx_create 初始化互斥锁
ngx_shmtx_destory 销毁互斥锁
ngx_shmtx_trylock 无阻塞地试图获取互斥锁,返回1表示获取互斥锁成功,返回0表示获取互斥锁失败
ngx_shmtx_lock 以阻塞进程的方式获取互斥锁,在方法返回时就已经持有了互斥锁了
ngx_shmtx_unlock 释放互斥锁
获取互斥锁时既可以使用不会阻塞进程的ngx_shmtx_trylock方法,也可以使用ngx_shmtx_lock方法告诉Nginx必须持有互斥锁后才能继续向下执行代码。它们都通过操作ngx_shmtx_t类型的结构来实现互斥结构,下面来看一下ngx_shmtx_t有哪些成员。
typedef struct{
		#if (	NGX_HAVE_ATOMIC_OPS)
			//原子变量锁
			ngx_atomic_t*	lock;
		#if (NGX_HAVE_POSIX_SEM)
		      //semaphore为1 时表示获取锁将可能使用到的信号量
			ngx_uint_t semaphonre;
			//sem就是信号量锁
			sem_t sem;
		#endif;
		#else
			//使用文件锁时fd表示使用的文件句柄
			ngx_fd_t fd;
			//name表示文件名
			u_char* name;
		#endif
			/*自旋次数,表示在自旋状态下等待其他处理器结果中释放的时间。由文件锁实现,spin没有任何意义*/
			ngx_uint_t spin;
} ngx_shmtx_t;


ngx_shmtx_t结构涉及两个宏:NGX_HAVE_ATOMIC_OPS、NGX_HVE_POIX_SEM,这两个宏对应着互斥锁的3种不同实现。
第1种实现:当不支持原子操作时,会使用文件锁来实现ngx_hmtx_t互斥锁,这时它仅有fd和name成员。这两个成员使用上面介绍的文件锁来提供阻塞、非阻塞的互斥锁。
第2种实现,支持原子操作却又不支持信号量。
第3种实现,在支持原子操作的同时,操作系统也支持信号量。
后两种实现的唯一区别是ngx_shmtx_lock方法执行时的效果,也就是说,支持信号量只会影响阻塞进程的ngx_shmtx_lock方法持有锁的方式。当不支持信号量时,ngx_shmtx_lock取锁与上面介绍的自旋锁是一致的,而支持信号量后,ngx_shmtx_lock将在spin指定的一段时间内自旋等待其他处理器释放锁,如果达到spin上限还没有获取到锁,那么将会使用sem_wait使得当前进程进入睡眠状态,等其他进程时回访了锁内核后,才会唤醒这个进程。当然,在实际过程中,ngx_shmtx_lock方法运行一段时间后,如果其他进程始终不放弃锁,那么当前进程将有可能强制性地获取到这把锁,这也是出于Nginx不宜使用阻塞进程的睡眠锁方面的考虑。