首页 > 代码库 > nginx源码分析——线程池


源码: nginx 1.13.0-release
Syntax: thread_pool name threads=number [max_queue=number];
Default: thread_pool default threads=32 max_queue=65536;
Context: main
     根据之前讲过的模块初始化流程(在 master 启动 worker 之前):create_conf --> command 的 set 函数 --> init_conf。下面就按照这个流程,看看 thread_pool 模块的初始化。
/*******************  nginx/src/core/ngx_thread_pool.c  ************************/
/*
 * create_conf step of the thread_pool module: builds the module's
 * configuration object.
 *
 * Allocates an ngx_thread_pool_conf_t from cycle->pool and initializes its
 * `pools` array (elements are ngx_thread_pool_t pointers) from the same pool.
 *
 * Returns the new configuration, or NULL when the allocation or the array
 * initialization fails.
 *
 * NOTE(review): the brace-only lines are missing from this excerpt, so block
 * nesting is implied by indentation only.
 */
static void * ngx_thread_pool_create_conf(ngx_cycle_t *cycle)
    ngx_thread_pool_conf_t  *tcf;
    tcf = ngx_pcalloc(cycle->pool, sizeof(ngx_thread_pool_conf_t));
    if (tcf == NULL) {
        return NULL;
    /* 4 is passed as the element-count argument -- presumably the initial
     * capacity of the array; confirm against ngx_array_init() */
    if (ngx_array_init(&tcf->pools, cycle->pool, 4,
                       sizeof(ngx_thread_pool_t *))
        != NGX_OK)
        return NULL;
    return tcf;
static char *  ngx_thread_pool(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
    ngx_str_t          *value;
    ngx_uint_t          i;
    ngx_thread_pool_t  *tp;
    value = http://www.mamicode.com/cf->args->elts;"threads=", 8) == 0) {
        if (ngx_strncmp(value[i].data, "max_queue=", 10) == 0) {
/*
 * init_conf step of the thread_pool module (excerpt): walks every pool
 * pointer stored in tcf->pools and then returns NGX_CONF_OK.
 *
 * NOTE(review): the declarations of `tcf` and `i`, the assignment of `tcf`
 * (presumably derived from `conf` -- confirm), and the body of the loop are
 * not part of this excerpt.
 */
static char * ngx_thread_pool_init_conf(ngx_cycle_t *cycle, void *conf)
    ngx_thread_pool_t  **tpp;
    /* treat the array storage as a vector of pool pointers */
    tpp = tcf->pools.elts;
    for (i = 0; i < tcf->pools.nelts; i++) {
    return NGX_CONF_OK;
/*******************  nginx/src/core/ngx_thread_pool.c  ************************/
/*
 * Starts every configured thread pool for the current process.
 *
 * Returns NGX_OK without doing anything unless ngx_process is
 * NGX_PROCESS_WORKER or NGX_PROCESS_SINGLE. Otherwise calls
 * ngx_thread_pool_init() on each entry of tcf->pools, passing the cycle's
 * log and pool; returns NGX_ERROR on the first failure and NGX_OK once all
 * pools are initialized.
 *
 * NOTE(review): the assignment of `tcf` and the brace-only lines are not
 * shown in this excerpt.
 */
static ngx_int_t
ngx_thread_pool_init_worker(ngx_cycle_t *cycle)
    ngx_uint_t                i;
    ngx_thread_pool_t       **tpp;
    ngx_thread_pool_conf_t   *tcf;
    /* only worker and single-process modes get thread pools */
    if (ngx_process != NGX_PROCESS_WORKER
        && ngx_process != NGX_PROCESS_SINGLE)
        return NGX_OK;
    tpp = tcf->pools.elts;
    for (i = 0; i < tcf->pools.nelts; i++) {
        if (ngx_thread_pool_init(tpp[i], cycle->log, cycle->pool) != NGX_OK) {
            return NGX_ERROR;
    return NGX_OK;
/*
 * Sets up a single thread pool: creates its mutex and condition variable,
 * then starts tp->threads threads, each running ngx_thread_pool_cycle()
 * with `tp` as its argument.
 *
 * Returns NGX_ERROR if the mutex, the condition variable, or any thread
 * cannot be created.
 *
 * NOTE(review): the declarations of n/err/tid/attr, any thread-attribute
 * setup, and the success return are not shown in this excerpt.
 */
static ngx_int_t  ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
    if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {
        return NGX_ERROR;
    if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {
        /* the mutex created above is destroyed before reporting failure */
        (void) ngx_thread_mutex_destroy(&tp->mtx, log);
        return NGX_ERROR;
    for (n = 0; n < tp->threads; n++) {
        err = pthread_create(&tid, &attr, ngx_thread_pool_cycle, tp);
        if (err) {
            /* pthread_create() reports failure through its return value,
             * which is passed to the logger as the error code */
            ngx_log_error(NGX_LOG_ALERT, log, err,
                          "pthread_create() failed");
            return NGX_ERROR;
/*
 * Entry point of each pool thread (passed to pthread_create() by
 * ngx_thread_pool_init()).
 *
 * Loops forever: under tp->mtx, waits on tp->cond until the task queue is
 * non-empty, removes the task at the head of the queue, releases the mutex,
 * and runs the task's handler. Afterwards the task is appended to the
 * ngx_thread_pool_done list (after taking ngx_thread_pool_done_lock) and
 * ngx_notify() is called with ngx_thread_pool_handler.
 *
 * Returns NULL if locking, waiting, or unlocking fails.
 *
 * NOTE(review): the declarations of `tp`/`task`, the brace-only lines, and
 * the release of ngx_thread_pool_done_lock are not shown in this excerpt.
 */
static void *ngx_thread_pool_cycle(void *data)
     for ( ;; ) {
        if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
            return NULL;
        /* the number may become negative */
        /* NOTE(review): the statement the comment above refers to is not
         * present in this excerpt */
        /* the queue is re-checked after every wake-up, so the thread keeps
         * waiting for as long as the queue is empty */
        while (tp->queue.first == NULL) {
            if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log)
                != NGX_OK)
                (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
                return NULL;
        /* detach the head task from the queue */
        task = tp->queue.first;
        tp->queue.first = task->next;
        if (tp->queue.first == NULL) {
            /* queue is now empty: point `last` back at `first` so the next
             * append writes into the head slot */
            tp->queue.last = &tp->queue.first;
        if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {
            return NULL;
        /* the handler runs after the pool mutex has been released */
        task->handler(task->ctx, tp->log);
        /* append the finished task to the done list */
        ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048);
        *ngx_thread_pool_done.last = task;
        ngx_thread_pool_done.last = &task->next;
        (void) ngx_notify(ngx_thread_pool_handler);
/*
 * Event handler that ngx_thread_pool_cycle() passes to ngx_notify().
 *
 * After taking ngx_thread_pool_done_lock, detaches the whole
 * ngx_thread_pool_done list and resets it to empty, then walks the detached
 * tasks and marks each task's event as complete and no longer active.
 *
 * NOTE(review): the declarations of `task`/`event`, the release of the
 * spinlock, and whatever is done with each event after the flags are set
 * are not shown in this excerpt.
 */
static void  ngx_thread_pool_handler(ngx_event_t *ev)
    ngx_spinlock(&ngx_thread_pool_done_lock, 1, 2048); 
    /* take the whole list and leave an empty one behind */
    task = ngx_thread_pool_done.first;
    ngx_thread_pool_done.first = NULL;
    ngx_thread_pool_done.last = &ngx_thread_pool_done.first;
    while (task) {
        ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0,
                       "run completion handler for task #%ui", task->id);
        event = &task->event;
        /* advance to the next task before updating this one's event */
        task = task->next;
        event->complete = 1;
        event->active = 0;
/*********************** nginx/src/http/ngx_http_file_cache.c  **********************/
/*
 * Reads c->body_start bytes from offset 0 of the cache file into
 * c->buf->pos.
 *
 * When clcf->aio is NGX_HTTP_AIO_THREADS the read goes through
 * ngx_thread_read(): the cache's thread task, the handler
 * ngx_http_cache_thread_handler and the request `r` (as context) are
 * installed on c->file first, the possibly updated task is copied back
 * afterwards, and c->reading records whether the call returned NGX_AGAIN.
 * Otherwise the read is done directly with ngx_read_file().
 *
 * Returns the value returned by the read function that was used.
 *
 * NOTE(review): the declarations of `n` and `clcf`, the assignment of
 * `clcf`, and the brace-only lines are not shown in this excerpt.
 */
static ssize_t  ngx_http_file_cache_aio_read(ngx_http_request_t *r, ngx_http_cache_t *c)
    if (clcf->aio == NGX_HTTP_AIO_THREADS) {
        c->file.thread_task = c->thread_task;
        c->file.thread_handler = ngx_http_cache_thread_handler;
        c->file.thread_ctx = r;
        n = ngx_thread_read(&c->file, c->buf->pos, c->body_start, 0, r->pool);
        /* keep the task stored on the file for later calls */
        c->thread_task = c->file.thread_task;
        c->reading = (n == NGX_AGAIN);
        return n;
    return ngx_read_file(&c->file, c->buf->pos, c->body_start, 0);
static ngx_int_t  ngx_http_cache_thread_handler(ngx_thread_task_t *task, ngx_file_t *file)
    tp = clcf->thread_pool;
    task->event.data = http://www.mamicode.com/r;"task #%ui already active", task->id);
        return NGX_ERROR;
    if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
        return NGX_ERROR;
    if (tp->waiting >= tp->max_queue) {
        (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
        ngx_log_error(NGX_LOG_ERR, tp->log, 0,
                      "thread pool \"%V\" queue overflow: %i tasks waiting",
                      &tp->name, tp->waiting);
        return NGX_ERROR;
    task->event.active = 1;
    task->id = ngx_thread_pool_task_id++;
    task->next = NULL;
    if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {
        (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
        return NGX_ERROR;
    *tp->queue.last = task;
    tp->queue.last = &task->next;
    (void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
    ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
                   "task #%ui added to thread pool \"%V\"",
                   task->id, &tp->name);
    return NGX_OK;
