首页 > 代码库 > 柯南君:看大数据时代下的IT架构(7)消息队列之RabbitMQ--案例(routing 起航)
柯南君:看大数据时代下的IT架构(7)消息队列之RabbitMQ--案例(routing 起航)
一、回顾
让我们回顾一下,在上几章里都讲了什么?总结如下:
- 《柯南君:看大数据时代下的IT架构(1)业界消息队列对比》
- 《柯南君:看大数据时代下的IT架构(2)消息队列之RabbitMQ-基础概念详细介绍》
- 《柯南君:看大数据时代下的IT架构(3)消息队列之RabbitMQ-安装、配置与监控》
- 《柯南君:看大数据时代下的IT架构(4)消息队列之RabbitMQ--案例(Helloword起航)》
- 《柯南君:看大数据时代下的IT架构(5)消息队列之RabbitMQ--案例(Work Queues起航)》
- 《柯南君:看大数据时代下的IT架构(6)消息队列之RabbitMQ--案例(Publish/Subscribe起航)》
二、Routing(路由) (using the Java client)
在前面的学习中,构建了一个简单的日志记录系统,能够广播所有的日志给多个接收者,在该部分学习中,将添加一个新的特点,就是可以只订阅一个特定的消息源,也就是说能够直接把关键的错误日志消息发送到日志文件保存起来,不重要的日志信息文件不保存在磁盘中,但是仍然能够在控制台输出,那么这便是我们这部分要学习的消息的路由分发机制。
三、Bindings(绑定)
channel.queueBind(queueName, EXCHANGE_NAME, "");
一个绑定就是一个关于exchange和queue的关系,它可以简单的被理解为:队列是从这个exchange中获取消息的。
绑定可以采取一个额外的routingKey的参数,为了避免与basicPublish参数冲突,称之为一个绑定Key,这是如何创建一个带routingKey的绑定的关键。
channel.queueBind(queueName, EXCHANGE_NAME, "black");
四、Direct exchange(直接交换机)
前面实现的日志记录系统中广播所有的消息给所有的消费者,现在对其进行扩展,允许根据信息的严重程度来对消息进行过滤,比如,希望一个程序写入到磁盘的日志消息只接收错误的消息,而不是浪费磁盘保存所有的日志消息。
为了实现这个目标,使用一个fanout类型的exchange,显然是不能够满足这样的需求的,因为它只能广播所有的消息。
为此将使用一个direct exchange来代替fanout exchange,direct exchange使用简单的路由算法,将消息通过绑定的Key匹配将要到达的队列。
从上面的结构图中可以看出direct exchange X绑定着两个queue(Q1,Q2),第一个queue绑定的routingKey为orange,第二个有两个routingKey被绑定,一个routingKey为black,另外一个routingKey为green.
说明:发送带有routingKey为orange的消息到X(exchange)中,X将该消息路由到Q1中,发送带有routingKey为black和green的消息都将被路由到Q2中,其他所有消息将会被丢弃。
五、Multiple bindings(多绑定)
六、Emitting logs(发送的日志)
使用direct代替fanout类型的exchange,发送消息到一个direct exchange中,将根据消息的重要程度作为routingKey,这样接收程序能够选择它想要接收的日志信息,首先必须先创建一个exchange.
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
六、Subscribing(订阅消息)
接收者根据自己感兴趣的severity来创建一个新到的绑定。String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }
七、Putting it all together(代码实现)
public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); channel.close(); connection.close(); } //.. }
ReceiveLogsDirect代码清单如下:
public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } }
编译和往常一样(参见以往教程用于编译和类路径的建议)。现在,为了方便起见,我们将使用一个环境变量$CP(%CP%在Windows上)的运行时类路径的例子。
如果你只想保存 “警告”和“错误”(而不是“信息”)日志消息到一个文件,打开一个控制台和type:
$ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log
如果你想看到所有的日志消息在你的屏幕上,打开一个新终端和do:
$ java -cp $CP ReceiveLogsDirect info warning error
[*] Waiting for logs. To exit press CTRL+C
例如,发布一个错误日志信息
$ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode." [x] Sent ‘error‘:‘Run. Run. Or it will explode.‘
柯南君:看大数据时代下的IT架构(7)消息队列之RabbitMQ--案例(routing 起航)