首页 > 代码库 > Hadoop学习<四>--HDFS的RPC通信原理总结

Hadoop学习<四>--HDFS的RPC通信原理总结

这里先写下自己学习RPC的笔记总结,下面将详细介绍学习过程:

RPC(remote procedure call)

  不同java进程间的对象方法的调用。
  一方称作服务端(server),一方称作客户端(client)。
  server端提供对象,供客户端调用的,被调用的对象的方法的执行发生在server端。
   
 RPC是hadoop框架运行的基础。
通过rpc小例子获得的认识?
1. 服务端提供的对象必须是一个接口,接口extends VersioinedProtocal

2. 客户端能够的对象中的方法必须位于对象的接口中。

查看一个基类或者接口的派生类或实现类---鼠标指向类名,Ctrl + T  ;

查看函数的调用关系(找到所有调用该方法的函数)--Ctrl + Alt + H (ubuntu系统快捷键占用,可以类名右键找open call Hierarchy,结果在控制台输出) ;

快速查找类对象的相关信息 -- Ctrl + O(查找类名的所有成员变量和方法),F3查看类名的定义。


RPC 是远程过程调用(Remote Procedure Call),即远程调用其他虚拟机中运行的 java object。RPC 是一种客户端/服务器模式,那么在使用时包括服务端代码和客户端代码,还有我们调用的远程过程对象。

HDFS 的运行就是建立在此基础之上的。这里通过分析实现一个简单的 RPC 程序来分析HDFS 的运行机理。

1.首先定义远程调用类的接口,接口继承的 VersionedProtocal,是hadoop 的 RPC 的接口,所有的 RPC 通信必须实现这个一接口,用于保证客户端和服务端的端口一致。服务端被调用的类必须继承这个接口 VersionedProtocal。

package com.RPC;

import org.apache.hadoop.ipc.VersionedProtocol;
public interface MyBizable extends VersionedProtocol{
    //定义抽象类方法hello
	public abstract String hello(String name);
}
2.然后编写远程调用类,实现这个接口MyBizable,这里面有两个方法被实现,一个就是 hello方法,另一个是 getProtocalVersion 方法。

package com.RPC;

import java.io.IOException;
//实现接口MyBizable,重写hello和getProtocolVersion方法
public class MyBiz implements MyBizable{
	public static long BIZ_VERSION = 123456L;
	@Override
	public String hello(String name) {
		System.out.println("我是ByBiz,我被调用了。");
		return "hello" + name;
	}

	@Override
	public long getProtocolVersion(String protocol, long clientVersion)
			throws IOException {
		//<span style="color:#FF0000;">返回BIZ_VERSION,保证服务器和客户端请求版本一致</span>
		return BIZ_VERSION;
	}
}
3.有了远程调用对象,我们就可以编写服务器端代码,详细在代码中有介绍。

package com.RPC;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;

public class MyServer {
	//定义final类型服务器地址和端口
	public static final String SERVER_ADDRESS = "localhost";
	public static final int SERVER_PORT = 1234;
	/**
	 * RPC是远程过程调用(Remote Procedure Call)
	 */
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		//重点RPC.getServer方法,该方法有四个参数,第一个参数是被调用的 java对象,
		//第二个参数是服务器的地址,第三个参数是服务器的端口。获得服务器对象后,
		//启动服务器。这样,服务器就在指定端口监听客户端的请求。
		final Server server = RPC.getServer(new MyBiz(), SERVER_ADDRESS, SERVER_PORT, conf);
		server.start();
	}
}
4.最后,我们就可以编写客户端代码,来调用服务器方法,注意方法在服务器实现。


package com.RPC;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class MyClient {
	/**
	 * RPC客户端
	 */
	public static void main(String[] args) throws Exception {
		//RPC.getProxy(),该方法有四个参数,第一个参数是被调用的接口类,
		//第二个是客户端版本号,第三个是服务端地址。返回的代理对象,
		//就是服务端对象的代理,内部就是使用 java.lang.Proxy 实现的。
		final MyBizable proxy = (MyBizable) RPC.getProxy(MyBizable.class, 
				MyBiz.BIZ_VERSION, new InetSocketAddress(MyServer.SERVER_ADDRESS, MyServer.SERVER_PORT) ,new Configuration() );
		//调用接口中的方法
		final String result = proxy.hello("world");
		//打印返回结果,然后关闭网络连接
		System.out.println(result);
		RPC.stopProxy(proxy);
	}
}
注意上面RPC获取代理方法中接口是调用对象的接口对象,由此可以得出在客户端调用的业务类的方法是定义在业务类的接口中的。该接口实现了 VersionedProtocal 接口。

完成了上面的操作,我们先启动服务端,再启动客户端。观察服务端和客户端输出信息。然后,我们在命令行输入之前查看hadoop节点运行情况的命令jps,输出如下图:

