首页 > 代码库 > RPC框架几行代码就够了

RPC框架几行代码就够了

转载:http://javatar.iteye.com/blog/1123915#bc2395513

  原作者为梁飞,Dubbo 的核心成员。本人认为这段代码虽然不长,但非常有助于理解 RPC 的原理,自己受益匪浅。

   java代码:

  

  1 /*
  2  * Copyright 2011 Alibaba.com All right reserved. This software is the
  3  * confidential and proprietary information of Alibaba.com ("Confidential
  4  * Information"). You shall not disclose such Confidential Information and shall
  5  * use it only in accordance with the terms of the license agreement you entered
  6  * into with Alibaba.com.
  7  */
  8 package com.alibaba.study.rpc.framework;
  9 
 10 import java.io.ObjectInputStream;
 11 import java.io.ObjectOutputStream;
 12 import java.lang.reflect.InvocationHandler;
 13 import java.lang.reflect.Method;
 14 import java.lang.reflect.Proxy;
 15 import java.net.ServerSocket;
 16 import java.net.Socket;
 17 
 18 /**
 19  * RpcFramework
 20  * 
 21  * @author william.liangf
 22  */
 23 public class RpcFramework {
 24 
 25     /**
 26      * 暴露服务
 27      * 
 28      * @param service 服务实现
 29      * @param port 服务端口
 30      * @throws Exception
 31      */
 32     public static void export(final Object service, int port) throws Exception {
 33         if (service == null)
 34             throw new IllegalArgumentException("service instance == null");
 35         if (port <= 0 || port > 65535)
 36             throw new IllegalArgumentException("Invalid port " + port);
 37         System.out.println("Export service " + service.getClass().getName() + " on port " + port);
 38         ServerSocket server = new ServerSocket(port);
 39         for(;;) {
 40             try {
 41                 final Socket socket = server.accept();
 42                 new Thread(new Runnable() {
 43                     @Override
 44                     public void run() {
 45                         try {
 46                             try {
 47                                 ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
 48                                 try {
 49                                     String methodName = input.readUTF();
 50                                     Class<?>[] parameterTypes = (Class<?>[])input.readObject();
 51                                     Object[] arguments = (Object[])input.readObject();
 52                                     ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
 53                                     try {
 54                                         Method method = service.getClass().getMethod(methodName, parameterTypes);
 55                                         Object result = method.invoke(service, arguments);
 56                                         output.writeObject(result);
 57                                     } catch (Throwable t) {
 58                                         output.writeObject(t);
 59                                     } finally {
 60                                         output.close();
 61                                     }
 62                                 } finally {
 63                                     input.close();
 64                                 }
 65                             } finally {
 66                                 socket.close();
 67                             }
 68                         } catch (Exception e) {
 69                             e.printStackTrace();
 70                         }
 71                     }
 72                 }).start();
 73             } catch (Exception e) {
 74                 e.printStackTrace();
 75             }
 76         }
 77     }
 78 
 79     /**
 80      * 引用服务
 81      * 
 82      * @param <T> 接口泛型
 83      * @param interfaceClass 接口类型
 84      * @param host 服务器主机名
 85      * @param port 服务器端口
 86      * @return 远程服务
 87      * @throws Exception
 88      */
 89     @SuppressWarnings("unchecked")
 90     public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception {
 91         if (interfaceClass == null)
 92             throw new IllegalArgumentException("Interface class == null");
 93         if (! interfaceClass.isInterface())
 94             throw new IllegalArgumentException("The " + interfaceClass.getName() + " must be interface class!");
 95         if (host == null || host.length() == 0)
 96             throw new IllegalArgumentException("Host == null!");
 97         if (port <= 0 || port > 65535)
 98             throw new IllegalArgumentException("Invalid port " + port);
 99         System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port);
100         return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new InvocationHandler() {
101             public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
102                 Socket socket = new Socket(host, port);
103                 try {
104                     ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
105                     try {
106                         output.writeUTF(method.getName());
107                         output.writeObject(method.getParameterTypes());
108                         output.writeObject(arguments);
109                         ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
110                         try {
111                             Object result = input.readObject();
112                             if (result instanceof Throwable) {
113                                 throw (Throwable) result;
114                             }
115                             return result;
116                         } finally {
117                             input.close();
118                         }
119                     } finally {
120                         output.close();
121                     }
122                 } finally {
123                     socket.close();
124                 }
125             }
126         });
127     }
128 
129 }

定义服务接口 :

 1 package com.alibaba.study.rpc.test;  
 2   
 3 /** 
 4  * HelloService 
 5  *  
 6  * @author william.liangf 
 7  */  
 8 public interface HelloService {  
 9   
10     String hello(String name);  
11   
12 }  

 实现服务 :

 1  package com.alibaba.study.rpc.test;  
 2   
 3 /** 
 4  * HelloServiceImpl 
 5  *  
 6  * @author william.liangf 
 7  */  
 8 public class HelloServiceImpl implements HelloService {  
 9   
10     public String hello(String name) {  
11         return "Hello " + name;  
12     }  
13   
14 }  

服务提供者:

 1 package com.alibaba.study.rpc.test;
 2 
 3 import com.alibaba.study.rpc.framework.RpcFramework;
 4 
 5 /**
 6  * RpcProvider
 7  * 
 8  * @author william.liangf
 9  */
10 public class RpcProvider {
11 
12     public static void main(String[] args) throws Exception {
13         HelloService service = new HelloServiceImpl();
14         RpcFramework.export(service, 1234);
15     }
16 
17 }

服务消费者:

 1 /*
 2  * Copyright 2011 Alibaba.com All right reserved. This software is the
 3  * confidential and proprietary information of Alibaba.com ("Confidential
 4  * Information"). You shall not disclose such Confidential Information and shall
 5  * use it only in accordance with the terms of the license agreement you entered
 6  * into with Alibaba.com.
 7  */
 8 package com.alibaba.study.rpc.test;
 9 
10 import com.alibaba.study.rpc.framework.RpcFramework;
11 
12 /**
13  * RpcConsumer
14  * 
15  * @author william.liangf
16  */
17 public class RpcConsumer {
18     
19     public static void main(String[] args) throws Exception {
20         HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234);
21         for (int i = 0; i < Integer.MAX_VALUE; i ++) {
22             String hello = service.hello("World" + i);
23             System.out.println(hello);
24             Thread.sleep(1000);
25         }
26     }
27     
28 }

 

RPC框架几行代码就够了