首页 > 代码库 > Thrift Server总结
Thrift Server总结
Server Server 框架流程:
(1)首先获取连接
client = serverTransport_->accept();
(2)然后获取thransport和protocol
inputTransport = inputTransportFactory_->getTransport(client);
outputTransport = outputTransportFactory_->getTransport(client);
inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
(3)接着获取processor
shared_ptr<TProcessor> processor = getProcessor(inputProtocol,outputProtocol, client);
(4)最后处理请求
processor->process(inputProtocol, outputProtocol,connectionContext);
TThreadedServer是多线程处理请求,来一个连接请求就开启一个线程处理它,处理完就退出线程。处理线程有2种类型:PthreadThread和BoostThread,前者是基posix函数,后者基于boost线程类,它们均通过对应的工厂类创建;还有个Task类,继承于Runnable,主要的处理请求流程在此实现。
TNonblockingServer
(1)采用libevent库的事件驱动框架
// Register the server event
event_set(&serverEvent_,
serverSocket_,
EV_READ | EV_PERSIST,
TNonblockingServer::eventHandler,
this);
event_base_set(eventBase_, &serverEvent_);
event_add(&serverEvent_, 0);
// Create an event to be notified when a task finishes
event_set(¬ificationEvent_,
getNotificationRecvFD(),
EV_READ | EV_PERSIST,
TConnection::taskHandler,
this);
event_base_set(eventBase_, ¬ificationEvent_);
event_add(¬ificationEvent_, 0);
当为threadPoolProcessing mode时,主线程+工作线程架构,主线程负责接收新连接和监听网络数据,当有数据到达时,生成一个task放入队列,工作线程负责取出task,并执行task。这里还用到unix socket来通知主线程任务已完成和使用TMemoryBuffer来缓存读写数据。
connectionStack_是保存空闲TConnection对象的栈,有新连接就从栈里取出一个TConnection对象并初始化它,栈为空时才new一个TConnection对象。连接被close掉时会将TConnection对象交还给connectionStack_,即入栈,但connectionStack_的大小达到connectionStackLimit_时delete掉该TConnection对象。
当numActiveProcessors_ > maxActiveProcessors_ 或 activeConnections > maxConnections_时,server达到overloaded状态,其中numActiveProcessors_是指目前尚未处理的task数;
(5)server达到overloaded状态时,可能采取以下三种措施:
/// Overload condition actions.
enum TOverloadAction {
T_OVERLOAD_NO_ACTION, ///< Don‘t handle overload */
T_OVERLOAD_CLOSE_ON_ACCEPT, ///< Drop new connections immediately */
T_OVERLOAD_DRAIN_TASK_QUEUE ///< Drop one tasks from head of task queue */
};
有新连接被accept时,会先检查是否达到overloaded,但是会进行如下处理Overload的代码:
// If we‘re overloaded, take action here
if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
nConnectionsDropped_++;
nTotalConnectionsDropped_++;
if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
close(clientSocket);
return;
} else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
if (!drainPendingTask()) {
// Nothing left to discard, so we drop connection instead.
close(clientSocket);
return;
}
}
}
drainPendingTask()函数:
bool TNonblockingServer::drainPendingTask() {
if (threadManager_) {
boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
if (task) {
TConnection* connection =
static_cast<TConnection::Task*>(task.get())->getTConnection();
assert(connection && connection->getServer()
&& connection->getState() == APP_WAIT_TASK);
connection->forceClose();
return true;
}
}
return false;
}
} //可见,只drop one task
Thrift Server总结