首页 > 代码库 > Hadoop中客户端和服务器端的方法调用过程

Hadoop中客户端和服务器端的方法调用过程

Java 动态代理一个简单的demo:(用以对比Hadoop中的动态代理)

Hello接口:

public interface Hello {        void sayHello(String to);        void print(String p);   }

Hello接口的实现类:

技术分享
public class HelloImpl implements Hello {            public void sayHello(String to) {          System.out.println("Say hello to " + to);      }            public void print(String s) {          System.out.println("print : " + s);      }        }
技术分享

与代理类(HelloImpl类)相关联的InvocationHandler对象

技术分享
public class LogHandler implements InvocationHandler {            private Object dele;            public LogHandler(Object obj) {          this.dele = obj;      }            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {          doBefore();          //在这里完全可以把下面这句注释掉,而做一些其它的事情          Object result = method.invoke(dele, args);          after();          return result;      }            private void doBefore() {          System.out.println("before....");      }            private void after() {          System.out.println("after....");      }  }
技术分享

最后测试代码如下:

技术分享
public class ProxyTest {        public static void main(String[] args) {          HelloImpl impl = new HelloImpl();          LogHandler handler = new LogHandler(impl);          //这里把handler与impl新生成的代理类相关联          Hello hello = (Hello) Proxy.newProxyInstance(impl.getClass().getClassLoader(), impl.getClass().getInterfaces(), handler);                    //这里无论访问哪个方法,都是会把请求转发到handler.invoke          hello.print("All the test");          hello.sayHello("Denny");      }    }
技术分享
 

Hadoop中的动态代理---客户端方法调用过程

IPC客户端的处理比动态代理实例稍微复杂:代理对象上的调用被InvocationHandler捕获后,请求被打包并通过IPC连接发送到服务器上,客户端等待并在服务器的处理应答到达后,生成并返回调用结果。IPC上的调用是个同步操作,即,线程会一直等待调用结束,才会开始后续处理;而网络的处理时异步的,请求发送后,不需要等待应答。客户端通过java的wait()/notify()机制简单地解决了异步网络处理和同步IPC调用的差异。

 

Hadoop对外提供查询文件状态的接口,如下:

public interface IPCQueryStatus extends VersionedProtocol {    IPCFileStatus getFileStatus(String filename);}

客户端通过如下代码调用:

IPCQueryStatus query = (IPCQueryStatus) RPC.getProxy(IPCQueryStatus.class, IPCQueryServer.IPC_VER, addr, new Configuration());IPCFileStatus status = query.getFileStatus("\tmp\testIPC");

在RPC的getProxy代码如下:

public static VersionedProtocol getProxy(      Class<? extends VersionedProtocol> protocol,      long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,      Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException {    ......    VersionedProtocol proxy =        (VersionedProtocol) Proxy.newProxyInstance(            protocol.getClassLoader(), new Class[] { protocol },            new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));    ......    return proxy;    ......  }
需要制定一个InvocationHandler,对于所有的调用请求,这个InvocationHandler都是Invoke,如下:

private static class Invoker implements InvocationHandler {    private Client.ConnectionId remoteId;// 用来标示一个connection,用以复用    private Client client;//最重要的成员变量,RPC客户端    private boolean isClosed = false;    public Invoker(Class<? extends VersionedProtocol> protocol,        InetSocketAddress address, UserGroupInformation ticket,        Configuration conf, SocketFactory factory,        int rpcTimeout) throws IOException {      this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,          ticket, rpcTimeout, conf);      this.client = CLIENTS.getClient(conf, factory);//    }    ......        public Object invoke(Object proxy, Method method, Object[] args)      ......            ObjectWritable value = (ObjectWritable)        client.call(new Invocation(method, args), remoteId);      ......            return value.get();    }}

在上面的代码中,client负责发送IPC请求,并获取结果,类似最上面demo中LogHandler中的dele。

如下为client.call方法:

public Writable call(Writable param, ConnectionId remoteId)                         throws InterruptedException, IOException {    Call call = new Call(param);    Connection connection = getConnection(remoteId, call);    connection.sendParam(call);                 // send the parameter    ...    synchronized (call) {      while (!call.done) {        try {          call.wait();                           // wait for the result        } catch (InterruptedException ie) {          ...        }      }      ...      if (call.error != null) {        ...        throw call.error;        ...      } else {        return call.value;      }    }}

connection.sendParam后,会再调用receiveMessage来获取返回结果。如下:

private class Connection extends Thread {    ......        public void run() {        ......        while (waitForWork()) {//wait here for work - read or close connection        receiveResponse();        }        ......    }    ......    private void receiveResponse() {      ......      touch();            try {        int id = in.readInt();                    // try to read an id        ......        Call call = calls.get(id);        int state = in.readInt();     // read call status        if (state == Status.SUCCESS.state) {          Writable value = ReflectionUtils.newInstance(valueClass, conf);          value.readFields(in);                 // read value          call.setValue(value);          calls.remove(id);        } else if (state == Status.ERROR.state) {          call.setException(new RemoteException(WritableUtils.readString(in),                                                WritableUtils.readString(in)));          calls.remove(id);        } else if (state == Status.FATAL.state) {          // Close the connection          markClosed(new RemoteException(WritableUtils.readString(in),                                          WritableUtils.readString(in)));        }      } catch (IOException e) {        markClosed(e);      }    }}

connection会调用call的setValue或者setException,两个方法都会调用callComplete方法,来调用notify通知进程IPC调用已结束

protected synchronized void callComplete() {      this.done = true;      notify();                                 // notify caller    }    public synchronized void setException(IOException error) {      this.error = error;      callComplete();    }            public synchronized void setValue(Writable value) {      this.value =http://www.mamicode.com/ value;      callComplete();    }

 

服务器端方法调用过程

服务端由Listener接收,Listener主要运行NIO选择器循环,并在Listener.doRead()方法中读取数据,Connection.readAndProcess()中恢复数据帧,然后调用processData().

void Listener.doRead(SelectionKey key) throws InterruptedException {    int count = 0;    Connection c = (Connection)key.attachment();    ...    count = c.readAndProcess();    ...      }public int Connection.readAndProcess() throws IOException, InterruptedException {    ......    processOneRpc(data.array());    ......}private void Connection.processOneRpc(byte[] buf) throws IOException,        InterruptedException {    if (headerRead) {        processData(buf);    } else {        processHeader(buf);        ......    }}private void Connection.processData(byte[] buf) throws  IOException, InterruptedException {    DataInputStream dis =        new DataInputStream(new ByteArrayInputStream(buf));    int id = dis.readInt();                    // try to read an id    ......    Writable param = ReflectionUtils.newInstance(paramClass, conf);//★??paramClass在哪儿设置的★    param.readFields(dis);            Call call = new Call(id, param, this);    callQueue.put(call);              // queue the call; maybe blocked here}

ProcessData反序列化调用参数,构造服务器端的Call对象。然后放入callQueue队列中。callQueue阻塞队列定义于Server类中,是Listener和Handler的边界。(生产者Listener消费者Handler)。

Hadoop中客户端和服务器端的方法调用过程