首页 > 代码库 > RabbitMQ官网教程---主题
RabbitMQ官网教程---主题
(使用python客户端pika 0.9.8)
在前面的教程中我们提高了我们的日志系统。我们使用一个direct类型的exchange代替使用一个fanout类型的exchange的虚拟广播,获取一个可选的接收日志。
尽管使用direct类型的exchange提高了我们的系统,但是它任然是有限的-它不基于多个判断标准路由。
在我们的日志系统中,我们也许想不仅订阅基于严重程度的日志,而且还有基于源的生产日志。你也许从unix的syslog工具中知道了这个概念,它基于严重程度路由日志和设施。
那将给我们许多弹性-我们也许只想监听来自原‘cron‘的严重的错误而不是所有的‘kem‘日志。
为了实现我们的日志系统,我们需要了解更多关于topic的exchange。
Topic exchange
发送消息到一个topic类型的exchange不能有一个任意的routing_key-它必须是一个通过圆点分隔的单词列表。这些单词可能是任何事物,但是通常他们指定一些被连接的消息的特性。一些有效的路由键例子:“stock.usd.nyse”,"nyse.vmw","quick.orange.rabbit"。可能在路由键中海油更多你所想要的单词,上线是255字节。
绑定键必须是用相同的方式。topic类型的exchange的背后的逻辑跟direct的是相似的-用一个特别的路由键发送的消息将被传递给所有的用匹配的绑定键绑定的队列。然而,对于绑定键有两个重要的特别的例子:
*(star)可以定义精确的一个单词。
#(hash)可以订阅0到多个单词。
用一个例子是最容易解释这个的:
在这个例子中我们将发送给所有描述动物的消息。这个消息将使用由三个单词(两个圆点)组成的路由键发送。在路由键中的第一个单词将描述一个快速的移动,第二个是一个颜色,第三个是物种:
"<celerity>.<colour>.<species>"。
我们创建三个绑定:Q1使用绑定键"*.orange.*"被绑定并且Q2使用"*.*.rabbit"和"lazy.#"被绑定。
这些绑定可以被概述为:
Q1对所有的orange动物感兴趣
Q2想监听关于兔子和懒散动物的每件事情。
用一个路由键设置为"quick.orange.rabbit"的消息将被传递给两个队列。消息"lazy.orange.elephant"也将进入他们两个。另一方面,"quick.orange.fox"仅仅将进入第一个队列并且"lazy.brown.fox"仅仅进入第二个。"lazy.pink.rabbit"将被传递个第二个队列仅仅一次,尽管它匹配了两个绑定。"quick.brown.fox"不比配任何绑定,因此它将被丢弃。
如果我们中断了我们的通信并且用一个或者四个单词像"orange"或者"quick.orange.male.rabbit"发送一个消息会发生什么?那些消息将不会匹配任何的绑定并且将丢失。
在另一方面,尽管"lazy.orange.male.rabbit"有四个单词,但是它将匹配最后的绑定并且传递给第二个队列。
Topic exchange topic类型的exchange是强大的并且可以有像其它exchange的行为。 当一个队列用"#"(hash)绑定键被绑定 - 它将接收所有的消息,忽略路由键 - 就像用fanout类型的exchange一样。 当特殊字符"*"(star)和"#"(hash)未被用于绑定,topic类型的exchange将只跟direct的行为相像。
把代码合在一起
我们将在我们的日志系统中使用topic类型的exchange。我们将开始一个工作,假设日志的路由键有两个单词:"<facility>.<severity>"。
所有的代码几乎和前面的教程是相似的。
emit_log_topic.py的代码:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange = ‘topic_logs‘, type=‘topic‘) routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonmyous.info‘ message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘ channel.basic_publish(exchange=‘topic_logs‘, routing_key=routing_key, body=message) print "[x] Sent %r:%r" %(routing_key, message) connection.close()
receive_logs_topic的代码:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange = ‘topic_logs‘, type=‘topic‘) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: print >> sys.stderr, "Usage:%s [binding_key]..." %(sys.argv[0],) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange=‘topic_logs‘, queue=queue_name, routing_key=binding_key) print ‘[*] Waiting for logs. To exit press CTRL+C‘ def callback(ch, method, properties, body): print "[x] %r:%r" %(method.routing_key, body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
接收所有的日志可以运行:
python receive_logs_topic.py "#"
为了接收所有来自于设施的"kern"日志:
python receive_logs_topic.py "kern.*"
或者如果你只想监听关于"critical"日志:
python receive_logs_topic.py "*.critical"
你可以创建多个绑定:
python receive_logs_topic.py "kern.*" "*.critical"
生产日志的路由键为"kern.critical"类型:
python emit_log_topic.py "kern.critical" "A critical kernel error"
?
RabbitMQ官网教程---主题