首页 > 代码库 > RPC与hadoop

RPC与hadoop

技术分享

rlgdj的这样的话,真正的实现类在Server端,客户端调用方法的时候,只能得到得到从Server端的返回值。看来接口中的抽象方法必须要有返回值啊。ps。右下角的Client端的main()中rpc.refer()方法,返回一个继承了Proxy实现了HelloService的一个代理类。再调用里面的invoke方法。具体可以去看代理模式。

附上源码:

 

 1 package test.PRC;
 2 
 3 public class Client {
 4     public static void main(String[] args) throws Exception {    
 5         HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234);    
 6         for (int i = 0; i < Integer.MAX_VALUE; i ++) {    
 7             String hello = service.hello("World" + i);    
 8             System.out.println(hello);    
 9             Thread.sleep(1000);    
10         }    
11     }    
12 
13 }

 

服务端:

 1 package test.PRC;
 2 
 3 public class Server {
 4 
 5 
 6     public static void main(String []args) throws Exception {  
 7             HelloService service = new HelloServiceImpl();  
 8             RpcFramework.export(service, 1234);   
 9     }  
10 
11 
12 }

rpc框架:

  1 package test.PRC;
  2 
  3 import java.io.ObjectInputStream;
  4 import java.io.ObjectOutputStream;
  5 import java.lang.reflect.InvocationHandler;
  6 import java.lang.reflect.Method;
  7 import java.lang.reflect.Proxy;
  8 import java.net.ServerSocket;
  9 import java.net.Socket;
 10 
 11 public class RpcFramework {
 12     /** 
 13     * 暴露服务 
 14     *  
 15     * @param service 服务实现 
 16     * @param port 服务端口 
 17     * @throws Exception 
 18     */  
 19    public static void export(final Object service, int port) throws Exception {  
 20        if (service == null)  
 21             throw new IllegalArgumentException("service instance == null");  
 22         if (port <= 0 || port > 65535)  
 23             throw new IllegalArgumentException("Invalid port " + port);  
 24         System.out.println("Export service " + service.getClass().getName() + " on port " + port);  
 25         ServerSocket server = new ServerSocket(port);  
 26         for(;;) {  
 27             try {  
 28                 final Socket socket = server.accept();//服务器端一旦收到消息,就创建一个线程进行处理  
 29                 new Thread(new Runnable() {  
 30                     public void run() {  
 31                         try {  
 32                             try {  
 33                                 ObjectInputStream input = new ObjectInputStream(socket.getInputStream());  
 34                                 try {  
 35                                     String methodName = input.readUTF();//service是服务器端提供服务的对象,但是,要通过获取到的调用方法的名称,参数类型,以及参数来选择对象的方法,并调用。获得方法的名称  
 36                                     Class<?>[] parameterTypes = (Class<?>[])input.readObject();//获得参数的类型  
 37                                     Object[] arguments = (Object[])input.readObject();//获得参数  
 38                                     ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());  
 39                                     try {  
 40                                         Method method = service.getClass().getMethod(methodName, parameterTypes);//通过反射机制获得方法  
 41                                         Object result = method.invoke(service, arguments);//通过反射机制获得类的方法,并调用这个方法  
 42                                         output.writeObject(result);//将结果发送  
 43                                     } catch (Throwable t) {  
 44                                         output.writeObject(t);  
 45                                     } finally {  
 46                                         output.close();  
 47                                     }  
 48                                 } finally {  
 49                                     input.close();  
 50                                 }  
 51                             } finally {  
 52                                 socket.close();  
 53                             }  
 54                         } catch (Exception e) {  
 55                             e.printStackTrace();  
 56                         }  
 57                     }  
 58                 }).start();  
 59             } catch (Exception e) {  
 60                 e.printStackTrace();  
 61             }  
 62         }  
 63     }  
 64   
 65     /** 
 66      * 引用服务 
 67      *  
 68      * @param <T> 接口泛型 
 69      * @param interfaceClass 接口类型 
 70      * @param host 服务器主机名 
 71      * @param port 服务器端口 
 72      * @return 远程服务 
 73      * @throws Exception 
 74      *///原理是通过代理,获得服务器端接口的一个“代理”的对象。对这个对象的所有操作都会调用invoke函数,在invoke函数中,是将被调用的函数名,参数列表和参数发送到服务器,并接收服务器处理的结果  
 75     @SuppressWarnings("unchecked")  
 76     public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {  
 77         if (interfaceClass == null)  
 78             throw new IllegalArgumentException("Interface class == null");  
 79         if (! interfaceClass.isInterface())  
 80             throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");  
 81         if (host == null || host.length() == 0)  
 82             throw new IllegalArgumentException("Host == null!");  
 83         if (port <= 0 || port > 65535)  
 84             throw new IllegalArgumentException("Invalid port " + port);  
 85         System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);  
 86         return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new InvocationHandler() {  
 87             public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {  
 88                 Socket socket = new Socket(host, port);  
 89                 try {  
 90                     ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());  
 91                     try {  
 92                         output.writeUTF(method.getName());  
 93                         output.writeObject(method.getParameterTypes());  
 94                         output.writeObject(arguments);  
 95                         ObjectInputStream input = new ObjectInputStream(socket.getInputStream());  
 96                         try {  
 97                             Object result = input.readObject();  
 98                             if (result instanceof Throwable) {  
 99                                 throw (Throwable) result;  
100                             }  
101                             return result;  
102                         } finally {  
103                             input.close();  
104                         }  
105                     } finally {  
106                         output.close();  
107                     }  
108                 } finally {  
109                     socket.close();  
110                  }  
111              }  
112          });  
113      }  
114 
115 
116 }

接口:

package test.PRC;

public interface HelloService {

    String hello(String name);  
}

实现类:

 1 package test.PRC;
 2 
 3 public class HelloServiceImpl implements HelloService{
 4 
 5     public String hello(String name) {  
 6         return "Hello " + name;  
 7     }  
 8 
 9 
10 }

具体情况具体部署。

 

RPC与hadoop