首页 > 代码库 > RPC

RPC

RPC

 

  1. 当客户端启动,它创建一个匿名的并且是exclusive的回调queue。

  2. 在一次RPC请求中,客户端发送的消息有两个属性:replyTo,放置的是回调queue的信息。correlationId,放置的是每个请求唯一的值。

  3. 请求被发送到一个rpc_queue中。

  4. RPC服务端在queue的另一端等待请求。当请求到来时,它处理任务并将消息的结果发送回客户端,使用replyTo中设置的queue。

  5. 客户端在回调queue中等待响应的数据,当消息出现时,它先检查correlationId属性。如果匹配的话就将结果返回到应用中。

Callback queue

使用RabbitMQ来进行RPC是非常简单的。客户端发送一个请求到服务端,服务端接收后返回响应的消息。为了接收到响应的消息,我们需要在请求中发送一个callback 的queue地址。我们可以使用默认的queue(在Java的client中它是exclusive的)。

 1 callbackQueueName = channel.queueDeclare().getQueue();
 2 
 3 BasicProperties props = new BasicProperties
 4                             .Builder()
 5                             .replyTo(callbackQueueName)
 6                             .build();
 7 
 8 channel.basicPublish("", "rpc_queue", props, message.getBytes());
 9 
10 // ... then code to read a response message from the callback_queue ...

Message properties

AMQP协议预定义了消息的14种属性。大部分的都很少使用,除了以下这些:

  • deliveryMode:标记一条消息是持久化的(使用值2)还是非持久化的(使用其它值)。在第二节中有过介绍。

  • contentType:用来描述mime类型的编码。例如使用JSON的话就这样设置属性:application/json

  • replyTo:一般用来命名一个回调queue。

  • correlationId:用来关联RPC的请求和响应。

在之前的方法中我们建议为每个RPC请求创建一个回调queue。这显得有点影响性能,幸运的是有一种更好的方式——每个客户端只创建一个回调queue。 但这产生了一个新问题,无法将相应的Response和Request对应起来。这个时候就需要用到correlationId属性。对于每个请求它都将有一个唯一的值。 当我们在回调queue中接收到消息之后,检查该属性,看是否与Request匹配。如果是一个未知的correlationId值,那么我们可以安全的忽略这条消息, 因为它不属于我们的请求。

你也许会问,为什么我们应该忽略回调queue中未知的消息而不是抛出异常?这是因为服务端可能会出现竞争条件。尽管不太常见,但是也有可能RPC server在发送响应后挂了, 并且也没有接收到客户端发送的ack。如果发生了这种情况,RPC server在重启后将会重新处理这个请求。这就是为什么在客户端我们需要优雅的处理重复的响应, RPC应该是幂等的。

 1 package com.rabbitmq.www.publish_subscribe.rpc;
 2 
 3 import com.rabbitmq.client.ConnectionFactory;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.Channel;
 6 import com.rabbitmq.client.DefaultConsumer;
 7 import com.rabbitmq.client.AMQP;
 8 import com.rabbitmq.client.Envelope;
 9 
10 import java.io.IOException;
11 import java.util.UUID;
12 import java.util.concurrent.ArrayBlockingQueue;
13 import java.util.concurrent.BlockingQueue;
14 import java.util.concurrent.TimeoutException;
15 
16 public class RPCClient {
17 
18   private Connection connection;
19   private Channel channel;
20   private String requestQueueName = "rpc_queue";
21   private String replyQueueName;
22   
23   private final static String HOST_ADDR = "172.18.112.102";
24 
25   public RPCClient() throws IOException, TimeoutException {
26     ConnectionFactory factory = new ConnectionFactory();
27     factory.setHost(HOST_ADDR);
28 
29     connection = factory.newConnection();
30     channel = connection.createChannel();
31 
32     replyQueueName = channel.queueDeclare().getQueue();
33   }
34 
35   public String call(String message) throws IOException, InterruptedException {
36     String corrId = UUID.randomUUID().toString();
37 
38     AMQP.BasicProperties props = new AMQP.BasicProperties
39             .Builder()
40             .correlationId(corrId)
41             .replyTo(replyQueueName)
42             .build();
43     //调用服务端请求(在制定的queue上发布消息)
44     channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
45 
46     final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
47 
48     channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
49       @Override
50       public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
51         if (properties.getCorrelationId().equals(corrId)) {
52           response.offer(new String(body, "UTF-8"));
53         }
54       }
55     });
56 
57     return response.take();
58   }
59 
60   public void close() throws IOException {
61     connection.close();
62   }
63 
64   public static void main(String[] argv) {
65     RPCClient fibonacciRpc = null;
66     String response = null;
67     try {
68       fibonacciRpc = new RPCClient();
69 
70       System.out.println(" [x] Requesting fib(30)");
71       response = fibonacciRpc.call("30");
72       System.out.println(" [.] Got ‘" + response + "‘");
73     }
74     catch  (IOException | TimeoutException | InterruptedException e) {
75       e.printStackTrace();
76     }
77     finally {
78       if (fibonacciRpc!= null) {
79         try {
80           fibonacciRpc.close();
81         }
82         catch (IOException _ignore) {}
83       }
84     }
85   }
86 }
package com.rabbitmq.www.publish_subscribe.rpc;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RPCServer {

  private static final String RPC_QUEUE_NAME = "rpc_queue";
  
  private final static String HOST_ADDR = "172.18.112.102";

  private static int fib(int n) {
    if (n ==0) return 0;
    if (n == 1) return 1;
    return fib(n-1) + fib(n-2);
  }

  public static void main(String[] argv) {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(HOST_ADDR);

    Connection connection = null;
    try {
      connection      = factory.newConnection();
      Channel channel = connection.createChannel();

      channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

      channel.basicQos(1);

      System.out.println(" [x] Awaiting RPC requests");

      Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
          AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                  .Builder()
                  .correlationId(properties.getCorrelationId())
                  .build();

          String response = "";

          try {
            String message = new String(body,"UTF-8");
            int n = Integer.parseInt(message);

            System.out.println(" [.] fib(" + message + ")");
            response += fib(n);
          }
          catch (RuntimeException e){
            System.out.println(" [.] " + e.toString());
          }
          finally {
            channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));

            channel.basicAck(envelope.getDeliveryTag(), false);
          }
        }
      };

      channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

      //loop to prevent reaching finally block
      while(true) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException _ignore) {}
      }
    } catch (IOException | TimeoutException e) {
      e.printStackTrace();
    }
    finally {
      if (connection != null)
        try {
          connection.close();
        } catch (IOException _ignore) {}
    }
  }
}

server端的代码非常直观:

  • 首先创建一个连接、channel和声明一个queue。

  • 我们也许想要运行不止一个服务端进程。为了在多个server间做到负载均衡,通过channel.basicQos设置prefetchCount

  • 我们使用basicConsume来进入queue。然后使用无限循环来等待请求的消息,处理之后再返回响应。

客户端代码有一点点的复杂:

  • 我们创建连接和channel,以及声明一个exclusive的回调queue用来接收响应的消息。

  • 订阅回调queue,这样就可以接收到RPC服务端响应的消息。

  • call方法发出一个RPC请求。

  • 我们首先生成一个唯一的correlationId数字并且保存它——在while循环中使用它来匹配相应的response。

  • 下一步,发送请求的消息,使用两个属性:replyTocorrelationId

  • 之后就是等待响应的消息返回。

  • 在while循环中做了一些简单的工作,检查响应的消息的correlationId是否与Request相匹配。如果是的话,则保存响应。

  • 最终向用户返回响应。

 

RPC