首页 > 代码库 > Hadoop 自定义RPC protocol

Hadoop 自定义RPC protocol

RPC的全称为远程过程调用。由于Hadoop是一个分布式系统,因此底层的通信库也就必须实现RPC的基础功能。Hadoop RPC 在整个hadoop中扮演着底层通信模块的角色,举例而言NN和DN、AM和RM之间的通信和协调都是Hadoop RPC来完成的。熟悉使用Hadoop RPC可以加深我们对Hadoop各个模块之间通信过程的理解,也能让我们实现一些自己想要的分布式的小功能。

很多Hadoop相关书籍中都详细介绍了Hadoop RPC,其具体原理大家有兴趣的话可以去看源码加深理解。不过,我觉得作为底层通信库,我们更多的是需要去使用它,因此在这里我利用Hadoop RPC本身提供的接口,实现一个简单的通信模型。

本文所针对的Hadoop版本为2.4.1,其实RPC这一模块,Hadoop的兼容性做的很好。同早起版本相比,对RPC编程库使用者而言,主要的改变时Hadoop现在可以很好地兼容Protocol buffer、Avro和早起的Writable等序列化框架。本文将会建立一个使用org.apache.hadoop.ipc包(在hadoop-common工程目录下)提供的RPC接口,并且使用Writable序列化框架的RPC通信模型。

 

首先,是定义一个通信协议,所有利用Hadoop RPC的通信协议都必须继承VersionedProtocol接口,它主要是为了加入协议的版本信息:

import org.apache.hadoop.ipc.VersionedProtocol;public interface Protocol extends VersionedProtocol{	public static final long versionID = 1L;	public boolean writeFile(Info statics);}

现在我们定义了一个客户端和服务器端进行通信的协议,我们希望客户端发起调用,而后,由远程的服务器端对调用进行处理,再将处理得到的结果进行返回。同时我们也定义了这个协议的版本号用于版本监测。

这里我们希望客户端执行的方法是writeFile,客户端将一个类型为Info的参数传递给此方法,在服务器端执行完writeFile方法后,将执行结果返回给客户端。显然这里会设计到网络通信的问题,我们的参数Info,必须经由网络传递给服务器端,同时,服务器将执行的结果再返回给客户端。

为了方便对象在网络中传输,Hadoop RPC利用两个主要的内部框架:

1.序列化层:将对象转化为字节流在网络中进行传输。

2.Java反射机制:传输过去的字节流,在服务器端利用反射,重新生成这个对象。

通过这两个框架,对于上层而言,我们看不到对象到底是如何传输的,只会感觉在服务器端收到了在客户端建立的完全一致的对象。

通过上面的简单的介绍,我们应该注意到,Info statics这个对象,必须要能够进行序列化,也必须要利用反射建立对象的一些基本条件。

现在让我们看看如何定义Info这个类:

import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.Writable;public class Info implements Writable {	public Text fileName;	public LongWritable taskNum;	public Info(){		this.fileName=new Text();		this.taskNum =new LongWritable();					}	public void setFileName(String str){		this.fileName = new Text(str);	}	public void setTaskNum(Long num){		this.taskNum = new LongWritable(num);	}	public String getFileName(){		return fileName.toString();	}	public Long getTaskNum(){		return taskNum.get();	}	@Override	public void write(DataOutput out) throws IOException {		fileName.write(out);		taskNum.write(out);			}	@Override	public void readFields(DataInput in) throws IOException {		System.out.println("Come to invoke my readFields");		// TODO Auto-generated method stub		fileName.readFields(in);		taskNum.readFields(in);		System.out.println("Read Success!!!");	}}

  这里需要特别说明的几点如下:

1.要让一个对象可以序列化,必须继承Writable接口,只要实现了这个接口,那么对象在RPC 中的传递就可以认为对我们而言是透明的。

而实现这个对象的序列化,就必须override 两个方法:public void write(DataOutput out) throws IOException以及public void readFields(DataInput in) throws IOException

2.为了能够在服务器端对对象进行反射,特别需要注意的就是要定义一个无参数的构造函数。在服务器端,会调用此方法,生成一个对象的实例。另外,尤其要注意的是如果说此对象包含了自定义类型的成员变量,注意在构造函数中对其进行实例化。否则,在后续进行readFields调用时,将会报错。由于你没有对该自定义类型的成员变量进行实例化,readFields读取到的数据无法完成赋值操作,产生如下的运行时报错:

Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at com.sun.proxy.$Proxy8.writeFile(Unknown Source)
at SJTU.client.App.main(App.java:30)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.RpcServerException): IPC server unable to read call parameters: null
at org.apache.hadoop.ipc.Client.call(Client.java:1410)
at org.apache.hadoop.ipc.Client.call(Client.java:1363)
at org.apache.hadoop.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:240)
... 2 more

 

现在我们已经定义了一个协议,也实现了协议中方法的参数的序列化,下面的工作就非常简单了,首先是实现我们之前定义的协议。注意,这个实现是会在服务器端运行的,你需要注意到服务器的运行环境和上下文。

下面是我的协议的实现:

