首页 > 代码库 > RabbitMQ (消息队列)专题学习05 routing(路由)

RabbitMQ (消息队列)专题学习05 routing(路由)

(使用Java客户端)

一、概述

在前面的学习中,构建了一个简单的日志记录系统,能够广播所有的日志给多个接收者,在该部分学习中,将添加一个新的特点,就是可以只订阅一个特定的消息源,也就是说能够直接把关键的错误日志消息发送到日志文件保存起来,不重要的日志信息文件不保存在磁盘中,但是仍然能够在控制台输出,那么这便是我们这部分要学习的消息的路由分发机制。

二、路由功能实现

2.1、绑定(bindings)

在前面的学习中已经创建了绑定(bindings),代码如下:

channel.queueBind(queueName, EXCHANGE_NAME, "")

一个绑定就是一个关于exchange和queue的关系,它可以简单的被理解为:队列是从这个exchange中获取消息的。

绑定可以采取一个额外的routingKey的参数,为了避免与basicPublish参数冲突,称之为一个绑定Key,这是如何创建一个带routingKey的绑定的关键。

channel.queueBind(queueName, EXCHANGE_NAME, "black");
一个绑定Key依赖于exchange的类型,像之前使用fanout类型的exchange,完全忽略了该绑定key的值。

2.2、直接交换(Direct exchange)

前面实现的日志记录系统中广播所有的消息给所有的消费者,现在对其进行扩展,允许根据信息的严重程度来对消息进行过滤,比如,希望一个程序写入到磁盘的日志消息只接收错误的消息,而不是浪费磁盘保存所有的日志消息。

为了实现这个目标,使用一个fanout类型的exchange,显然是不能够满足这样的需求的,因为它只能广播所有的消息。

为此将使用一个direct exchange来代替fanout exchange,direct exchange使用简单的路由算法,将消息通过绑定的Key匹配将要到达的队列。

-1

从上面的结构图中可以看出direct exchange X绑定着两个queue(Q1,Q2),第一个queue绑定的routingKey为orange,第二个有两个routingKey被绑定,一个routingKey为black,另外一个routingKey为green.

说明:发送带有routingKey为orange的消息到X(exchange)中,X将该消息路由到Q1中,发送带有routingKey为black和green的消息都将被路由到Q2中,其他所有消息将会被丢弃。

2.3、多绑定(Multiple bindings)

图-2

多个队列绑定相同的routingKey是允许的,在上述实例中,可以把X和Q1用routingKey:black绑定起来,这种情况下,direct exchange将像fanout类型的exchange一样会将消息广播都到所有匹配的queues中,即一个routingKey为black的消息将会被发送到Q1和Q2中。

2.4、发送的日志

使用direct代替fanout类型的exchange,发送消息到一个direct exchange中,将根据消息的重要程度作为routingKey,这样接收程序能够选择它想要接收的日志信息,首先必须先创建一个exchange.

channel.exchangeDeclare(EXCHANGE_NAME, "direct");
其次,发送一条信息:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
为了简化程序,将severity设定为info、warning、error三种类型中的一种。

2.5、订阅消息(Subscribing Message)

接收者根据自己感兴趣的severity来创建一个新到的绑定。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){    
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
2.6、代码实现

-3

EmitLogDirect.java代码清单如下:

package com.xuz.route;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLogDirect {
	private static final String EXCHANGE_NAME = "direct_logs";
	
	public static void main(String[] args) throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		Connection conn = factory.newConnection();
		Channel channel = conn.createChannel();
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		//日志类型
		String[] msgType = new String[]{"info","warning","error"};
		String severity = getSeverity(msgType);
		//测试信息
		String[] msg = new String[]{"xuz RabbitMQ Routing Test!","very Good!","This is a Info"};
		String message = getMessage(msg);
		channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
		System.out.println("send:["+severity+":"+message+"]");
		channel.close();
		conn.close();
	}

	private static String getMessage(String[] strings) {
		if(strings.length<2){
			return "Hello World!";
		}
		return joinStrings(strings,"",1);
	}

	private static String joinStrings(String[] strings, String string,int startIndex) {
		int length = strings.length;
	    if (length == 0 ) return "";
	    if (length < startIndex ) return "";
	    StringBuilder words = new StringBuilder(strings[startIndex]);
	    for (int i = startIndex + 1; i < length; i++) {
	        words.append(string).append(strings[i]);
	    }
	    return words.toString();
	}

	private static String getSeverity(String[] strings) {
		if(strings.length<1){
			return "info";
		}
		return strings[0];
	}
}

ReceiveLogsDirect代码清单如下:

package com.xuz.route;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
 * 接收所有类型的消息
 * @author Administrator
 *
 */
public class ReceiveLogsDirectAll {
	private static final String EXCHANGE_NAME = "direct_logs";
	
	public static void main(String[] args) throws Exception {
		ConnectionFactory factory = new ConnectionFactory();
	    factory.setHost("127.0.0.1");
	    Connection connection = factory.newConnection();
	    Channel channel = connection.createChannel();
	    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
	    String queueName = channel.queueDeclare().getQueue();
	    String[] msgType = new String[]{"info","warning","error"};
	    if (msgType.length < 1){
	      System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
	      System.exit(1);
	    }
	    /**
	     * 绑定多种类型包括:info、warning、error
	     */
	    for(String severity : msgType){    
	      channel.queueBind(queueName, EXCHANGE_NAME, severity);
	    }
	    
	    System.out.println(" ReceiveLogsDirectAll---->Waiting for messages. To exit press CTRL+C");
	    QueueingConsumer consumer = new QueueingConsumer(channel);
	    channel.basicConsume(queueName, true, consumer);
	    while (true) {
	      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
	      String message = new String(delivery.getBody());
	      String routingKey = delivery.getEnvelope().getRoutingKey();
	      System.out.println("ReceiveLogsDirectAll----->Received ‘" + routingKey + "‘:‘" + message + "‘");   
	    }
	}
}

2.7、测试Routing

根据上述代码清单,接收者我们需要做一些修改,改为ReceiveLogsDirectInfoReceiveLogsDirectWarningReceiveLogsDirectAllReceiveLogsDirectError,分别用来接收消息、警告、所有、错误等类型的消息。

1、分别启动ReceiveLogsDirectInfo、ReceiveLogsDirectWarning、ReceiveLogsDirectAll、ReceiveLogsDirectError类。

2、启动EmitLogDirect类,此时serverity类型为info

此时只有ReceiveLogsDirectInfo和ReceiveLogsDirectAll接收到了信息。

其他的两个类型的接收者没有接收到消息:

其他两种类型的消息不再演示,有兴趣的朋友下载源码自己研究。只要将EmitLogDirect类稍微改造一下代码即可。

消息发送者会根据serverity类型来将要发送的消息路由到指定的队列中,消费者会同样根据指定的serverity类型匹配对应的queue中取出相对应的消息。达到了准确的路由消息传递功能。


源码下载:

基于RabbitMQ消息队列的路由分发