首页 > 代码库 > Erlang:RabbitMQ源码分析 5. worker pool 实现分析

Erlang:RabbitMQ源码分析 5. worker pool 实现分析

worker_pool 由一个worker_pool 和N个worker_pool_worker组成, N = number of scheduler threads

外部程序有两种方式来call worker_pool ,  submit(func) and submit_async(func)

submit(func) 是同步的提交Task,worker_pool 用worker_pool_worker做完返回

submit_async(func)是异步的提交Task,worker_pool 找个worker_pool_worker异步去做,如果没有worker_pool_worker就把Task加到Queue里


worker_pool是一个gen_server2的behavior,而且使用了hibernate和backoff,保证在没有Task request时候进入hibernate状态。


submit

submit是同步提交Task的方法,

              1. 如果State里有available的worker_pool_worker,

                        1.1 monitor call submit的pid, 如果这个pid 意外退出,worker_pool_worker会收到request,在worker_pool_worker的handle_info里处理,避免有pid 意外退出,却没有释放worker_pool_worker

                         1.2 就把call submit的pid 放到worker_pool_worker的State里

                         1.3 停止monitor

                         1.4 run and reply

                         1.5 通知worker_pool,这个worker_pool_worker 空闲了

                         1.6 清空worker_pool_worker的State

               2. 如果State里没有available的worker_pool_worker

                         2.1 把submit的Pid放到worker_pool的State的Pending Queue里

                         2.2 当1.5里worker_pool收到有worker_pool_worker 空闲了的通知,就返回这个worker_pool_worker 的pid

                         2.3 再重复1.1 - 1.6


submit_async

submit_async 是异步提交Task的方法 ,如果有available的worker_pool_worker,就直接异步run,run 完通知worker_pool 空闲。如果没有available的worker_pool_worker,就塞到worker_pool的State的Pending Queue里,但有worker_pool_worker空闲通知过来的时候,用这个worker_pool_worker异步去run

                

Erlang:RabbitMQ源码分析 5. worker pool 实现分析