首页 > 代码库 > RPC与hadoop
RPC与hadoop
rlgdj的这样的话,真正的实现类在Server端,客户端调用方法的时候,只能得到得到从Server端的返回值。看来接口中的抽象方法必须要有返回值啊。ps。右下角的Client端的main()中rpc.refer()方法,返回一个继承了Proxy实现了HelloService的一个代理类。再调用里面的invoke方法。具体可以去看代理模式。
附上源码:
1 package test.PRC; 2 3 public class Client { 4 public static void main(String[] args) throws Exception { 5 HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234); 6 for (int i = 0; i < Integer.MAX_VALUE; i ++) { 7 String hello = service.hello("World" + i); 8 System.out.println(hello); 9 Thread.sleep(1000); 10 } 11 } 12 13 }
服务端:
1 package test.PRC; 2 3 public class Server { 4 5 6 public static void main(String []args) throws Exception { 7 HelloService service = new HelloServiceImpl(); 8 RpcFramework.export(service, 1234); 9 } 10 11 12 }
rpc框架:
1 package test.PRC; 2 3 import java.io.ObjectInputStream; 4 import java.io.ObjectOutputStream; 5 import java.lang.reflect.InvocationHandler; 6 import java.lang.reflect.Method; 7 import java.lang.reflect.Proxy; 8 import java.net.ServerSocket; 9 import java.net.Socket; 10 11 public class RpcFramework { 12 /** 13 * 暴露服务 14 * 15 * @param service 服务实现 16 * @param port 服务端口 17 * @throws Exception 18 */ 19 public static void export(final Object service, int port) throws Exception { 20 if (service == null) 21 throw new IllegalArgumentException("service instance == null"); 22 if (port <= 0 || port > 65535) 23 throw new IllegalArgumentException("Invalid port " + port); 24 System.out.println("Export service " + service.getClass().getName() + " on port " + port); 25 ServerSocket server = new ServerSocket(port); 26 for(;;) { 27 try { 28 final Socket socket = server.accept();//服务器端一旦收到消息,就创建一个线程进行处理 29 new Thread(new Runnable() { 30 public void run() { 31 try { 32 try { 33 ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); 34 try { 35 String methodName = input.readUTF();//service是服务器端提供服务的对象,但是,要通过获取到的调用方法的名称,参数类型,以及参数来选择对象的方法,并调用。获得方法的名称 36 Class<?>[] parameterTypes = (Class<?>[])input.readObject();//获得参数的类型 37 Object[] arguments = (Object[])input.readObject();//获得参数 38 ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); 39 try { 40 Method method = service.getClass().getMethod(methodName, parameterTypes);//通过反射机制获得方法 41 Object result = method.invoke(service, arguments);//通过反射机制获得类的方法,并调用这个方法 42 output.writeObject(result);//将结果发送 43 } catch (Throwable t) { 44 output.writeObject(t); 45 } finally { 46 output.close(); 47 } 48 } finally { 49 input.close(); 50 } 51 } finally { 52 socket.close(); 53 } 54 } catch (Exception e) { 55 e.printStackTrace(); 56 } 57 } 58 }).start(); 59 } catch (Exception e) { 60 e.printStackTrace(); 61 } 62 } 63 } 64 65 /** 66 * 引用服务 67 * 68 * @param <T> 接口泛型 69 * @param interfaceClass 接口类型 70 * @param host 服务器主机名 71 * @param port 服务器端口 72 * @return 远程服务 73 * @throws Exception 74 *///原理是通过代理,获得服务器端接口的一个“代理”的对象。对这个对象的所有操作都会调用invoke函数,在invoke函数中,是将被调用的函数名,参数列表和参数发送到服务器,并接收服务器处理的结果 75 @SuppressWarnings("unchecked") 76 public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception { 77 if (interfaceClass == null) 78 throw new IllegalArgumentException("Interface class == null"); 79 if (! interfaceClass.isInterface()) 80 throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!"); 81 if (host == null || host.length() == 0) 82 throw new IllegalArgumentException("Host == null!"); 83 if (port <= 0 || port > 65535) 84 throw new IllegalArgumentException("Invalid port " + port); 85 System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port); 86 return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new InvocationHandler() { 87 public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable { 88 Socket socket = new Socket(host, port); 89 try { 90 ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); 91 try { 92 output.writeUTF(method.getName()); 93 output.writeObject(method.getParameterTypes()); 94 output.writeObject(arguments); 95 ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); 96 try { 97 Object result = input.readObject(); 98 if (result instanceof Throwable) { 99 throw (Throwable) result; 100 } 101 return result; 102 } finally { 103 input.close(); 104 } 105 } finally { 106 output.close(); 107 } 108 } finally { 109 socket.close(); 110 } 111 } 112 }); 113 } 114 115 116 }
接口:
package test.PRC; public interface HelloService { String hello(String name); }
实现类:
1 package test.PRC; 2 3 public class HelloServiceImpl implements HelloService{ 4 5 public String hello(String name) { 6 return "Hello " + name; 7 } 8 9 10 }
具体情况具体部署。
RPC与hadoop
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。