首页 > 代码库 > RabbitMQ (消息队列)专题学习06 Topic
RabbitMQ (消息队列)专题学习06 Topic
(使用Java客户端)
一、概述
在路由消息分发的学习中,对日志记录系统做了改进,使用direct exchange来替换fanout exchange进行消息分发,可以使日志系统有了直接、并且可以有选择的接收消息。
尽管使用direct exchange改进了系统,但是它仍然有局限性,就是不能根据多个标准来分发消息。
在日志系统中,我们也许想订阅的不仅仅是基于日志消息的严重程度,而且可能是基于日志消息的发送源。
这将给我们带来很多的灵活,我可能想坚挺的错误来自"cron"的消息源,而不是来自"kern"消息源发送的所有消息。
为了按照上述的要求改进我们的日志系统,所以我们需要学习一种更为复杂的exchange.
二、实现步骤
2.1、Topic exchange
消息被发送到一个topic exchange不能使用一个随意的routingKey,而它必须是一个由点号(.)隔开的单词列表,这些单词可以使任何字符,但是它们通常有一些特性用来指定分发的消息,一些有效的routingKey的例子,比如"stock.mus.nyse","nyse.vmw","quick.orange.rabbit",可以用自己喜欢的许多单词来作为routingKey,但是最多不能超过255个字符。
绑定Key必须是相同的形式,topic exchange的逻辑绑定类似与一个direct exchange的丁丁,一个被发送的消息带着一个特定的routingKey被传递到所有与之匹配的routingKey的queues中,但是topic exchange有两个特殊的情况。
1、*(星号)可以代替一个确切的词
2、#(井号)可以替换零个或者多个词
以下是一个最容易的例子的说明:
图-1
说明:
在这个实例中,要发送描述动物的所有消息,该消息将routingKey包含三个单词,两个点号,在routingKey的第一部分描述速度、第二部描述颜色、第三部分描述种类。格式为:"<speed>.<colour>.<species>"。
创建了三个绑定,Q1绑定key为“*.orange.*”,Q2队列的绑定key为"*.*.rabbit"和"lazy.#".
这些绑定可以概括为:
>Q1是一对所有orange的舞动感兴趣的队列。
>Q2项坚挺关于兔子和懒惰的动物一切消息的队列。
若一个消息绑定的key设置为"quick.orange.rabbit"将被发送到所有队列,因为它匹配所有消息队列的绑定关系,消息绑定key为"lazy.orange.elephant"也将被发送到所有的消息队列中,另外一方面,若某条消息绑定key为"quick.orange.fox"仅仅将被发送到Q1中,"lazy.pink.rabbit"这样的绑定key仅仅只有一次被传递到Q2中,即使它符合匹配两个绑定,像"quick.brown.fox"不匹配任何绑定的队列,这些消息将会被丢弃。
如果我们打破上述的这些规则发送包含一个单词或者四个单词的消息,比如说“orange”或者"quick.orange.male.rabbit"这样的key,这些消息将全部都被丢失,因为没有与之匹配的queue.
另外一方面像"lazy.orange.male.rabbit"这样key,即使它有四个单词,将匹配最后的一部分,满足Q2的规则,所以这些消息将被传递到Q2中。
注意:
1、Topic exchange是强大的,当一个queue绑定一个带有#的key时,无论是什么routingKey,它将接收所有的消息,跟fanout exchange是一样的效果。
2、当被绑定的key中没有使用#和*符号时,topic exchange就像一个direct exchange一样,能准确的将消息路由到匹配的queue中。
2.2、代码清单:
使用一个topic exchange在日志系统中,将开始一个带有两个单词的routingKey,格式为"<facility>.<severity>",代码几乎和之前的一样。
消息发送者:
package com.xuz.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); connection = factory.newConnection(); channel = connection.createChannel(); //指定exchange的类型为topic channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String[] msg = new String[]{"xuz RabbitMQ Test!","This is a Topic Exchange Model!"}; //获取发送消息的routing key String[] routType = new String[]{"anonymous.info","anonymous.warning","anonymous.error","*.info","anonymous.#"}; String routingKey = getRouting(routType); //获取发送消息 String message = getMessage(msg); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message .getBytes()); System.out.println("EmitLogTopic---->Sent ‘" + routingKey + "‘:‘" + message + "‘"); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) { } } } } private static String getRouting(String[] strings) { if (strings.length < 1) return "anonymous.info"; //选择key return strings[0]; } private static String getMessage(String[] strings) { if (strings.length < 2) return "Hello World!"; return joinStrings(strings, " ", 1); } private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0) return ""; if (length < startIndex) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
接收者:
package com.xuz.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); connection = factory.newConnection(); channel = connection.createChannel(); //指定exchange类型为topic channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); //指定队列绑定key String[] msgType = new String[]{"anonymous.info","anonymous.warning","anonymous.error","*.info","anonymous.#"}; if (msgType.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } //test1 :接收所有类型的消息 for (String bindingKey : msgType) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received ‘" + routingKey + "‘:‘" + message + "‘"); } } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) { } } } } }
2.3、测试topic exchange
测试方案如下图所示,有一个发送消息的EmitLogTopic,有五个接收消息的类,其中ReceiveLogsTopic03指定的key为:"*.info",意味它只接收绑定key消息中有info这个词的消息源的消息,ReceiveLogsTopic04指定的key为:"anonymous.#",意味着它接收绑定key消息中带有anonymous这个词的一个或者多个消息源的消息。其他的按照类的命名接收绑定指定key的消息源的消息,key类型为:
"anonymous.info","anonymous.warning","anonymous.error","*.info","anonymous.#"
操作步骤:(在此只挑选几个类测试)
1、运行ReceiveLogsTopic03、ReceiveLogsTopic04、ReceiveLogsTopicError、ReceiveLogsTopicInfo、ReceiveLogsTopicWarning,分别如下图所示:
2、运行EmitLogTopic,此时指定发送消息的绑定的key为:anonymous.warning,那么此时ReceiveLogsTopic04、ReceiveLogsTopicWarning两个应该接收到消息,其他的接收不到消息。
3、修改EmitLogTopic的key的类型为:*.info,此时只有ReceiveLogsTopic03才会接收到消息,其他的接收不到消息。
4、修改EmitLogTopic的key为:anonymous.info ,此时只有ReceiveLogsTopic03、ReceiveLogsTopicWarning、ReceiveLogsTopic04能接收到消息。其他的接收不到消息。
Topic exchange是非常强大的,它弥补了fanout exchange(广播)和direct exchange(不能多个queues)各自的不足,它比前两者具有更强的灵活性。通配符*和#的使用,使得topic exchange灵活性大大增强,消费者不光能从匹配绑定key的queue中取出相关消息,还能做到从指定发送消息的消息源的所有信息。
源码下载:
topic 消息交换