首页 > 代码库 > 一种基于Qt的可伸缩的全异步C/S架构服务器实现(三)

一种基于Qt的可伸缩的全异步C/S架构服务器实现(三)

三、流水线结构线程池设计

为了无阻塞地实现并发通信及处理,传统的小规模服务器采用每用户一线程的多线程技术,称为“任务伴随者”模式。该模式示意图如下:

然而,当客户端很多时,开启上百组线程,远远超过计算机的物理线程规模,导致大量计算资源浪费在线程上下文切换和环境恢复等维护工作中,有效计算能力显著降低。

   在多线程并行计算技术中,能够有效利用CPU物理核心,避免上下文频繁切换的经典模式是线程池模式。系统仅开启与CPU核心数相等的工作线程,形成线程池(ThreadPool)。各个任务在队列中排队,按照先进先出次序(FIFO)送入线程池中处理。该模式的示意图如下:

该模式尽管避免了任务量较大时,实际计算能力降低的问题,但单位时间内仅能处理一定数量的任务,存在阻塞的可能。万一现在处理的几个任务均很耗时,则其他客户的简单任务也会被耽误。

     在非阻塞的情况下利用线程池模式有效处理大量用户数据,要靠基于线程池的流水线技术,实现最优线程配置条件下低阻塞处理。

该模式的关键是对每个客户的处理任务进行细化,比如每K个指令为一个粒度单位,无论该客户的指令队列缓存了多少指令,一次仅处理K个,随后让出计算资源分配给其他任务使用。示意图如下:

       采用该方法,流水线结构保证了各个客户工作在大粒度上并行化,线程池技术保证了处理器资源的最大利用,可以显著提高系统的吞吐能力。另一个附加好处,是可以让VIP获得高级优先级。

3.1 模块结构

 


在范例代码中,本模块的所有文件位于 pipeline 文件夹下。

命名空间:ZPTaskEngine

主要有三个类组成。

1、zp_pipeline类

该类是流水线线程池的接口类。其管理了各个执行者线程,以及任务队列。执行者线程存储在本类的成员变量中。

		//working threads
		QVector<zp_plWorkingThread *> m_vec_workingThreads;
		QVector<QThread *> m_vec_InternalworkingThreads;
通过方法addThreads可以控制线程池的规模。

2、zp_plWorkingThread类

这个类是用于执行任务的线程对象。在其被创建时,绑定在一个QThread的线程事件循环中运行。创建的方法位于 zp_pipeline::addThreads中,

	/**
	 * @brief Add nThreads to the thread pool
	 *
	 * @fn zp_pipeline::addThreads
	 * @param nThreads how many threads you want to add.
	 * @return int current threads count after add.
	 */
	int zp_pipeline::addThreads(int nThreads)
	{
		if (nThreads>=1 && nThreads <=128)
		{
			for (int i=0;i<nThreads;i++)
			{
				zp_plWorkingThread * thread = new zp_plWorkingThread(this);
				m_vec_workingThreads.push_back(thread);
				QThread * pTh = new QThread(this);
				m_vec_InternalworkingThreads.push_back(pTh);
				thread->moveToThread(pTh);
				connect (this,&zp_pipeline::evt_start_work,thread,&zp_plWorkingThread::FetchNewTask,Qt::QueuedConnection);
				connect (this,&zp_pipeline::evt_stop_work,thread,&zp_plWorkingThread::setStopMark,Qt::QueuedConnection);
				connect (thread,&zp_plWorkingThread::taskFinished,this,&zp_pipeline::on_finished_task,Qt::QueuedConnection);
				pTh->start();
				m_mutex_protect.lock();
				m_nExistingThreads++;
				m_mutex_protect.unlock();

			}

		}
		return m_vec_workingThreads.size();
	}

3、zp_plTaskbase类

本类是一个纯虚基类,用于给应用者重载具体的执行任务。该类的核心方法是 run(),用于在线程池的某个线程中运行。


3.2 工作原理

 1、  当外部需要执行任务时,调用 zp_pipeline::pushTask方法,向任务队列m_list_tasks中传入zp_plTaskbase类型的指针。一旦队列中被插入了新任务,会立刻判断是否有空闲的线程可以执行这个任务。如果有,立刻触发执行。核心代码:

	void zp_pipeline::pushTask(zp_plTaskBase * task,bool bFire )
	{
		m_mutex_protect.lock();
		m_list_tasks.push_back(task);
		task->addRef();
		m_mutex_protect.unlock();

		int nsz =  m_vec_workingThreads.size();
		if (bFire==true)
			for (int i=0;i<nsz;i++ )
			{
				if (m_vec_workingThreads[i]->m_bBusy==false)
				{
					on_finished_task (m_vec_workingThreads[i]);
					break;
				}
			}

	}


2、 zp_pipeline:: on_finished_task 槽既是任务的起始,也是任务的结束。当某个zp_plWorkingThread对象执行完了一次任务,便会触发本方法。在本方法中,zp_pipeline对象检查自己的队列,看看是否还有任务需要执行。如果有,则读入一个任务继续执行

	void  zp_pipeline::on_finished_task (zp_plWorkingThread * task)
	{
		int res = 0;
		m_mutex_protect.lock();
		res = m_list_tasks.size();
		m_mutex_protect.unlock();
		if (res)
			emit evt_start_work(task );
	}

触发执行任务是使用事件 evt_start_work 触发的,这个信号发给 task线程,使得它在自己的槽函数中获取新的任务。

	/**
	 * @brief Call zp_plTaskBase::popTask to fetch new tasks.
	 *
	 * @fn zp_plWorkingThread::FetchNewTask
	 * @param obj the zp_plWorkingThread object recieved by signal-slot system.
	 * this method will omit zp_plWorkingThread objs except for it self.
	 */
	void zp_plWorkingThread::FetchNewTask(zp_plWorkingThread * obj)
	{


		if (obj != this)
			return;
		if (m_bRuning)
		{

			bool bValid = false;
			zp_plTaskBase * ptr = this->m_pipeline->popTask(&bValid);

			if (bValid==true && ptr!=NULL)
			{
				m_bBusy = true;
				int res = ptr->run();
				ptr->delRef();
				m_bBusy = false;
				if (res!=0 )
					this->m_pipeline->pushTask(ptr,false);
			}

			emit taskFinished(this);

		}

	}

在FetchNewTask槽中,会调用 zp_pipeline::popTask方法,弹出一个任务。一旦任务弹出,则会调用虚函数  run() 运行。run()的返回结果为0, 表示本任务彻底完成了,不再塞入队列的尾部;如果返回值非0, 说明任务还没有完成,只是执行了一部分,任务自己就释放了资源,防止阻塞整个流程。在这个情况下,任务被重新 push回队列。

3.3后续预告

下一章,将介绍数据库的简单封装。