首页 > 代码库 > libthrift0.9.0解析(四)之TThreadPoolServer&ServerContext

libthrift0.9.0解析(四)之TThreadPoolServer&ServerContext

TThreadPoolServer直接继承自TServer,实现类serve和stop操作。

在serve中可以接受多个连接,每个连接单独开一个线程进行处理,在每个线程中,按顺序处理该线程所绑定连接的请求,因此对同一个连接来说,是同步的。

serve函数主要代码:

while (!stopped_) {      int failureCount = 0;      try {        TTransport client = serverTransport_.accept();        WorkerProcess wp = new WorkerProcess(client);        executorService_.execute(wp);      } catch (TTransportException ttx) {        if (!stopped_) {          ++failureCount;          LOGGER.warn("Transport error occurred during acceptance of message.", ttx);        }      }    }

 其中线程池以及最大线程数通过ThreadPoolExecutor类来实现。

在每个WorkerProcess中,处理请求关键代码如下:

    public void run() {      TProcessor processor = null;      TTransport inputTransport = null;      TTransport outputTransport = null;      TProtocol inputProtocol = null;      TProtocol outputProtocol = null;      TServerEventHandler eventHandler = null;      ServerContext connectionContext = null;      try {        processor = processorFactory_.getProcessor(client_);        inputTransport = inputTransportFactory_.getTransport(client_);        outputTransport = outputTransportFactory_.getTransport(client_);        inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);        outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);	          eventHandler = getEventHandler();        if (eventHandler != null) {          connectionContext = eventHandler.createContext(inputProtocol, outputProtocol);        }        // we check stopped_ first to make sure we‘re not supposed to be shutting        // down. this is necessary for graceful shutdown.        while (true) {            if (eventHandler != null) {              eventHandler.processContext(connectionContext, inputTransport, outputTransport);            }            if(stopped_ || !processor.process(inputProtocol, outputProtocol)) {              break;            }        }      } catch (TTransportException ttx) {        // Assume the client died and continue silently      } catch (TException tx) {        LOGGER.error("Thrift error occurred during processing of message.", tx);      } catch (Exception x) {        LOGGER.error("Error occurred during processing of message.", x);      }      if (eventHandler != null) {        eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol);      }      if (inputTransport != null) {        inputTransport.close();      }      if (outputTransport != null) {        outputTransport.close();      }    }

 注意一下类型为ServerContext的connectionContext变量的生命周期以及角色:
1)在处理所有请求之前,通过eventHandler.createContext(inputProtocol, outputProtocol)创建;

2)在处理每个请求前,通过eventHandler.processContext(connectionContext, inputTransport, outputTransport)进行处理;

3)在停止处理时,通过eventHandler.deleteContext(connectionContext, inputProtocol, outputProtocol) 释放相关资源,生命宣告停止。

使用者可以通过eventHandler插入自己的代码。

【Tips】

根据上面对connectionContext生命周期的分析,我们可以自己实现TServerEventHandler接口,然后在实现类中保存ServerContext信息以及其它附属信息,因为TServerEventHandler可以访问传输层级别,因此可以在其中保存一些连接的底层信息,比如客户端ip,端口等;在processContext函数中,可以更新每次请求前的临时信息供该次请求使用,这样,在我们的IDL中定义的实现函数中,可以访问TServerEventHandler对象,取得保存在其中的信息进行处理,这是典型的【黑板报】设计模式。

 

libthrift0.9.0解析(四)之TThreadPoolServer&ServerContext