首页 > 代码库 > 基于curl的异步http实现

基于curl的异步http实现

简述用于windows客户端的一个异步http模块的实现

1.需要实现的feature

1.1 很容易地发起异步http请求,然后回调。
1.2 能够管理http并发数。
1.3 能够支持http超时:不依赖于curl中实现的连接超时及其它超时。
1.4 请求可以取消。


2.参与者和简要分析:
Manager:接收http请求,调用curl。
Request:封装http请求。
Response:封装http回应。


线程模型:
这里实现异步一般会开线程,假定有一个UI(主)线程,可能有这些模式:
Manager在UI线程中管理若干个工作线程,curl_easy接口。
Manager在自己新起的http线程中管理若干个工作线程,curl_easy接口。
Manager在自己新起的http线程中调用curl_multi接口。


Manager在UI线程中调用curl_multi接口不合适,因为需要占用UI线程时间去select。
开多个work线程浪费资源,线程管理难度大,线程并发度的减少不会造成性能瓶颈,
因为主要耗时在网络IO上。


所以选定模型是:开一个http线程,在上面调用curl_multi接口。


解决回调问题:
这里会遇到两个问题,一个是回调的线程问题:在http中检测到IO完成,
如果直接在http中进行回调,会使得使用者要考虑多线程使用问题,回调中可能的崩溃,
耗时操作会影响异步http模块性能。另一个是回调对象的生命周期问题:如果回调
到对象的成员函数,在回调时有可能该对象已经析构。


考虑到这些,使用了chromium的线程模型:chromium中的base模块提供了几个抽象
层次的线程调度接口。其中一个层次是:知道对方线程的标识符,即可以向对应的线程
派发任务。


于是设计要求调用者是在某个被管理的线程上,那么在发起请求后,在Manager的SendRequest
方法中可以检测到主调线程的MessageLoopProxy,将该代理作为请求的一部分放到请求
队列中。在请求完成后,将response和SendRequest中的callback对象绑定到一起作为
一个任务由MessageLoopProxy派发,这样就回到调用者所在线程上了。


在此同时,对象的生命周期问题也解决,base中有弱指针。


回调接口被设计为void XXX::CallBack(WeakPtr<XXX> ptr, scoped_refptr<Response> ptr);
在绑定到当前对象的弱引用时得到一个签名为 void (scoped_refptr<Response>) 的Runnable Object。
这个签名的对象才可以交给Manager。
在IO完成时,再将这个Runnable Object的参数绑定到对应的Response上,就得到签名为 void (void)的
Runnable Object,同时也是线程池能派发的对象。


在这里引入chromium的线程池,加上了调用者的约束,解决回调问题。


3.参与者设计
Request设计:
包含一堆参数,比如url,HTTP_VERB,HTTP头操作,HTTP的BODY操作,超时设置。
为了简化使用,可以提供一个MakeGetRequest的函数,用于生成一个不需要那么多复杂设置的Request。


Reponse设计:
包含自己定义的错误码,curl的http错误码,http状态码,response body buffer,
response header,原来的url,甚至一个void* user的不透明指针。


Manager设计:
作为单件存在,拥有初始化和反初始化接口。这样,Manager得和所在模块的生命周期
绑定在一起,在合适的地方初始化和反初始化。


在初始化时内部创建http工作线程,整个模块处于就绪状态。


请求队列是少不了的,在访问时注意互斥。
为每个请求分配一个handle,交给调用者。
维护一个当前工作队列,即:加到CURLM中的所有请求。


可以预见http就是不断循环地处理一些事件。


如何及时响应添加的请求。
如何及时退出。
如何处理超时。
如何避免轮询。


如果每次循环都需要主动检测请求队列,可能比较低效。因为一方面肯定要从
curl的select中退出,然后去检测请求队列,而检测时可能发现没有请求。另外一方面
访问队列得加锁。


一般而言,这个问题的解决方案是用个Event。
如果这里用Event,那么退出,删除操作是不是也得有个Event。另外Event的自动切换状态
或手动切换状态会不会切出问题。Event的信息量是1,处理多个请求足够吗。
其实Event的信息量处理多个请求是够的,只要添加请求就触发一次事件能保证work的。