我们可以看到一个 java 进程,是“MyServer”,该进程正是我们刚刚运行的 rpc 的服务端类MyServer。那么可以判断,hadoop 启动时产生的 5 个 java 进程也应该是RPC 的服务端。我们观察 NameNode 的源代码,可以看到 NameNode 确实创建了RPC 的服务端(在namenode类的初始化Initialize方法中,为方便观察,我复制源码,重点查看create server部分)。

 /**
   * Initialize name-node.
   * 
   * @param conf the configuration
   */
  private void initialize(Configuration conf) throws IOException {
    InetSocketAddress socAddr = NameNode.getAddress(conf);
    UserGroupInformation.setConfiguration(conf);
    SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, 
        DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
    int handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
    
    // set service-level authorization security policy
    if (serviceAuthEnabled = 
          conf.getBoolean(
            ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
      PolicyProvider policyProvider = 
          (PolicyProvider)(ReflectionUtils.newInstance(
              conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, 
                  HDFSPolicyProvider.class, PolicyProvider.class), 
              conf));
        ServiceAuthorizationManager.refresh(conf, policyProvider);
    }
    
    myMetrics = NameNodeInstrumentation.create(conf);
    this.namesystem = new FSNamesystem(this, conf);

    if (UserGroupInformation.isSecurityEnabled()) {
      namesystem.activateSecretManager();
    }

    <span style="color:#FF0000;">// create rpc server</span>
    InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
    if (dnSocketAddr != null) {
      int serviceHandlerCount =
        conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                    DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
      this.serviceRpcServer = <span style="color:#FF0000;">RPC.getServer(this, dnSocketAddr.getHostName(), 
          dnSocketAddr.getPort(), serviceHandlerCount,
          false, conf, namesystem.getDelegationTokenSecretManager());</span>
      this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
      setRpcServiceServerAddress(conf);
    }
    this.server = RPC.getServer(this, socAddr.getHostName(),
        socAddr.getPort(), handlerCount, false, conf, namesystem
        .getDelegationTokenSecretManager());

    // The rpc-server port can be ephemeral... ensure we have the correct info
    this.serverAddress = this.server.getListenerAddress(); 
    FileSystem.setDefaultUri(conf, getUri(serverAddress));
    LOG.info("Namenode up at: " + this.serverAddress);

    startHttpServer(conf);
    this.server.start();  //start RPC server   
    if (serviceRpcServer != null) {
      serviceRpcServer.start();      
    }
    startTrashEmptier(conf);
  }
由上可以看到 NameNode 本身就是一个 java 进程。观察图 5-2 中 RPC.getServer()方法的第一个参数,发现是 this,说明 NameNode 本身就是一个位于服务端的被调用对象,即 NameNode 中的方法是可以被客户端代码调用的。根据 RPC 运行原理可知,NameNode暴露给客户端的方法是位于接口中的。
继续查看namenode类的接口实现,可以看到 NameNode 实现了 ClientProtocal、DatanodeProtocal、NamenodeProtocal 等接口。

下面在接着分析namenode实现的这些常用接口作用,由谁调用实现:

ClientProtocal 由DFSclient调用
这个接口是供客户端调用的。这里的客户端不是指的我们自己写的代码,而是hadoop 的一个类叫做 DFSClient。在 DFSClient 中会调用 ClientProtocal 中的方法,完成一些操作。
该接口中的方法大部分是对 HDFS 的操作,如 create、delete、mkdirs、rename 等。


DatanodeProtocalDataNode调用
这个接口是供 DataNode 调用的。 DataNode 调用该接口中的方法向 NameNode 报告本节点的状态和 block 信息。
NameNode 不 能 向 DataNode 发 送 消 息 , 只 能 通 过 该 接 口 中 方 法 的 返 回 值 向DataNode 传递消息。


NamenodeProtocal 由SecondaryNameNode 调用
这个接口是供 SecondaryNameNode 调用的。SecondaryNameNode 是专门做NameNode 中 edits 文件向 fsimage 合并数据的。

对于datanode节点的接口实现,分析思路大体一致,就是根据具体接口,查看接口协议功能,这是查看hadoop源码的学习经验,常用的一些myeclipse快捷键如下:

查看一个基类或者接口的派生类或实现类---鼠标指向类名,Ctrl + T  ;

查看函数的调用关系(找到所有调用该方法的函数)--Ctrl + Alt + H (ubuntu系统快捷键占用,可以类名右键找open call Hierarchy,结果在控制台输出) ;

快速查找类对象的相关信息 -- Ctrl + O(查找类名的所有成员变量和方法),F3查看类名的定义;

具体快捷键可以通过鼠标类名,右键查看所有可用方法。

那对于datanode的接口大家可以通过上面快捷键练习下,也分析下接口功能,学习datanode重要两个接口,分别是 InterDatanodeProtocal、ClientDatanodeProtocal。今天就总结到这里。

Hadoop学习<四>--HDFS的RPC通信原理总结