首页 > 代码库 > Hadoop IPC的代码结构分析

Hadoop IPC的代码结构分析

与IPC相关的代码在org.apache.hadoop.ipc包下。共七个文件,其中4个辅助类:

RemoteException

Status

VersionedProtocol

ConnectionHeader

主要实现类3个:

Client

Server

RPC

客户端Client:

技术分享

如上图:

与IPC连接相关的

  • Client.Connection
  • Client.ConnectionId
  • ConnectionHeader

与远程调用Call相关的

  • Client.Call
  • Client.ParallelCall
  • Client.ParallelResults

服务器端Server:

技术分享

与IPC连接相关的

  • Server.Connection
  • ConnectionHeader

与远程调用Call相关的

  • Server.Call
  • Server.Responder
  • Server.Listener
  • Server.Handler

 

RPC

RPC是在Server及Client的基础上实现了Hadoop IPC。

技术分享

与客户端相关的功能:

  • RPC.ClientCache
  • RPC.Invoker(继承java.lang.reflect.InvocationHandler)
  • RPC.Invocation

与服务端相关的功能:

  • RPC.Server

 

Connection

客户端与服务器端对连接的抽象不一样,所以有Server.Connection和Client.Connection。Hadoop远程调用采用TCP协议通信。

1)客户端Client.ConnectionId

连接复用:当多个IPC客户端的ConnectionId相同时,他们共享一个IPC连接。连接复用可以减少Hadoop Server、Client的资源占用,同时节省IPC连接时间。

2)ConnectionHeader

Server与Client间TCP连接建立后交换的第一条信息,包含ConnectionId.ticket(UserGroupInformation)用户信息和IPC接口信息,检验是否实现了IPC接口,以及该用户是否有权使用接口。

Call

建立连接后,即可以进行远程过程调用服务,即对IPC接口方法的调用,源码抽象为Call。

远程调用Client.Call对象和Server.Call对象,是一个IPC调用产生的,存在于IPC客户端(存根)和IPC服务端(骨架)中的实体。

Client.Call对象通过IPC连接到服务器后,自然会构成相应的Server.Call对象。

 

Client.Call何时产生以及如何产生?

技术分享

如上图所示流程:

  1. 用户发起远程接口调用
  2. 动态代理,RPC.Invoker调用句柄捕获远程调用
  3. 根据invoke的输入参数method、args生成RPC.Invocation对象
  4. 并调用Client.call,call会将上一步的Invocation对象序列化并通过IPC连接发送到服务器。Client.call会等待服务端返回的结果。
  5. 服务器端Listener监听Client发来的连接请求和数据请求,并调用Server端的连接对象。
  6. 连接对象接收远程调用请求帧,反序列化,并将请求放于阻塞队列中,由Handler处理。
  7. Handler调用对应的IPC接口实现类,完成过程调用,将结果序列化。
  8. 如果此时连接的应答队列为空,返回给客户端。
  9. 否则,客户端比较忙,应答队列不为空,Handler将结果放入响应队列,由Responser通过IPC发送给客户端。

 

IPC连接

  • 连接建立
  • 连接上的数据读写
  • 连接维护
  • 连接关闭

 

服务器端的IPC连接代码分散在Listener和Server.Connection中。

Listener.run() 实现了NIO中的选择器循环。如下代码:

//Listener构造函数public Listener() throws IOException {      address = new InetSocketAddress(bindAddress, port);      // Create a new server socket and set to non blocking mode      acceptChannel = ServerSocketChannel.open();      acceptChannel.configureBlocking(false);      // Bind the server socket to the local host and port      bind(acceptChannel.socket(), address, backlogLength);      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port      // create a selector;      selector= Selector.open();      readers = new Reader[readThreads];      readPool = Executors.newFixedThreadPool(readThreads);      for (int i = 0; i < readThreads; i++) {        Selector readSelector = Selector.open();        Reader reader = new Reader(readSelector);        readers[i] = reader;        readPool.execute(reader);      }

Listener.run()开启选择器循环,并处理Accept请求,如下:

//Listener运行函数public void run() {      LOG.info(getName() + ": starting");      SERVER.set(Server.this);      while (running) {        SelectionKey key = null;        try {          selector.select();          Iterator<SelectionKey> iter = selector.selectedKeys().iterator();          while (iter.hasNext()) {            key = iter.next();            iter.remove();            try {              if (key.isValid()) {                if (key.isAcceptable())                  doAccept(key);              }            } catch (IOException e) {            }            key = null;          }        } catch (OutOfMemoryError e) {          // we can run out of memory if we have too many threads          // log the event and sleep for a minute and give           // some thread(s) a chance to finish          LOG.warn("Out of Memory in server select", e);          closeCurrentConnection(key, e);          cleanupConnections(true);          try { Thread.sleep(60000); } catch (Exception ie) {}        } catch (Exception e) {          closeCurrentConnection(key, e);        }        cleanupConnections(false);      }      LOG.info("Stopping " + this.getName());      synchronized (this) {        try {          acceptChannel.close();          selector.close();        } catch (IOException e) { }        selector= null;        acceptChannel= null;                // clean up all connections        while (!connectionList.isEmpty()) {          closeConnection(connectionList.remove(0));        }      }    }

doAccept()中通过server.accpet获取SocketChannel,并获取一个Reader对象,该对象包含一个Selector:readerSelector,通过reader.registerChannel,将SocketChannel注册到readerSelector下.并新建connection对象。

//Do_Acceptvoid doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {      Connection c = null;      ServerSocketChannel server = (ServerSocketChannel) key.channel();      SocketChannel channel;      while ((channel = server.accept()) != null) {        channel.configureBlocking(false);        channel.socket().setTcpNoDelay(tcpNoDelay);        Reader reader = getReader();        try {          reader.startAdd();          SelectionKey readKey = reader.registerChannel(channel);          c = new Connection(readKey, channel, System.currentTimeMillis());          readKey.attach(c);          synchronized (connectionList) {            connectionList.add(numConnections, c);            numConnections++;          }          if (LOG.isDebugEnabled())            LOG.debug("Server connection from " + c.toString() +                "; # active connections: " + numConnections +                "; # queued calls: " + callQueue.size());                  } finally {          reader.finishAdd();         }      }    }
public synchronized SelectionKey registerChannel(SocketChannel channel)                                                          throws IOException {          return channel.register(readSelector, SelectionKey.OP_READ);      }

Hadoop IPC的代码结构分析