Hadoop 2.0 RPC Analysis 1.0
1. Example used:
// Protocol interface
package com.rpc;

import org.apache.hadoop.ipc.VersionedProtocol;

public interface MyProtocol extends VersionedProtocol {
    public static final long versionID = 1L;

    String getlength(String str);

    int add(int a, int b);
}

// Protocol implementation
package com.rpc;

import java.io.IOException;

import org.apache.hadoop.ipc.ProtocolSignature;

public class MyProtocolImp implements MyProtocol {
    public long getProtocolVersion(String protocol, long clientVersion)
            throws IOException {
        return MyProtocol.versionID;
    }

    public ProtocolSignature getProtocolSignature(String protocol,
            long clientVersion, int clientMethodsHash) throws IOException {
        return new ProtocolSignature(MyProtocol.versionID, null);
    }

    public String getlength(String str) {
        return String.valueOf(str.length());
    }

    public int add(int a, int b) {
        return a + b;
    }
}

// RPC server
package com.rpc;

import java.io.IOException;

import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

public class MyServer {
    public static void main(String[] args)
            throws HadoopIllegalArgumentException, IOException {
        Configuration conf = new Configuration();
        // Listen on all interfaces, port 8080, with 10 handler threads.
        Server server = new RPC.Builder(conf)
                .setProtocol(MyProtocol.class)
                .setInstance(new MyProtocolImp())
                .setBindAddress("0.0.0.0")
                .setPort(8080)
                .setNumHandlers(10)
                .build();
        server.start();
    }
}

// RPC client
package com.rpc;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class MyClient {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Name the host explicitly: new InetSocketAddress(8080) yields the
        // wildcard address, which is not a portable connect target.
        InetSocketAddress addr = new InetSocketAddress("localhost", 8080);
        MyProtocol proxy = (MyProtocol) RPC.getProxy(
                MyProtocol.class, MyProtocol.versionID, addr, conf);
        int sum = proxy.add(10, 2);
        System.out.println(sum);
    }
}
2. Analysis:
(1)
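The RPC layer allows the serialization mechanism (the RPC engine) to be changed per protocol through configuration. The default is WritableRpcEngine.
Code: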
RPC.class:
static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
        Configuration conf) {
    // Reuse the engine already cached for this protocol, if any.
    RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
    if (engine == null) {
        // Look up the engine class configured for this protocol,
        // falling back to WritableRpcEngine.
        Class<?> impl = conf.getClass(ENGINE_PROP + "." + protocol.getName(),
                WritableRpcEngine.class);
        engine = (RpcEngine) ReflectionUtils.newInstance(impl, conf);
        PROTOCOL_ENGINES.put(protocol, engine);
    }
    return engine;
}
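The lookup above can be illustrated without a Hadoop dependency. The following is only a minimal sketch of the same pattern (a per-protocol cache, a configuration key derived from the protocol name, and reflective instantiation with a default). All `Sketch*` types and `EngineRegistrySketch` are hypothetical stand-ins, and `java.util.Properties` is assumed here in place of Hadoop's `Configuration`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical stand-in for Hadoop's RpcEngine interface. */
interface SketchEngine {
    String name();
}

/** Hypothetical default engine, playing the role of WritableRpcEngine. */
class SketchWritableEngine implements SketchEngine {
    public String name() { return "writable"; }
}

/** Hypothetical alternative engine that is selected through configuration. */
class SketchProtobufEngine implements SketchEngine {
    public String name() { return "protobuf"; }
}

/** Hypothetical protocol interfaces used only as lookup keys. */
interface SketchProtoA { }
interface SketchProtoB { }

class EngineRegistrySketch {
    // Key prefix; the per-protocol key is ENGINE_PROP + "." + protocol name.
    static final String ENGINE_PROP = "rpc.engine";

    // One cached engine instance per protocol interface.
    private static final Map<Class<?>, SketchEngine> ENGINES = new HashMap<>();

    static synchronized SketchEngine getProtocolEngine(Class<?> protocol,
                                                       Properties conf) {
        SketchEngine engine = ENGINES.get(protocol);
        if (engine == null) {
            // Read the configured class name, defaulting to the writable engine.
            String implName = conf.getProperty(
                    ENGINE_PROP + "." + protocol.getName(),
                    SketchWritableEngine.class.getName());
            try {
                engine = (SketchEngine) Class.forName(implName)
                        .getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(
                        "Cannot create engine " + implName, e);
            }
            ENGINES.put(protocol, engine);
        }
        return engine;
    }
}
```

With an empty `Properties` object every protocol resolves to the default engine; setting `rpc.engine.<protocol class name>` to another class name switches only that protocol. In Hadoop itself the same key is written by `RPC.setProtocolEngine(conf, protocol, engine)`, so that call has to happen before the proxy or server for the protocol is created.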
(2)
Server.Listener.class is responsible for listening for incoming connections:
public Listener() throws IOException {
    address = new InetSocketAddress(bindAddress, port);
    // Create a new server socket and set to non blocking mode
    acceptChannel = ServerSocketChannel.open();
    acceptChannel.configureBlocking(false);

    // Bind the server socket to the local host and port
    bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
    port = acceptChannel.socket().getLocalPort(); // Could be an ephemeral port
    // create a selector;
    selector = Selector.open();
    readers = new Reader[readThreads];
    for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
                "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start();
    }

    // Register accepts on the server socket with the selector.
    acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
    this.setName("IPC Server listener on " + port);
    this.setDaemon(true);
}
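The constructor above only sets up the channel and the selector; the accept loop itself is not shown. The following is a rough, self-contained sketch of what such a loop can look like with plain `java.nio`. `ListenerSketch` and `acceptOnce` are hypothetical names, and the sketch simply closes each accepted channel where Hadoop would hand it to one of the Reader threads.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/** Hypothetical, simplified listener: non-blocking accept driven by a selector. */
class ListenerSketch implements AutoCloseable {
    private final ServerSocketChannel acceptChannel;
    private final Selector selector;
    private final int port;
    private int accepted = 0;

    ListenerSketch(String bindAddress, int requestedPort) throws IOException {
        // Same setup order as the constructor above: open, non-blocking, bind.
        acceptChannel = ServerSocketChannel.open();
        acceptChannel.configureBlocking(false);
        acceptChannel.socket().bind(new InetSocketAddress(bindAddress, requestedPort));
        // Port 0 requests an ephemeral port, so read back the real one.
        port = acceptChannel.socket().getLocalPort();
        selector = Selector.open();
        acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    int getPort() { return port; }

    int getAccepted() { return accepted; }

    /** Waits up to timeoutMillis (must be positive) and accepts pending connections. */
    int acceptOnce(long timeoutMillis) throws IOException {
        int before = accepted;
        selector.select(timeoutMillis);
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            it.remove();
            if (key.isValid() && key.isAcceptable()) {
                ServerSocketChannel server = (ServerSocketChannel) key.channel();
                SocketChannel client;
                // accept() returns null once no more connections are pending.
                while ((client = server.accept()) != null) {
                    // A real server would register the channel with a reader here.
                    client.close();
                    accepted++;
                }
            }
        }
        return accepted - before;
    }

    @Override
    public void close() throws IOException {
        selector.close();
        acceptChannel.close();
    }
}
```

Because the server channel is non-blocking, `accept()` never stalls the listener thread, which is why a single listener can serve many clients and leave the actual reading to separate threads.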