首页 > 代码库 > RabbitMQ官网教程---路由

RabbitMQ官网教程---路由

(使用python客户端pika 0.9.8)

在前面的教程中我们构建了一个简单的日志系统。我们可以给许多接收者广播日志消息。


在这个教程中我们将添加一个特性给它-我们将订阅仅仅一种消息子集成为可能。例如,我们可以指挥仅仅错误消息到日志文件(保存到磁盘空间),它任然可以在控制台打印所有的日志消息。


绑定

在前面的例子中我们已经创建了绑定,你可以重新调用像这样的代码:

channel.queue_bind(exchange=exchange_name, queue=queue_name)


绑定是exchange和队列之间的一种关系。这可以简单的读作:队列对来自于这个exchange的消息是感兴趣的。


绑定可以使用一个额外的routing_key参数。为了避免跟一个basic_publish参数混乱我们将调用它一个绑定的键。这就是我们如何使用一个键创建一个绑定:

channel.queue_bind(exchange=exchange_name,queue=queue_name, routing_key=‘black‘)

绑定键意味着以来exchange类型。fanout类型的exchange,我们用了前面的,简单的忽略它的值。


Direct exchange

前面的教程中我们的日志系统给所有的消费者广播所有的消息。我们想扩展它允许基于它们的服务过滤消息。例如我们也许想让写的脚本仅仅只接收严重的错误日志消息,并且对警告和信息日志上不浪费磁盘空间。


我们使用一个fanout类型的exchange,它不会给我们太多的扩展性-它仅仅能无意识的广播。


我们将使用direct类型的exchange代替它。在direct类型的exchange后面的路由算法是简单的-一个消息进入一个正真匹配绑定键的消息routing_key的队列。


为了阐明这个,考虑下面的设置:

技术分享

在这个设置中,我们看到direct类型的exchange X有两个队列被绑定给它。第一个队列使用绑定键orange绑定,第二个有两个绑定,一个用绑定键black并且另一个用green。


像这样的设置一个消息使用一个路由键orange被发布到exchange将被路由到队列Q1。使用路由键black或者green将进入Q2。所有的消息将被丢弃。


多个绑定

技术分享

它完美合法的使用相同的绑定键绑定了多个队列。在我们的例子中我们可以在X和Q1之间天剑一个绑定键black。在那个例子中,direct类型的exchange将表现相似于fanout并且将给所有匹配的队列广播消息。一个使用路由键的black消息将被传递给Q1和Q2。


生产日志

我们将给我们的日志系统使用这种模式。单体fanout我们将给一个direct类型的exchange发送消息。我们将提供日志严重程度作为一个路由键。接收脚本用这种方式就能选择它想要接收的严重程度。我们首先聚焦于生产日志。


我们总是需要首先创建exchange:

channel.exchange_declare(exchange=‘direct_logs‘, type=‘direct‘)


并且我们准备发送一个消息:

channel.basic_publish(exchange=‘direct_logs‘,routing_key=severity,body=message)


对于简化的事情我们建议‘严重程度‘可能是‘info‘,‘warning‘,‘error‘中的其中之一。


订阅

接收消息将仅仅像前面的教程一样生效,带着一个异常-我们将给每个我们感兴趣的严重程度创建一个绑定。

result=channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange=‘direct_logs‘, queue=queue_name, routing_key=severity)


把代码合在一起

技术分享

emit_log_direct.py的代码:

#!/usr/bin/env python

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘direct_logs‘,type=‘direct‘)

severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘
message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘
channel.basic_publish(exchange=‘direct_logs‘, routing_key=severity, body=message)
print "[x] sent %r:%r" %(severity, message)
connection.close()


receive_logs_direct.py的代码:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘direct_logs‘, type=‘direct‘)

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    print >> sys.stderr, "Usage : %s [info] [warning] [error]" %(sys.argv[0],)
   sys.exit(1)
   
for severity in severities:
    channel.queue_bind(exchange=‘direct_logs‘, queue=queue_name, routing_key=severity)
    
print ‘[*] waiting for logs. To exit press CTRL+C‘

def callback(ch, method, properties, body):
    print "[x] %r:%r" %(method, routing_key, body,)
    
channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()


RabbitMQ官网教程---路由