Swoole Source Code Study Notes (10): The Factory Module (Part 2)
Swoole version: 1.7.5-stable
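This chapter analyzes the remaining functions in FactoryProcess.c, which operate on the worker, the manager, and the writer. They provide the core process creation and management features and are the foundation of Swoole's master-worker architecture.
Let's start with the worker-related functions (the manager-related functions almost all operate on worker processes). FactoryProcess.c declares four of them: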
    static int swFactoryProcess_worker_loop(swFactory *factory, int worker_pti);
    static int swFactoryProcess_worker_spawn(swFactory *factory, int worker_pti);
    static sw_inline uint32_t swServer_worker_schedule(swServer *serv, uint32_t schedule_key);
    static sw_inline int swFactoryProcess_worker_excute(swFactory *factory, swEventData *task);
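First, the two inline functions. swServer_worker_schedule looks up, according to the dispatch mode, the worker_id of the worker that should execute a task. Its core source is: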
    if (serv->dispatch_mode == SW_DISPATCH_ROUND) {
        target_worker_id = (serv->worker_round_id++) % serv->worker_num;
    }
    //Using the FD touch access to hash
    else if (serv->dispatch_mode == SW_DISPATCH_FDMOD) {
        target_worker_id = schedule_key % serv->worker_num;
    }
    //Preemptive distribution
    else {
        if (serv->ipc_mode == SW_IPC_MSGQUEUE) {
            //the mtype argument of msgsnd must be > 0
            //the correct mtype inside a worker process is pti + 1
            target_worker_id = serv->worker_num;
        }
        else {
            int i;
            sw_atomic_t *round = &SwooleTG.worker_round_i;
            for (i = 0; i < serv->worker_num; i++) {
                sw_atomic_fetch_add(round, 1);
                target_worker_id = (*round) % serv->worker_num;
                if (serv->workers[target_worker_id].status == SW_WORKER_IDLE) {
                    break;
                }
            }
            swTrace("schedule=%d|round=%d\n", target_worker_id, *round);
        }
    }
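Source explanation: the selection method depends on swServer's dispatch_mode. SW_DISPATCH_ROUND is round-robin mode. SW_DISPATCH_FDMOD takes the FD modulo worker_num, so the same fd is always handled by the same worker process. Anything else is preemptive mode, which advances a shared round counter and takes the first idle worker it finds. In preemptive mode with a message queue, no individual worker is chosen: target_worker_id is simply set to worker_num, and each worker fetches the data it should process from the queue itself.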
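To make the three strategies easier to compare, here is a minimal standalone sketch of the same decision logic. The names (struct scheduler, pick_worker, the enum values) are made up for this example and are not Swoole's types; the message-queue branch and the atomic counter are left out, so treat it as an illustration under those assumptions rather than the real implementation.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical names for this sketch; they are not Swoole's own types. */
enum dispatch_mode { DISPATCH_ROUND, DISPATCH_FDMOD, DISPATCH_PREEMPT };
enum worker_status { WORKER_IDLE, WORKER_BUSY };

struct scheduler {
    enum dispatch_mode mode;
    uint32_t worker_num;
    uint32_t round;                    /* shared round counter (not atomic here) */
    const enum worker_status *status;  /* one entry per worker */
};

/* Return the id of the worker that should receive the next task. */
static uint32_t pick_worker(struct scheduler *s, uint32_t schedule_key)
{
    assert(s->worker_num > 0);

    if (s->mode == DISPATCH_ROUND) {
        /* Round-robin: hand tasks out in turn. */
        return (s->round++) % s->worker_num;
    }
    if (s->mode == DISPATCH_FDMOD) {
        /* The same key (fd) always maps to the same worker. */
        return schedule_key % s->worker_num;
    }
    /* Preemptive: advance the counter until an idle worker turns up.
     * If every worker is busy, the last candidate tried is returned. */
    uint32_t target = 0;
    for (uint32_t i = 0; i < s->worker_num; i++) {
        s->round++;
        target = s->round % s->worker_num;
        if (s->status[target] == WORKER_IDLE) {
            break;
        }
    }
    return target;
}
```

Note that in the preemptive branch a fully busy pool still yields a worker id; the loop only prefers an idle one, which matches the shape of the excerpt above.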
swFactoryProcess_worker_excute handles a swEventData *task received by a worker and runs different logic depending on the task's type. The core code is:
    swServer *serv = factory->ptr;
    swString *package = NULL;

    factory->last_from_id = task->info.from_id;
    //worker busy
    serv->workers[SwooleWG.id].status = SW_WORKER_BUSY;

    switch (task->info.type) {
    //no buffer
    case SW_EVENT_TCP:
    case SW_EVENT_UDP:
    case SW_EVENT_UNIX_DGRAM:
    //ringbuffer shm package
    case SW_EVENT_PACKAGE:
        onTask:
        factory->onTask(factory, task);
        if (!SwooleWG.run_always) {
            //only onTask increase the count
            worker_task_num--;
        }
        if (task->info.type == SW_EVENT_PACKAGE_END) {
            package->length = 0;
        }
        break;
    //package trunk
    case SW_EVENT_PACKAGE_START:
    case SW_EVENT_PACKAGE_END:
        //input buffer
        package = SwooleWG.buffer_input[task->info.from_id];
        //merge data to package buffer
        memcpy(package->str + package->length, task->data, task->info.len);
        package->length += task->info.len;
        //printf("package[%d].from_id=%d|data_len=%d|total_length=%d\n", task->info.type, task->info.from_id, task->info.len, package->length);
        //package end
        if (task->info.type == SW_EVENT_PACKAGE_END) {
            goto onTask;
        }
        break;
    case SW_EVENT_CLOSE:
        serv->onClose(serv, task->info.fd, task->info.from_id);
        break;
    case SW_EVENT_CONNECT:
        serv->onConnect(serv, task->info.fd, task->info.from_id);
        break;
    case SW_EVENT_FINISH:
        serv->onFinish(serv, task);
        break;
    default:
        swWarn("[Worker] error event[type=%d]", (int) task->info.type);
        break;
    }
    //worker idle
    serv->workers[SwooleWG.id].status = SW_WORKER_IDLE;
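Source explanation: factory's last_from_id (the id of the reactor most recently received from) is set to the task's from_id, and the worker's status is set to BUSY. The task's type then decides what gets called. The logic here is slightly involved:
1. SW_EVENT_TCP, SW_EVENT_UDP, and SW_EVENT_UNIX_DGRAM have no buffer, so onTask is called directly to deliver the task. If run_always (run forever) is not set, the remaining budget worker_task_num is decremented by one;
2. SW_EVENT_PACKAGE is a package from the RingBuffer shared memory pool. The package is already complete and needs no reassembly, so it is also delivered directly through onTask;
3. SW_EVENT_PACKAGE_START fetches the buffer belonging to the sending reactor and appends the task's data to it;
4. SW_EVENT_PACKAGE_END means all the data has been sent: its data is appended as well, then execution jumps to onTask, the task is delivered through onTask, and the buffer is emptied. (One open question: the data in the buffer does not appear to be delivered through onTask, and I found nothing that stores the buffer's data into the task. To be resolved.)
5. SW_EVENT_CLOSE, SW_EVENT_CONNECT, and SW_EVENT_FINISH each call the corresponding PHP callback.
Once execution completes, the worker's status is set back to IDLE.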
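The chunk handling in cases 3 and 4 can be sketched in isolation as follows. Everything here (merge_chunk, struct package_buffer, the fixed sizes) is hypothetical and chosen for the example. The bounds check is an addition of this sketch and is not present in the excerpt above, and how Swoole itself hands the merged data on to onTask is exactly the open question noted in item 4, so this sketch simply returns the buffer to the caller.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* All names below are hypothetical stand-ins, not Swoole's definitions. */
enum chunk_type { CHUNK_START, CHUNK_END };

#define SOURCE_NUM   2     /* one buffer per sending reactor */
#define BUFFER_SIZE  64

struct package_buffer {
    size_t length;
    char   data[BUFFER_SIZE];
};

static struct package_buffer buffers[SOURCE_NUM];

/*
 * Append one chunk to the buffer of the reactor it came from.
 * Returns the buffer once the END chunk has been appended (the caller
 * consumes it and resets length to 0), or NULL while more data is
 * expected or when the chunk would not fit.
 */
static struct package_buffer *merge_chunk(int from_id, enum chunk_type type,
                                          const char *data, size_t len)
{
    assert(from_id >= 0 && from_id < SOURCE_NUM);
    struct package_buffer *pkg = &buffers[from_id];

    if (len > BUFFER_SIZE - pkg->length) {
        return NULL; /* guard added by this sketch */
    }
    memcpy(pkg->data + pkg->length, data, len);
    pkg->length += len;

    return type == CHUNK_END ? pkg : NULL;
}
```

Keeping one buffer per from_id is what lets chunks from different reactors interleave without corrupting each other.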
Next is swFactoryProcess_worker_spawn, which creates (or re-creates) a worker process. Its core source is:
    pid = fork();
    if (pid < 0) {
        swWarn("Fork Worker failed. Error: %s [%d]", strerror(errno), errno);
        return SW_ERR;
    }
    //worker child processor
    else if (pid == 0) {
        ret = swFactoryProcess_worker_loop(factory, worker_pti);
        exit(ret);
    }
    //parent, add to writer
    else {
        return pid;
    }
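Source explanation: fork is called to create the worker process. Inside the child, swFactoryProcess_worker_loop enters the worker's work loop, and the process exits once the loop ends. The parent process returns the worker's pid.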
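The fork pattern above can be tried out on its own with a small sketch like this one. spawn_worker and demo_worker_loop are names invented for the example. One deliberate difference from the excerpt: the child calls _exit() instead of exit(), which is my own choice for a standalone demo so that the child does not flush stdio buffers inherited from the parent.

```c
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical worker body: a real one would loop until told to stop. */
static int demo_worker_loop(int worker_id)
{
    return worker_id + 10; /* becomes the child's exit status */
}

/*
 * Fork one worker. The child runs loop() and exits with its return value;
 * the parent gets the child's pid back, or -1 if fork() failed.
 */
static pid_t spawn_worker(int (*loop)(int), int worker_id)
{
    assert(loop != NULL);
    pid_t pid = fork();
    if (pid < 0) {
        return -1;
    }
    if (pid == 0) {
        /* Child: never returns into the parent's code path. */
        _exit(loop(worker_id));
    }
    return pid;
}
```

The parent can later collect the exit status with waitpid(pid, &status, 0) and WEXITSTATUS(status), which is what the manager loop relies on further down.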
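Then comes the important swFactoryProcess_worker_loop function, which defines a worker process's main work loop. The function is long, so it is again analyzed piece by piece. Here is the source: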
    swServer *serv = factory->ptr;
    struct {
        long pti;
        swEventData req;
    } rdata;
    int n;
    int pipe_rd = serv->workers[worker_pti].pipe_worker;
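Source explanation: get the swServer object and build the rdata struct (its members are exactly the same as those of swDispatchData) to hold the worker_id and the data to process. The worker's read pipe is fetched as well.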
    #ifdef HAVE_CPU_AFFINITY
    if (serv->open_cpu_affinity == 1) {
        cpu_set_t cpu_set;
        CPU_ZERO(&cpu_set);
        CPU_SET(worker_pti % SW_CPU_NUM, &cpu_set);
        if (0 != sched_setaffinity(getpid(), sizeof(cpu_set), &cpu_set)) {
            swWarn("pthread_setaffinity_np set failed");
        }
    }
    #endif
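Source explanation: if the HAVE_CPU_AFFINITY macro is defined and swServer has the CPU affinity setting turned on (the English description is "always run this process on processor one"; roughly, on a multi-core CPU each process may only run on one designated processor), CPU_SET first adds the CPU that corresponds to the worker_id (worker_pti % SW_CPU_NUM) to the CPU set, and sched_setaffinity then pins the current worker process to that CPU. (This explains why Swoole asks for worker_num to be set to the number of CPU cores.)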
    //signal init
    swWorker_signal_init();

    //worker_id
    SwooleWG.id = worker_pti;

    #ifndef SW_USE_RINGBUFFER
    int i;
    //for open_check_eof and open_check_length
    if (serv->open_eof_check || serv->open_length_check || serv->open_http_protocol) {
        SwooleWG.buffer_input = sw_malloc(sizeof(swString*) * serv->reactor_num);
        if (SwooleWG.buffer_input == NULL) {
            swError("malloc for SwooleWG.buffer_input failed.");
            return SW_ERR;
        }
        for (i = 0; i < serv->reactor_num; i++) {
            SwooleWG.buffer_input[i] = swString_new(serv->buffer_input_size);
            if (SwooleWG.buffer_input[i] == NULL) {
                swError("buffer_input init failed.");
                return SW_ERR;
            }
        }
    }
    #endif
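Source explanation: swWorker_signal_init registers the handlers for the relevant signals, and the id in the SwooleWG global is set to the current worker_id. The next part supports Swoole's automatic packet splitting. If RingBuffer is not used (the block is guarded by #ifndef SW_USE_RINGBUFFER) and any one of eof checking, length checking, or the http protocol is enabled, the worker needs input buffers. First, space is allocated for buffer_input: an array of length reactor_num that holds swString pointers. The array is then traversed and a swString object is created for each slot, with its size set to the configured buffer size (buffer_input_size, which corresponds to package_max_length in the config).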
    if (serv->ipc_mode == SW_IPC_MSGQUEUE) {
        //preemptive mode: all workers use the same queue type
        if (serv->dispatch_mode == SW_DISPATCH_QUEUE) {
            //1 must be added here
            rdata.pti = serv->worker_num + 1;
        }
        else {
            //1 must be added
            rdata.pti = worker_pti + 1;
        }
    }
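Source explanation: when a message queue is used, preemptive mode makes every worker use the same queue type (worker_num + 1), while the other modes set rdata's pti to worker_id + 1. (I have not fully worked out why the +1 is mandatory. The comment in swServer_worker_schedule above suggests it is because the mtype passed to msgsnd must be greater than 0 while worker ids start at 0. I will come back and add to this if I find out more.)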
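As a rough illustration of the id-to-type mapping described above, the sketch below derives a System V message type from a 0-based worker id and sends a message under it. The function and struct names are hypothetical and the payload layout is invented for the example; only msgsnd, IPC_NOWAIT, and the rule that mtype must be positive come from the System V message queue API itself.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>

/* Hypothetical message layout: System V requires a leading long mtype. */
struct worker_msg {
    long mtype;
    char data[32];
};

/* Map a 0-based worker id to a message type; mtype must be > 0. */
static long mtype_for_worker(int preemptive, uint32_t worker_id, uint32_t worker_num)
{
    assert(worker_id < worker_num);
    /* Preemptive mode: every worker listens on one shared type. */
    return preemptive ? (long) worker_num + 1 : (long) worker_id + 1;
}

/* Send text under the given type. Returns 0 on success, -1 on error. */
static int send_to_type(int queue_id, long mtype, const char *text)
{
    struct worker_msg msg;
    memset(&msg, 0, sizeof(msg));
    msg.mtype = mtype;
    strncpy(msg.data, text, sizeof(msg.data) - 1);
    /* The size argument counts only the payload, not the mtype field. */
    return msgsnd(queue_id, &msg, sizeof(msg.data), IPC_NOWAIT);
}
```

With this mapping, worker 0 reads type 1, worker 1 reads type 2, and so on, while the shared preemptive type worker_num + 1 never collides with any per-worker type.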
    else {
        SwooleG.main_reactor = sw_malloc(sizeof(swReactor));
        if (SwooleG.main_reactor == NULL) {
            swError("[Worker] malloc for reactor failed.");
            return SW_ERR;
        }
        if (swReactor_auto(SwooleG.main_reactor, SW_REACTOR_MAXEVENTS) < 0) {
            swError("[Worker] create worker_reactor failed.");
            return SW_ERR;
        }
        swSetNonBlock(pipe_rd);
        SwooleG.main_reactor->ptr = serv;
        SwooleG.main_reactor->add(SwooleG.main_reactor, pipe_rd, SW_FD_PIPE);
        SwooleG.main_reactor->setHandle(SwooleG.main_reactor, SW_FD_PIPE, swFactoryProcess_worker_onPipeReceive);

    #ifdef HAVE_SIGNALFD
        if (SwooleG.use_signalfd) {
            swSignalfd_setup(SwooleG.main_reactor);
        }
    #endif
    }
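Source explanation: when no message queue is used, the main reactor is allocated and created through swReactor_auto. The read pipe pipe_rd is put into non-blocking mode and added to the reactor for monitoring, with swFactoryProcess_worker_onPipeReceive set as the callback (the name says it all). In the last part, if the HAVE_SIGNALFD macro is defined and use_signalfd is enabled, swSignalfd_setup is called to set up main_reactor. (That part will be covered when analyzing Swoole's signal.c.)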
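The "non-blocking fd plus readiness callback" idea can be shown without any of Swoole's reactor code. In the sketch below, poll(2) stands in for whatever backend the real reactor uses, and set_nonblock, wait_once, and read_one_byte are names I made up for the example; it handles a single fd and a single wakeup only.

```c
#include <assert.h>
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

/* Hypothetical callback type: invoked when fd becomes readable. */
typedef int (*read_handler)(int fd, void *ctx);

/* Put fd into non-blocking mode. Returns 0 on success, -1 on error. */
static int set_nonblock(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) {
        return -1;
    }
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0 ? -1 : 0;
}

/*
 * Wait up to timeout_ms for fd to become readable and run the handler once.
 * Returns the handler's result, 0 on timeout, -1 on poll() error.
 */
static int wait_once(int fd, read_handler handler, void *ctx, int timeout_ms)
{
    assert(handler != NULL);
    struct pollfd pfd = { .fd = fd, .events = POLLIN, .revents = 0 };
    int n = poll(&pfd, 1, timeout_ms);
    if (n <= 0) {
        return n;
    }
    return (pfd.revents & POLLIN) ? handler(fd, ctx) : 0;
}

/* Example handler: read one byte into *ctx and report how many were read. */
static int read_one_byte(int fd, void *ctx)
{
    return (int) read(fd, ctx, 1);
}
```

A worker-style loop would call wait_once repeatedly with a timeout, which is roughly the role the reactor's wait method plays later in this function.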
    if (serv->max_request < 1) {
        SwooleWG.run_always = 1;
    }
    else {
        worker_task_num = serv->max_request;
        worker_task_num += swRandom(worker_pti);
    }
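Source explanation: if max_request (the maximum number of requests each worker may handle) is not set, run_always is set to true. Otherwise worker_task_num is set to max_request plus a random number (a single-digit random number * worker_id).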
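Here is a small sketch of the same budget logic. The jitter formula is a placeholder of my own (a random extra in [0, jitter_span)) and is not what swRandom computes; make_budget and struct request_budget are likewise invented names. My guess, and it is only a guess, is that the random extra exists so that workers with the same max_request do not all reach their limit and restart at the same moment.

```c
#include <assert.h>
#include <stdlib.h>

struct request_budget {
    int run_always;   /* 1: the worker is never recycled */
    int remaining;    /* tasks left before the worker exits */
};

/*
 * Derive a worker's task budget from max_request. jitter_span is a
 * hypothetical knob: the budget gets a random extra in [0, jitter_span).
 */
static struct request_budget make_budget(int max_request, unsigned seed, int jitter_span)
{
    assert(jitter_span > 0);
    struct request_budget b = { 0, 0 };
    if (max_request < 1) {
        /* No limit configured: run until told to stop. */
        b.run_always = 1;
        return b;
    }
    srand(seed);
    b.remaining = max_request + rand() % jitter_span;
    return b;
}
```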
    //worker start
    swServer_worker_onStart(serv);

    if (serv->ipc_mode == SW_IPC_MSGQUEUE) {
        while (SwooleG.running > 0) {
            n = serv->read_queue.out(&serv->read_queue, (swQueue_data *) &rdata, sizeof(rdata.req));
            if (n < 0) {
                if (errno == EINTR) {
                    if (SwooleG.signal_alarm) {
                        swTimer_select(&SwooleG.timer);
                    }
                }
                else {
                    swWarn("[Worker%ld]read_queue->out() failed. Error: %s [%d]", rdata.pti, strerror(errno), errno);
                }
                continue;
            }
            swFactoryProcess_worker_excute(factory, &rdata.req);
        }
    }
    else {
        struct timeval timeo;
        timeo.tv_sec = SW_REACTOR_TIMEO_SEC;
        timeo.tv_usec = SW_REACTOR_TIMEO_USEC;
        SwooleG.main_reactor->wait(SwooleG.main_reactor, &timeo);
    }

    //worker shutdown
    swServer_worker_onStop(serv);
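Source explanation: swServer_worker_onStart is called first (this corresponds to the onWorkerStart callback and notifies the PHP program). In message queue mode, the worker enters a loop that takes one message off the queue per iteration and passes it to swFactoryProcess_worker_excute. If a read is interrupted by a signal (EINTR) and signal_alarm is set, the timer is serviced before the loop continues. Outside message queue mode, main_reactor's wait method is called to enter event monitoring (see Chapter 8, the Reactor module, for details). When the loop ends, swServer_worker_onStop is called to notify the PHP program.
The core source of swFactoryProcess_worker_onPipeReceive is: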
    if (read(event->fd, &task, sizeof(task)) > 0) {
        /**
         * Big package
         */
        ret = swFactoryProcess_worker_excute(factory, &task);
        if (task.info.type == SW_EVENT_PACKAGE_START) {
            //no data
            if (ret < 0 && errno == EAGAIN) {
                return SW_OK;
            }
            else if (ret > 0) {
                goto read_from_pipe;
            }
        }
        return ret;
    }
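Source explanation: one task is read from the pipe, then swFactoryProcess_worker_excute is called to execute it. For a SW_EVENT_PACKAGE_START chunk, the function may jump back to read_from_pipe to keep reading the chunks that follow (the label itself is not shown in this excerpt). The tasks come from the reactor, which is monitoring the connections.
This gives the complete worker flow: swFactoryProcess_worker_spawn creates the worker process, which then enters the worker's main loop through swFactoryProcess_worker_loop. When a task arrives, swServer_worker_schedule picks the worker for it, and swFactoryProcess_worker_excute executes the task.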
FactoryProcess declares two manager functions:
    static int swFactoryProcess_manager_loop(swFactory *factory);
    static int swFactoryProcess_manager_start(swFactory *factory);
swFactoryProcess_manager_start starts the manager process, which creates and manages every worker process. Its core source is fairly long, so it is analyzed in pieces:
    if (serv->ipc_mode == SW_IPC_MSGQUEUE) {
        //queue for reading data
        if (swQueueMsg_create(&serv->read_queue, 1, serv->message_queue_key, 1) < 0) {
            swError("[Master] swPipeMsg_create[In] fail. Error: %s [%d]", strerror(errno), errno);
            return SW_ERR;
        }
        //create a write queue for TCP
        if (serv->have_tcp_sock == 1) {
            //queue for writing data
            if (swQueueMsg_create(&serv->write_queue, 1, serv->message_queue_key + 1, 1) < 0) {
                swError("[Master] swPipeMsg_create[out] fail. Error: %s [%d]", strerror(errno), errno);
                return SW_ERR;
            }
        }
    }
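Source explanation: in message queue mode, the read queue is created first. If swServer is listening on a TCP socket, a write queue is created as well, using message_queue_key + 1 as its key.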
    else {
        object->pipes = sw_calloc(serv->worker_num, sizeof(swPipe));
        if (object->pipes == NULL) {
            swError("malloc[worker_pipes] failed. Error: %s [%d]", strerror(errno), errno);
            return SW_ERR;
        }
        //pipes for the worker processes
        for (i = 0; i < serv->worker_num; i++) {
            if (swPipeUnsock_create(&object->pipes[i], 1, SOCK_DGRAM) < 0) {
                return SW_ERR;
            }
            serv->workers[i].pipe_master = object->pipes[i].getFd(&object->pipes[i], 1);
            serv->workers[i].pipe_worker = object->pipes[i].getFd(&object->pipes[i], 0);
        }
    }
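Source explanation: outside message queue mode, one communication pipe is created per worker. Unix socket pipes are used so that each pipe can carry data in both directions.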
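What "bidirectional" buys you is easy to see with a plain socketpair. The sketch below is not swPipeUnsock_create; worker_pipe_create and the struct are invented for the example, and which descriptor ends up on which side is an arbitrary choice here. The SOCK_DGRAM type is taken from the excerpt, and it means each send arrives as one whole message.

```c
#include <assert.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical pairing of the two ends, named after the fields in the excerpt. */
struct worker_pipe {
    int master_fd;  /* kept by the master/reactor side */
    int worker_fd;  /* kept by the worker side */
};

/* Create a bidirectional datagram channel. Returns 0 on success, -1 on error. */
static int worker_pipe_create(struct worker_pipe *p)
{
    int fds[2];
    assert(p != NULL);
    if (socketpair(AF_UNIX, SOCK_DGRAM, 0, fds) < 0) {
        return -1;
    }
    p->worker_fd = fds[0];
    p->master_fd = fds[1];
    return 0;
}

/* Close both ends of the channel. */
static void worker_pipe_close(struct worker_pipe *p)
{
    close(p->master_fd);
    close(p->worker_fd);
}
```

An ordinary pipe(2) would need two pipes per worker to get the same effect, one for each direction.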
    if (SwooleG.task_worker_num > 0) {
        key_t msgqueue_key = 0;
        if (SwooleG.task_ipc_mode > 0) {
            msgqueue_key = serv->message_queue_key + 2;
        }
        if (swProcessPool_create(&SwooleG.task_workers, SwooleG.task_worker_num, serv->task_max_request, msgqueue_key) < 0) {
            swWarn("[Master] create task_workers failed.");
            return SW_ERR;
        }
        swWorker *worker;
        for (i = 0; i < SwooleG.task_worker_num; i++) {
            worker = swServer_get_worker(serv, serv->worker_num + i);
            if (swWorker_create(worker) < 0) {
                return SW_ERR;
            }
        }
        //set the pointer and the callbacks
        SwooleG.task_workers.ptr = serv;
        SwooleG.task_workers.onTask = swTaskWorker_onTask;
        SwooleG.task_workers.onWorkerStart = swTaskWorker_onWorkerStart;
        SwooleG.task_workers.onWorkerStop = swTaskWorker_onWorkerStop;
    }
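Source explanation: if task_worker_num is set, a ProcessPool is created to manage the task processes (details will follow when Swoole's process pool and thread pool are analyzed). A loop then creates the requested number of task workers, and finally the swServer pointer and the callbacks are set on the pool.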
    pid = fork();
    switch (pid) {
    //create the manager process
    case 0:
        //create the child (worker) processes
        for (i = 0; i < serv->worker_num; i++) {
            //close(worker_pipes[i].pipes[0]);
            reactor_pti = (i % serv->writer_num);
            serv->workers[i].reactor_id = reactor_pti;
            pid = swFactoryProcess_worker_spawn(factory, i);
            if (pid < 0) {
                swError("Fork worker process fail");
                return SW_ERR;
            }
            else {
                serv->workers[i].pid = pid;
            }
        }
        /**
         * create task worker pool
         */
        if (SwooleG.task_worker_num > 0) {
            swProcessPool_start(&SwooleG.task_workers);
        }
        //mark this process as the manager process
        SwooleG.process_type = SW_PROCESS_MANAGER;
        ret = swFactoryProcess_manager_loop(factory);
        exit(ret);
        break;
    //master process
    default:
        SwooleGS->manager_pid = pid;
        break;
    case -1:
        swError("fork() failed.");
        return SW_ERR;
    }
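Source explanation: fork is called to create the manager process. Inside the manager process, each worker is assigned a reactor_id of i % writer_num and then created through swFactoryProcess_worker_spawn. swProcessPool_start starts the task process pool, the process is marked as the manager process, and swFactoryProcess_manager_loop enters the manager's core work loop. The master process only records the manager's pid.
Next is swFactoryProcess_manager_loop, which defines the manager process's main workflow. Its core job is to watch for worker exit events and create new worker processes. The core source is: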
    swServer *serv = factory->ptr;
    swWorker *reload_workers;

    swSignal_set(SIGTERM, swWorker_signal_handler, 1, 0);

    if (serv->onManagerStart) {
        serv->onManagerStart(serv);
    }

    reload_worker_num = serv->worker_num + SwooleG.task_worker_num;
    reload_workers = sw_calloc(reload_worker_num, sizeof(swWorker));
    if (reload_workers == NULL) {
        swError("[manager] malloc[reload_workers] failed");
        return SW_ERR;
    }

    //for reload
    swSignal_add(SIGUSR1, swManager_signal_handle);
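Source explanation: a handler is registered for the SIGTERM termination signal and, if an onManagerStart callback is set, it is called (the start callback). The number of workers to reload is set to worker_num + task_worker_num, and the reload_workers array is allocated to temporarily hold the worker structs. A handler for the reload signal (SIGUSR1) is registered as well.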
    while (SwooleG.running > 0) {
        pid = wait(&worker_exit_code);
        if (pid < 0) {
            if (manager_worker_reloading == 0) {
                swTrace("[Manager]wait failed. Error: %s [%d]", strerror(errno), errno);
            }
            else if (manager_reload_flag == 0) {
                memcpy(reload_workers, serv->workers, sizeof(swWorker) * serv->worker_num);
                if (SwooleG.task_worker_num > 0) {
                    memcpy(reload_workers + serv->worker_num, SwooleG.task_workers.workers,
                            sizeof(swWorker) * SwooleG.task_worker_num);
                }
                manager_reload_flag = 1;
                goto kill_worker;
            }
        }
        if (SwooleG.running == 1) {
            for (i = 0; i < serv->worker_num; i++) {
                //compare PID
                if (pid != serv->workers[i].pid) {
                    continue;
                }
                else {
                    if (serv->onWorkerError != NULL && WEXITSTATUS(worker_exit_code) > 0) {
                        serv->onWorkerError(serv, i, pid, WEXITSTATUS(worker_exit_code));
                    }
                    pid = 0;
                    while (1) {
                        new_pid = swFactoryProcess_worker_spawn(factory, i);
                        if (new_pid < 0) {
                            usleep(100000);
                            continue;
                        }
                        else {
                            serv->workers[i].pid = new_pid;
                            break;
                        }
                    }
                }
            }
            //task worker
            if (pid > 0) {
                swWorker *exit_worker = swHashMap_find_int(SwooleG.task_workers.map, pid);
                if (exit_worker != NULL) {
                    swProcessPool_spawn(exit_worker);
                }
            }
        }
        //reload worker
        kill_worker: if (manager_worker_reloading == 1) {
            //reload finish
            if (reload_worker_i >= reload_worker_num) {
                manager_worker_reloading = 0;
                reload_worker_i = 0;
                continue;
            }
            ret = kill(reload_workers[reload_worker_i].pid, SIGTERM);
            if (ret < 0) {
                swWarn("[Manager]kill() failed, pid=%d. Error: %s [%d]", reload_workers[reload_worker_i].pid, strerror(errno), errno);
                continue;
            }
            reload_worker_i++;
        }
    }
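Source explanation: this is the manager's core work loop. wait is called to block until a worker exit event occurs, and it returns the pid of the worker process that exited.
If pid is less than 0, wait failed. If the manager_worker_reloading flag is 0, no reload has been requested, so the error is reported. If manager_worker_reloading is 1 and manager_reload_flag is 0, the structs of all worker processes are copied into reload_workers, manager_reload_flag is set to 1 (meaning a restart is in progress), and execution jumps to the kill_worker label.
If pid is greater than 0 and swServer is running, the worker array is traversed to find the worker process matching the pid. If an onWorkerError callback is set and the worker exited abnormally, the callback is invoked. swFactoryProcess_worker_spawn is then called in a loop until a new worker process is created successfully, and the worker's pid is set to the new pid. If the exited process is a task process instead, swHashMap_find_int looks up the matching task worker struct in the map of task_workers in the SwooleG global, and swProcessPool_spawn re-creates that task process.
At the kill_worker label, manager_worker_reloading being 1 means a reload was requested. If the reload_worker_i index is greater than or equal to reload_worker_num, every process has been restarted once, so the manager_worker_reloading flag and the reload_worker_i index are reset and the loop continues. Otherwise kill is called to send SIGTERM to the worker process that reload_worker_i points at, and reload_worker_i moves forward by one.
Two situations are handled here. In the first, a worker process exits on its own, and only that worker is restarted. In the second, a reload was requested, and all worker processes are restarted one after another.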
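The first situation, "a child exits and the manager restarts it in the same slot", can be reproduced with a few lines of plain POSIX code. The sketch below is my own reduction: spawn_child and reap_and_respawn are hypothetical names, the children exit immediately instead of running a work loop, and the reload path with SIGTERM is left out entirely.

```c
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Fork a child that exits at once with the given code (stand-in for a worker). */
static pid_t spawn_child(int exit_code)
{
    pid_t pid = fork();
    if (pid == 0) {
        _exit(exit_code);
    }
    return pid; /* child's pid in the parent, -1 on failure */
}

/*
 * Block until one child exits, then start a replacement in the same slot.
 * Returns the slot index, or -1 if wait() failed or the pid is unknown.
 * The exit code of the dead child is stored in *exit_code.
 */
static int reap_and_respawn(pid_t *pids, int n, int *exit_code)
{
    int status = 0;
    assert(pids != NULL && n > 0 && exit_code != NULL);

    pid_t dead = wait(&status);
    if (dead < 0) {
        return -1;
    }
    *exit_code = WIFEXITED(status) ? WEXITSTATUS(status) : -1;

    for (int i = 0; i < n; i++) {
        if (pids[i] != dead) {
            continue;
        }
        pid_t fresh;
        /* Retry until fork() succeeds, pausing briefly between attempts. */
        while ((fresh = spawn_child(0)) < 0) {
            usleep(100000);
        }
        pids[i] = fresh;
        return i;
    }
    return -1;
}
```

Because the slot index is preserved, the replacement inherits the dead worker's position in the table, which is why the excerpt can respawn with the same worker id i.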
That completes the analysis of the FactoryProcess module. The next chapter will cover Swoole's Worker.c module, Connection.c module, and ReactorProcess module.