然而在这里不使用Event解决这个问题,使用windows消息,为添加,删除,结束分别定义消息。
用一个状态变量记录上次处理请求队列时是否有未处理的请求,否则需要加锁地读请求队列状态。
当状态变量为true时,请求队列一定非空。当状态变量为false时,请求队列可能是空,也可以有
请求,这说明在上一次读请求队列后请求队列中又加了请求,会通过消息来唤起当前线程。


先来看IO循环,在这些条件下应该退出IO循环:
1.still running的handle数为0,不需要进入IO循环。
2.still running的handle数目发生变化时应该退出IO循环,这样,外部就有可能处理IO完成。
3.still running数没有达到最大,且有未处理请求时。
4.有消息时应该退出IO循环,这个不用说。


这里select是在socket上等待,而其它4个退出条件需要轮询,所以在实现上有点违背curl_multi
的设计初衷。如果每次都额外select一个socket,在发线程消息时,往这个socket发点数据,
就可以将对线程消息的检测放到select中来。前三个退出条件和当前的still running的handle数
相关,可以在select前检测一次。select退出前still running的handle数不会发生变化,所以
可以放心select同时保持敏感。不过这里没有再创建socket了,还是用轮询吧。


在IO循环外是http线程的主循环,主循环可能干这些事:
A.处理IO完成。
B.处理消息。
C.在没有任务时放弃自己的线程执行,通过GetMessage进入休眠。
D.请求队列非空,且当前执行的handle没有达到最大时,需要处理请求队列。
E.设置TIMER(需要建立一个消息窗口),用于检测超时。消息时间是最先可能造时的请求超时的时间。


这些事如何安排?
最开始两个是事件派发类:B和C是同一类逻辑,一量地B或C需要处理,即使进入了IO循环也会
马上退出。
接着是D,检测一下是否能进入GetMessage的调用状态,如果能,则一直等待有消息,重新进入BC的逻辑。
然后是A,表示没有外部的事要处理,专心做IO吧。

而E,在任何可能影响超时时间的后面都加一个。


for (;;)
{
	if (m_UnHandleRequest > 0 && m_RunningRequest < MAX_RUNNING_REQUEST)
	{
		HandleQueueingRequest();
		ModifyTimer();
	}
	while (::PeekMessage())
	{
		ProcessMessage();
		if (m_QuitFlag)
		{
			ClearResource();
			break;
		}
	}
	if (m_QuitFlag)
	{
		break;
	}
	ModifyTimer();
	
	if (!HaveRequestAndIO())
	{
		GetMessage();
		ProcessMessage();
		if (m_QuitFlag)
		{
			ClearResource();
			break;
		}
	}
	if (m_QuitFlag)
	{
		break;
	}
	ModifyTimer();
	
	IOLoop();
	IOComplete();
	ModifyTimer();
}

void ProcessMessage()
{
	case 退出:m_QuitFlag = 1; break;
	case 取消Request:HandleCanceledRequest(); break;
	case 添加Request:HandleQueueingRequest();break;
}

void HandleCanceledRequest()
{
	在请求队列中,直接remove掉。
	在running中,对应的remove操作。
	// bad case,在处理请求时,完成的task已经进入调用者线程的task队列
}

void HandleQueueingRequest()
{
	将请求队列中的请求放到Running队列中。
	调用curl对应接口开始处理对应的请求。
	更新m_UnHandleRequest。
}

void ModifyTimer()
{
	计算最近超时,如果存在则更新timer。
}

void IOLoop()
{
	int m_OldRunningRequest = m_RunningRequest;
	while (m_RunningRequest)
	{
		if (m_UnHandleRequest > 0 && m_RunningRequest < MAX_RUNNING_REQUEST)
		{
			break;
		}
		if (PeekMessage())
		{
			break;
		}
		if (m_OldRunningRequest != m_RunningRequest)
		{
			break;
		}
		
		curl_multi_timeout();
		修改超时时间,如果超时时间超过100毫秒。
		
		如果超时时间是0,需要curl_multi_perform一次。
		curl_multi_fdset();
		select();
	}
}