
Thrift Source Code Analysis (4) -- The Method Invocation Model

An RPC call is, at bottom, network programming: the client sends a message to the server, and the server acts on it once it arrives. What makes an RPC message special is that it encapsulates a method call, including the method name and the method arguments. After the server receives such a message, it decodes it and then goes through the method invocation model to dispatch to the actual server-side business method.


This post covers Thrift's method invocation model. The model is simple: each method name is registered against its actual implementation, and dispatch is a lookup by name. No reflection or class-loading machinery is involved.
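The registration-based dispatch can be illustrated with a minimal, self-contained sketch. This is plain Java, not Thrift code; the `Handler` interface and `DispatchSketch` class are invented for illustration. Handlers are registered under their method names at startup, and dispatch is a plain map lookup followed by a direct call:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of name-based dispatch without reflection.
// Handler and DispatchSketch are hypothetical names, not Thrift classes.
public class DispatchSketch {
  interface Handler {
    String handle(String arg);
  }

  private final Map<String, Handler> handlers = new HashMap<>();

  // Registration happens once, at startup, keyed by method name.
  void register(String methodName, Handler h) {
    handlers.put(methodName, h);
  }

  // Dispatch is a map lookup; an unknown name is reported back, not crashed on.
  String dispatch(String methodName, String arg) {
    Handler h = handlers.get(methodName);
    if (h == null) {
      throw new IllegalArgumentException("Invalid method name: '" + methodName + "'");
    }
    return h.handle(arg);
  }

  public static void main(String[] args) {
    DispatchSketch d = new DispatchSketch();
    d.register("echo", s -> "echo:" + s);
    System.out.println(d.dispatch("echo", "hi")); // prints "echo:hi"
  }
}
```

Thrift's generated Processor follows the same pattern, with ProcessFunction playing the role of `Handler` and the method name from the incoming message as the key.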


The core classes involved in method invocation:

1. The generated Iface interface, the top-level interface for the remote methods

2. The generated Processor class and its parents, including the TProcessor interface and the TBaseProcessor abstract class

3. The ProcessFunction abstract class, which abstracts one concrete method call; it holds the method name and the abstract steps of invoking the method

4. TNonblockingServer, the default NIO server implementation, configured with the Processor and other settings through its Args parameter

5. The FrameBuffer class, the server-side NIO buffer; once a full frame has been received and decoded, it calls the Processor to perform the actual method invocation

6. The server-side implementation class of the methods, which implements the Iface interface


Let's go through these classes one by one.

The Iface interface is generated automatically and describes the method signatures. The server-side service provider of DemoService must implement the Iface interface:

public class DemoService {

  public interface Iface {

    public int demoMethod(String param1, Parameter param2, Map<String,String> param3) throws org.apache.thrift.TException;

  }
}

public class DemoServiceImpl implements DemoService.Iface {

  @Override
  public int demoMethod(String param1, Parameter param2,
      Map<String, String> param3) throws TException {
    return 0;
  }
}

Now look at the TProcessor-related classes and interfaces:

1. TProcessor defines a single top-level process method whose parameters are the input and output protocols

2. The abstract class TBaseProcessor provides the default implementation of TProcessor's process: it first reads the message header to get the name of the method being called, then looks up the corresponding ProcessFunction object in a Map it maintains. A ProcessFunction is the abstraction of an actual method; calling its process method ends up calling the real method.

3. The Processor class is generated. It depends on the Iface interface and is responsible for associating each actual method implementation with its method-name key and keeping these entries in the Map.

public interface TProcessor {
  public boolean process(TProtocol in, TProtocol out)
    throws TException;
}

public abstract class TBaseProcessor<I> implements TProcessor {
  private final I iface;
  private final Map<String, ProcessFunction<I, ? extends TBase>> processMap;

  protected TBaseProcessor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processFunctionMap) {
    this.iface = iface;
    this.processMap = processFunctionMap;
  }

  @Override
  public boolean process(TProtocol in, TProtocol out) throws TException {
    TMessage msg = in.readMessageBegin();
    ProcessFunction fn = processMap.get(msg.name);
    if (fn == null) {
      TProtocolUtil.skip(in, TType.STRUCT);
      in.readMessageEnd();
      TApplicationException x = new TApplicationException(TApplicationException.UNKNOWN_METHOD, "Invalid method name: '" + msg.name + "'");
      out.writeMessageBegin(new TMessage(msg.name, TMessageType.EXCEPTION, msg.seqid));
      x.write(out);
      out.writeMessageEnd();
      out.getTransport().flush();
      return true;
    }
    fn.process(msg.seqid, in, out, iface);
    return true;
  }
}

public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
    public Processor(I iface) {
      super(iface, getProcessMap(new HashMap<String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
    }

    protected Processor(I iface, Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
      super(iface, getProcessMap(processMap));
    }

    private static <I extends Iface> Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> getProcessMap(Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> processMap) {
      processMap.put("demoMethod", new demoMethod());
      return processMap;
    }

    private static class demoMethod<I extends Iface> extends org.apache.thrift.ProcessFunction<I, demoMethod_args> {
      public demoMethod() {
        super("demoMethod");
      }

      protected demoMethod_args getEmptyArgsInstance() {
        return new demoMethod_args();
      }

      protected demoMethod_result getResult(I iface, demoMethod_args args) throws org.apache.thrift.TException {
        demoMethod_result result = new demoMethod_result();
        result.success = iface.demoMethod(args.param1, args.param2, args.param3);
        result.setSuccessIsSet(true);
        return result;
      }
    }

  }

The generated demoMethod class extends ProcessFunction. It is responsible for tying together the method arguments, the iface, and the method's return value, completing the actual call through this abstract model. The actual method is provided by whatever class implements the Iface interface.
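ProcessFunction is essentially a template method: the base class fixes the call sequence (decode the arguments, call into the iface, return the result), and the generated subclass fills in the method-specific steps. A simplified, self-contained sketch of that shape follows; all names here are invented for illustration, and strings stand in for the TProtocol streams the real ProcessFunction reads and writes:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of the ProcessFunction template-method pattern.
// These names are hypothetical; real Thrift code works on TProtocol streams.
public class ProcessFunctionSketch {
  interface Iface {
    int demoMethod(String param);
  }

  static abstract class ProcessFunction<I> {
    final String methodName;

    ProcessFunction(String methodName) {
      this.methodName = methodName;
    }

    // Template method: fixed sequence, method-specific step deferred to subclasses.
    Object process(I iface, String rawArgs) {
      String args = decodeArgs(rawArgs);
      return getResult(iface, args);
    }

    // Stands in for reading a demoMethod_args struct from the input protocol.
    String decodeArgs(String rawArgs) {
      return rawArgs.trim();
    }

    abstract Object getResult(I iface, String args);
  }

  // Plays the role of the generated demoMethod class.
  static class DemoMethod extends ProcessFunction<Iface> {
    DemoMethod() {
      super("demoMethod");
    }

    @Override
    Object getResult(Iface iface, String args) {
      return iface.demoMethod(args); // the actual business call
    }
  }

  public static void main(String[] argv) {
    Map<String, ProcessFunction<Iface>> processMap = new HashMap<>();
    processMap.put("demoMethod", new DemoMethod());

    Iface impl = param -> param.length(); // the "server-side" implementation
    Object result = processMap.get("demoMethod").process(impl, " abc ");
    System.out.println(result); // prints 3
  }
}
```

The generated subclass never needs reflection: it was emitted by the compiler with a direct call to the right Iface method already baked in.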

TNonblockingServer is the NIO server implementation. It uses a Selector to check IO readiness and then services the corresponding Channel. As far as method invocation is concerned, what matters is the read event, which is passed on to handleRead():

 private void select() {
      try {
        // wait for io events.
        selector.select();

        // process the io events we received
        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();

          // skip if not valid
          if (!key.isValid()) {
            cleanupSelectionKey(key);
            continue;
          }

          // if the key is marked Accept, then it has to be the server
          // transport.
          if (key.isAcceptable()) {
            handleAccept();
          } else if (key.isReadable()) {
            // deal with reads
            handleRead(key);
          } else if (key.isWritable()) {
            // deal with writes
            handleWrite(key);
          } else {
            LOGGER.warn("Unexpected state in select! " + key.interestOps());
          }
        }
      } catch (IOException e) {
        LOGGER.warn("Got an IOException while selecting!", e);
      }
    }

    protected void handleRead(SelectionKey key) {
      FrameBuffer buffer = (FrameBuffer) key.attachment();
      if (!buffer.read()) {
        cleanupSelectionKey(key);
        return;
      }

      // if the buffer's frame read is complete, invoke the method.
      if (buffer.isFrameFullyRead()) {
        if (!requestInvoke(buffer)) {
          cleanupSelectionKey(key);
        }
      }
    }

    protected boolean requestInvoke(FrameBuffer frameBuffer) {
      frameBuffer.invoke();
      return true;
    }

An NIO server doing synchronous non-blocking IO needs buffers to hold the intermediate state of reads and writes. FrameBuffer is such a buffer; because it sits on the method-invocation path, it also provides an invoke() method that drives the call into the Processor.

public void invoke() {
      TTransport inTrans = getInputTransport();
      TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
      TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());

      try {
        processorFactory_.getProcessor(inTrans).process(inProt, outProt);
        responseReady();
        return;
      } catch (TException te) {
        LOGGER.warn("Exception while invoking!", te);
      } catch (Throwable t) {
        LOGGER.error("Unexpected throwable while invoking!", t);
      }
      // This will only be reached when there is a throwable.
      state_ = FrameBufferState.AWAITING_CLOSE;
      requestSelectInterestChange();
    }
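The isFrameFullyRead() gate seen in handleRead() rests on Thrift's framed transport: each message is preceded by a 4-byte length prefix, and invoke() runs only once that many payload bytes have accumulated. The idea can be sketched in a few lines of self-contained Java (FrameSketch and feed() are invented names; the real FrameBuffer additionally tracks a state machine and selector interest-op changes):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch of the frame-buffer idea: read a 4-byte size prefix,
// then accumulate bytes until the whole frame has arrived; only then would
// the payload be handed to the processor.
public class FrameSketch {
  private final ByteBuffer sizeBuf = ByteBuffer.allocate(4);
  private ByteBuffer frameBuf;

  /** Feed bytes as they arrive from the socket; returns true once the frame is complete. */
  boolean feed(byte[] chunk) {
    ByteBuffer in = ByteBuffer.wrap(chunk);
    if (sizeBuf.hasRemaining()) {
      while (sizeBuf.hasRemaining() && in.hasRemaining()) sizeBuf.put(in.get());
      if (sizeBuf.hasRemaining()) return false; // size prefix still incomplete
      sizeBuf.flip();
      frameBuf = ByteBuffer.allocate(sizeBuf.getInt()); // prefix tells us the frame size
    }
    while (frameBuf.hasRemaining() && in.hasRemaining()) frameBuf.put(in.get());
    return !frameBuf.hasRemaining(); // "frame fully read"?
  }

  byte[] frame() {
    return frameBuf.array();
  }

  public static void main(String[] args) {
    FrameSketch fb = new FrameSketch();
    byte[] msg = "hello".getBytes();
    byte[] wire = ByteBuffer.allocate(4 + msg.length).putInt(msg.length).put(msg).array();

    // Deliver the bytes in two chunks, as a socket might.
    System.out.println(fb.feed(Arrays.copyOfRange(wire, 0, 3)));           // prints false
    System.out.println(fb.feed(Arrays.copyOfRange(wire, 3, wire.length))); // prints true
    System.out.println(new String(fb.frame()));                            // prints hello
  }
}
```

Only when feed() finally returns true does the dispatch path above run, which is why a slow client can never stall the server inside a half-read message.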

FrameBuffer obtains the Processor through a processorFactory. The ProcessorFactory is passed in when the server is created and is just a thin wrapper around the Processor.

protected TServer(AbstractServerArgs args) {
    processorFactory_ = args.processorFactory;
    serverTransport_ = args.serverTransport;
    inputTransportFactory_ = args.inputTransportFactory;
    outputTransportFactory_ = args.outputTransportFactory;
    inputProtocolFactory_ = args.inputProtocolFactory;
    outputProtocolFactory_ = args.outputProtocolFactory;
  }

public class TProcessorFactory {
  private final TProcessor processor_;

  public TProcessorFactory(TProcessor processor) {
    processor_ = processor;
  }

  public TProcessor getProcessor(TTransport trans) {
    return processor_;
  }
}

    public T processor(TProcessor processor) {
      this.processorFactory = new TProcessorFactory(processor);
      return (T) this;
    }


Below is a concrete TNonblockingServer configuration example.

Besides setting the server's basic runtime parameters, the key step is handing the actual service provider to TNonblockingServer as a Processor through the server Args, so that FrameBuffer can call into it.

public class DemoServiceImpl implements DemoService.Iface{

	@Override
	public int demoMethod(String param1, Parameter param2,
			Map<String, String> param3) throws TException {
		
		return 0;
	}
		
	public static void main(String[] args){
		TNonblockingServerSocket socket;
		try {
			socket = new TNonblockingServerSocket(9090);
			TNonblockingServer.Args options = new TNonblockingServer.Args(socket);
			TProcessor processor = new DemoService.Processor(new DemoServiceImpl());
			options.processor(processor);
			options.protocolFactory(new TCompactProtocol.Factory());
			TServer server = new TNonblockingServer(options);
			server.serve();			
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

}


