首页 > 代码库 > hadoop2.0 rpc分析-1.0

hadoop2.0 rpc分析-1.0

  1. 使用的实例:

package com.rpc;
import org.apache.hadoop.ipc.VersionedProtocol;
public interface  MyProtocol extends VersionedProtocol{
public static final long versionID=1L;
String getlength(String str);
int add(int a, int b);
}
package com.rpc;
import java.io.IOException;
import org.apache.hadoop.ipc.ProtocolSignature;
public class MyProtocolImp  implements MyProtocol{
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
// TODO Auto-generated method stub
return MyProtocol.versionID;
}
public ProtocolSignature getProtocolSignature(String protocol,
long clientVersion, int clientMethodsHash) throws IOException {
return new ProtocolSignature(MyProtocol.versionID,null);
}
public String getlength(String str) {
return String.valueOf(str.length());
}
public int add(int a, int b) {
return a+b;
}
}
// rpc server
package com.rpc;
import java.io.IOException;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
public class MyServer {
public static void main(String[] args) throws HadoopIllegalArgumentException, IOException {
Configuration conf = new Configuration();
Server server = new RPC.Builder(conf).setProtocol(MyProtocol.class)
.setInstance(new MyProtocolImp()).setBindAddress("0.0.0.0").setPort(8080)
.setNumHandlers(10).build();
server.start();
}
}
// rpc客户端
package com.rpc;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
public class MyClient {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
InetSocketAddress ip = new InetSocketAddress(8080);
MyProtocol proxy = (MyProtocol)RPC.getProxy(MyProtocol.class, MyProtocol.versionID, ip,conf );
int sum = proxy.add(10, 2);
System.out.println(sum);
}
}

2. 分析:

(1)

rpc 提供了可以修改序列化的方式。默认是:WritableRpcEngine

代码:

Rpc.class:
static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
      Configuration conf) {
    RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
    if (engine == null) {
      Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
                                    WritableRpcEngine.class);
      engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
      PROTOCOL_ENGINES.put(protocol, engine);
    }
    return engine;
  }

(2)

Rpc.Listener.class 负责监听
 public Listener() throws IOException {
      address = new InetSocketAddress(bindAddress, port);
      // Create a new server socket and set to non blocking mode
      acceptChannel = ServerSocketChannel.open();
      acceptChannel.configureBlocking(false);

      // Bind the server socket to the local host and port
      bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
      // create a selector;
      selector= Selector.open();
      readers = new Reader[readThreads];
      for (int i = 0; i < readThreads; i++) {
        Reader reader = new Reader(
            "Socket Reader #" + (i + 1) + " for port " + port);
        readers[i] = reader;
        reader.start();
      }

      // Register accepts on the server socket with the selector.
      acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
      this.setName("IPC Server listener on " + port);
      this.setDaemon(true);
    }


hadoop2.0 rpc分析-1.0