import java.io.BufferedWriter;import java.io.FileWriter;import java.io.IOException;import org.apache.hadoop.HadoopIllegalArgumentException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.ipc.ProtocolSignature;import org.apache.hadoop.ipc.RPC;import org.apache.hadoop.ipc.RPC.Server;public class WriteServer implements Protocol{	public Server server;	@Override	public long getProtocolVersion(String protocol, long clientVersion)			throws IOException {		return this.versionID;	}	@Override	public ProtocolSignature getProtocolSignature(String protocol,			long clientVersion, int clientMethodsHash) throws IOException {		return new ProtocolSignature(Protocol.versionID,null);	}	@Override	public boolean writeFile(Info statics) {		// 在服务器端新建一个文件,并将由客户端传递过来的Info statics信息写入进去		FileWriter writer;		try {			writer = new FileWriter("/home/chenershuile/helloWritable");			BufferedWriter bw = new BufferedWriter(writer);			bw.write(statics.getFileName());			bw.write(statics.getTaskNum().toString());			bw.close();	    	writer.close();		} catch (IOException e) {			// TODO Auto-generated catch block			e.printStackTrace();		}		return true;// 执行完成,返回ture	}    public void init(){    	try {			this.server = new RPC.Builder(new Configuration()).setBindAddress("master").setNumHandlers(5).setProtocol(Protocol.class).setPort(50071)					.setInstance(new WriteServer()).build();		} catch (HadoopIllegalArgumentException e) {			// TODO Auto-generated catch block			e.printStackTrace();		} catch (IOException e) {			// TODO Auto-generated catch block			e.printStackTrace();		}    }    @SuppressWarnings("static-access")	public void run(){    	this.server.start();    	System.out.println("The server start at:"+this.server.getPort());    }    public void stop(){    	this.server.stop();    }}

  在这里,我不仅仅实现了协议,也利用org.apache.hadoop.ipc.RPC类提供给我们的Builder静态方法,建立起了一个服务器端。这个服务器端会开启50071端口,进行监听。

每个Set方法,感兴趣的可以去org.apache.hadoop.ipc.RPC类中查看,都非常简单,主要是设置服务器的配置。

要注意的是setProtocol()和setInstance()方法。前者填入此服务器的处理的协议的class(协议是一个接口),后者填入你所实现这个接口的类的实例(new Impl()).

好了,到此为止,我们已经完成了服务器端的工作,现在我们可以利用WriteServer中的方法,建立起一个高效的Hadoop RPC server。

public class App {    public static void main( String[] args )    {    	WriteServer writeServer = new WriteServer();    	writeServer.init();    	writeServer.run();    	    }}

  之后,运行此jar,可以看到程序执行如下:

我们可以发现,服务器端已经开启了运行,现在需要的是建立一个Client端。

 

建立Client端的过程非常简单,我们可以用代码说明如下:

import java.io.IOException;import java.net.InetSocketAddress;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.ipc.RPC;import SJTU.server.Protocol;import SJTU.server.Info;/** * Hello world! * */public class App {    public static void main( String[] args )    {    	InetSocketAddress addr = new InetSocketAddress("master",50071);    	Configuration conf = new Configuration();        try {			Protocol proxy=RPC.getProxy(Protocol.class,Protocol.versionID,addr,conf);			System.out.println("Start Client");			String fileName = "hdfs://";			Long taskNum = 10L;			Info statics = new Info();			statics.setFileName(fileName);			statics.setTaskNum(taskNum);			System.out.println(proxy.writeFile(statics));		} catch (IOException e) {			// TODO Auto-generated catch block			e.printStackTrace();		}    }}

  核心代码同样是借助于org.apache.hadoop.ipc.RPC提供给我们的对外的静态方法getProxy。这里我简单说明下参数的意义:前两者都没什么说的,跟协议相关。addr是一个InetSocketAddress,代表服务器端的地址,当然你要和服务器端保持一致,确保能访问到。conf主要的作用,按照我的理解,是相应的配置文件。具体它有什么用我也说不全面,还希望大神为我解答。

 

为了更好地理解调用的过程,我在相关的类中,写了很多System.out.println()来查看相关过程。。。所以我的执行结果也比较奇怪。。。

服务器端主要被涉及的类,除了org.apache.hadoop.ipc.RPC还有WritableRpcEngine,他俩都在同一个包下,它完成了Writable序列化比较相关的工作。

而对象的真正的序列化读取是在:ObjectWritable中,它在org.apcha.hadoop.io包中

对对象的反射机制,hadoop在底层的java.lang.reflect 上层又添加了一层:ReflectionUtil,它在org.apache.hadoop.util包中

 

最后,程序的执行过程如下,很多在终端的额外输出都是我自己对以上几个类修改添加的,可以帮助我们了解反射,代理,通信,序列化这些过程的执行。当然详细的理解还是需要自己去更加细致的分析整个RPC过程。

服务器端:

 

客户端:

 

 

可以注意一下,对Info对象,其在server端会进行readFields。而且在服务器端会进行反射,实例化等操作。具体的细节大家可以参考Hadoop技术内幕和到上文提到的相关的类中查看。