首页 > 代码库 > RabbitMQ之Topics(多规则路由)

RabbitMQ之Topics(多规则路由)

Exchange中基于direct类型无法基于多种规则进行路由。

例如分析syslog日志,不仅需要基于severity(info/warning/critical/error)进行路由,还需要基于auth、cron或者kernal模式进行路由。

Topic exchange可以满足这种需求。

 

Topic exchange

基于topic类型交换器的routing key不是唯一的,而是一系列词,基于点区分。

例如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"

binding key也是类型方式。

 

*表示只匹配一个关键字

#可以匹配0或者多个关键字

Q1队列匹配对所有orange的关注

Q2队列可以监听rabbits,以及所有lazy开头的

quick.orange.fox只会进入Q1,lazy.brown.fox只会进入Q2

lazy.pink.rabbit尽管和两个topic都匹配上,但是只会进入队列以此

quick.brown.fox不会和任何队列进行绑定,因此会被丢弃

 如果发送orange或者quick.orange.male.rabbit不会和任何队列进行绑定,也都会被丢弃。

 

Topic exchange功能很强大,类似其他exchange。当一个队列设置binding key为#,则它可以接收所有消息,不管routing key如何设置,类似fanout exchange。

当不设置*和#的话,topic exchange就类似direct exchange。

 

例子

发送消息

#!/usr/bin/env python#-*- coding:utf8 -*-import sys import pikaimport logginglogging.basicConfig(format=‘%(levelname)s:$(message)s‘,level=logging.CRITICAL)def emit_log():    pika.connection.Parameters.DEFAULT_HOST = ‘localhost‘    pika.connection.Parameters.DEFAULT_PORT = 5672    pika.connection.Parameters.DEFAULT_VIRTUAL_HOST = ‘/‘     pika.connection.Parameters.DEFAULT_USERNAME = ‘guosong‘    pika.connection.Parameters.DEFAULT_PASSWORD = ‘guosong‘    para = pika.connection.Parameters()    connection = pika.BlockingConnection(para)    channel = connection.channel()    #声明一个topic_logs交换器,类型为topic    channel.exchange_declare(exchange=‘topic_logs‘,type=‘topic‘)    #指定路由key    routing_key = sys.argv[1] if len(sys.argv) >1 else ‘anonymous.info‘    message = ‘ ‘.join(sys.argv[2:]) or "info:Hello World!"                                             #发送的时候指定routing_key为空,没有绑定队列到交换器上,消息将会丢失    #对于日志类消息,如果没有消费者监听的话,这些消息就会忽略    channel.basic_publish(exchange=‘topic_logs‘,routing_key=routing_key,body=message)    #%r也是string类型    print "[x] Sent %r" % (message,)    connection.close()if __name__ == ‘__main__‘:    emit_log()

 

接收消息

 

def callback(ch, method, properties, body):    print " [x] %r " % (body,)def receive_logs():    pika.connection.Parameters.DEFAULT_HOST = ‘localhost‘    pika.connection.Parameters.DEFAULT_PORT = 5672    pika.connection.Parameters.DEFAULT_VIRTUAL_HOST = ‘/‘     pika.connection.Parameters.DEFAULT_USERNAME = ‘guosong‘    pika.connection.Parameters.DEFAULT_PASSWORD = ‘guosong‘    para = pika.connection.Parameters()    connection = pika.BlockingConnection(para)    channel = connection.channel()    #声明一个topic_logs交换器,类型为topic    channel.exchange_declare(exchange=‘topic_logs‘,type=‘topic‘)    #声明一个随机队列,设置exclusive=True,在该consumer退出的时候,对应的队列被删除    result = channel.queue_declare(exclusive=True)    #获取随机队列的名称    queue_name = result.method.queue    binding_keys = sys.argv[1:]    if not binding_keys:        print >> sys.stderr, "Usage: %s [binding_key]" %  sys.argv[0]        sys.exit(1)    for binding_key in binding_keys:         #绑定交换器和队列        channel.queue_bind(exchange=‘topic_logs‘,queue=queue_name,routing_key=binding_key)    print ‘[*] Wating for logs.To exit press CTRL+C‘    #开始消费消息    channel.basic_consume(callback,queue=queue_name,no_ack=True)    channel.start_consuming()

 

接收所有消息

python receive_logs_topic.py "#"

接收kern开头,包含一个关键字

python receive_logs_topic.py "kern.*"

多个绑定

python receive_logs_topic.py "kern.*" "*.critical"
guosong@guosong:~/code/rabbitmq/ch5$ python emit_log.py "kern.critical" "A critical kernel error"[x] Sent ‘A critical kernel error‘guosong@guosong:~/code/rabbitmq/ch5$ ./receive_logs.py "kern.*"[*] Wating for logs.To exit press CTRL+C [x] ‘A critical kernel error‘ 

参考链接

http://www.rabbitmq.com/tutorials/tutorial-five-python.html