首页 > 代码库 > 可动态增减的线程池,主线程accept——基于UNP代码修改

可动态增减的线程池,主线程accept——基于UNP代码修改

可动态增减的线程池,主线程accept——基于UNP代码修改

1.说明

线程池基于一个区间动态变化,在客户连接过多线程不够用时,动态增加一定数量的线程。在线程闲置数量多于一半时,动态减小线程数量到一个基准线。

这个例子模式为:半同步/半异步(half-sync/half-async)

 

2.代码相关说明

代码基于UNP的库函数,要想运行必须先安装相应库。

 

3.代码

#include "unpthread.h"#include <queue>#include <list>#include <vector>using std::vector;using std::list;using std::queue;//可以将下面的信息放入一个结构体中,作为线程池结构const int MAX_NTHREADS     = 30;        //线程池能够创建的最大线程数const int MIN_NTHREADS     = 15;        //线程池中最小线程数const int MINFREE_NTHREADS = 2;        //允许的最小空闲线程数const int ADD_NTHREADS     = 5;        //空闲线程数< MINFREE_NTHREDS时增加的线程数//记录每个线程信息typedef struct {    bool        thread_flag;        //是否正在记录线程信息;true: 是; false: 否    pthread_t   thread_tid;         //线程的ID号    long        thread_count;       //此线程被执行的次数} thread_info;static thread_info thread_info_array[MAX_NTHREADS];  //存放所有的线程信息static int total_nthreads = MIN_NTHREADS;             //当前存在的总线程数,初始化线程池的大小static int free_nthreads = MIN_NTHREADS;              //空闲的线程数量//static queue<int> clifd_list;static list<int> clifd_list;                    //已连接的客户端描述符队列//static forward_list<int> clifd_list;//const int MAXNCLI = 50;                       //UNP中的最大的客户端数目//static int clifd[MAXNCLI], iget, iput;        //循环队列: UNP中用于存放已连接的客户,和取放客户端的位置。可以使用vector做循环队列在不够时动态增加长度。static pthread_mutex_t clifd_mutex = PTHREAD_MUTEX_INITIALIZER;static pthread_cond_t  clifd_cond  = PTHREAD_COND_INITIALIZER;static void sig_int(int signo){    void pr_cpu_time(void);    pr_cpu_time();    exit(0);}//线程运行函数void *thread_main(void *arg){    //arg是在thread_info_array中的位置    int thread_pos = (int)arg;    DPRINTF("thread %d starting\n", thread_pos);    void web_child(int);        for (;;) {        DPRINTF("thread %d, threadID %d want to lock\n", thread_pos,                 thread_info_array[thread_pos].thread_tid);        //给当前的线程上锁        Pthread_mutex_lock(&clifd_mutex);        DPRINTF("thread %d threadID %d, thread_id %d locked\n", thread_pos,                thread_info_array[thread_pos].thread_tid, pthread_self());        //UNP中原本的循环数组        /*        while (iget == iput) {            DPRINTF("no jobs, thread %d wait cond\n");            Pthread_cond_wait(&clifd_cond, &clifd_mutex);            DPRINTF("thread %d wait signal cond\n", thread_pos);        }        int connfd = clifd[iget];        if (++iget == MAXNCLI) {            iget = 0;        }        */        //没有已连接的客户        while (clifd_list.empty()) {            //busy_nthreads = 0;            //memset(thread_busyflag, 0, MAX_NTHREADS);            //free_nthreads = total_nthreads;            DPRINTF("no jobs, thread %d wait cond\n");            //等待条件变量,此时mutex解锁。            //被唤醒后,重新加锁            Pthread_cond_wait(&clifd_cond, &clifd_mutex);            DPRINTF("thread %d wait signal cond\n", thread_pos);        }        //for (int i = 0; i < MAX_NTHREADS; ++i) {        //    if (thread_info_list[i].thread_busyflag == 0) {        //        ++free_nthreads;        //    }        //}        //空线程数目           free_nthreads = total_nthreads - clifd_list.size();        //当前线程不够用        if (free_nthreads < 0) {            free_nthreads = 0;        }        //空闲线程数超过一半        if (free_nthreads > total_nthreads / 2 && total_nthreads > MIN_NTHREADS) {            DPRINTF("*free %d, total %d**********************************Free a thread\n\n",                    free_nthreads, total_nthreads);           //减小线程数目            --total_nthreads;            //标识此结构没有再用            thread_info_array[thread_pos].thread_flag = false;            //分离当前线程,线程结束系统回收资源            Pthread_detach(pthread_self());            //thread_info_list[thread_pos].thread_busyflag = 0;            //解锁互斥量            Pthread_mutex_unlock(&clifd_mutex);            //线程退出            pthread_exit(NULL);        }        int connfd = clifd_list.front();        //clifd_list.pop();        clifd_list.pop_front();        //++busy_nthreads;        //--free_nthreads;        //thread_busyflag[thread_pos] = true;        //thread_info_list[thread_pos].thread_busyflag = 1;        Pthread_mutex_unlock(&clifd_mutex);        DPRINTF("thread %d unlocked\n", thread_pos);        //++thread_info_list[thread_pos].thread_count;
        //执行相应任务 web_child(connfd); Close(connfd); //thread_busyflag[thread_pos] = false; } }//创建线程,记录线程信息:ID,结构是否在用void thread_make(int i){ Pthread_create(&thread_info_array[i].thread_tid, NULL, &thread_main, (void*)i); thread_info_array[i].thread_flag = true; return;}int main(int argc, char *argv[]){ socklen_t addrlen; int listenfd; if (argc == 3) { //IP:Port listenfd = Tcp_listen(NULL, argv[1], &addrlen); } else if (argc == 4) { //用于指定ipv4还是ipv6 listenfd = Tcp_listen(argv[1], argv[2], &addrlen); } else { err_quit("Usage: a.out [ <host> ] <port#> <#threads>"); } struct sockaddr *cliaddr = (struct sockaddr*)Malloc(addrlen);// total_nthreads = atoi(argv[argc - 1]); //thread_info_list = (Thread*)Calloc(total_nthreads, sizeof(Thread)); //iget = iput = 0; //create all threads,开始数目是线程池允许的最小数目 for (int i = 0; i < total_nthreads; ++i) { thread_make(i); } //中断用于比较时间 Signal(SIGINT, sig_int); for (;;) { DPRINTF("Busy thread number is %d\n", clifd_list.size()); DPRINTF("Free thread number is %d\n", free_nthreads); DPRINTF("Total thread number is %d\n", total_nthreads); socklen_t clilen = addrlen; DPRINTF("Wait for a connection\n"); //获取已连接的描述符 int connfd = Accept(listenfd, cliaddr, &clilen); DPRINTF("Accept a connection\n"); DPRINTF("Main thread want to lock\n"); //加锁更改存放描述符的结构 Pthread_mutex_lock(&clifd_mutex); DPRINTF("Main thread locked\n"); /*clifd[iput] = connfd; if (++iput == MAXNCLI) { iput = 0; } if (iput == iget) { err_quit("iput = iget = %d\n", iput); }*/ //clifd_list.push(connfd); clifd_list.push_back(connfd); //空闲线程数 free_nthreads = total_nthreads - clifd_list.size(); if (free_nthreads < 0) { free_nthreads = 0; } //空闲连接数小于允许的最小空闲连接数目,增加线程数木 if (free_nthreads < MINFREE_NTHREADS && total_nthreads < MAX_NTHREADS) { for (int i = 0; i < ADD_NTHREADS; ++i, ++total_nthreads) { DPRINTF("******************************create a new threads\n"); for (int j = 0; j < MAX_NTHREADS; ++j) { if (thread_info_array[j].thread_flag == false) { thread_make(j); } } } } //广播条件变量,唤醒正在等待的线程 Pthread_cond_signal(&clifd_cond); DPRINTF("Main thread singal cond\n"); Pthread_mutex_unlock(&clifd_mutex); DPRINTF("Main thread unlocked\n"); } return 0;}