首页 > 代码库 > RabbitMQ官网教程---发布/订阅
RabbitMQ官网教程---发布/订阅
(使用python客户端pika 0.9.8)
在前面的教程中我们创建了一个工作队列。假设在一个工作队列后面是每一个被传递给正确的工作者的任务。在这部分中我们将做一些完全不同的事情--我们将给多个消费者传递一个消息。这种模式被称为“发布/订阅”。
为了阐明这个模式,我们将构建一个简单的日志系统。它将由两个程序构成--第一个将发出日志消息并且第二个将接收并且打印它们。
在我们的日志系统中每个运行的接收程序副本将获得这个消息。用这种方式我们将可以运行一个接收器并且直接日志到磁盘;而且同时我们将运行另一个接收器并且在屏幕上看日志。
本质上,被发布的消息将广播到所有的接收器。
交换
在这个教程的前面部分我们从一个队列中发送和接收消息。现在该到介绍在Rabbit中的完整的消息模型了。
我们来快速的复习一下前面教程涉及到的一些东西:
?生产者是一个发送消息的用户应用程序。
?队列是一个存储消息的缓冲
?消费者是一个接收消息的用户应用程序。
在RabbitMQ中在消息模型中的核心思想是生产者永远不直接发送任何消息给一个队列。事实上,生产者甚至一点都不知道是否一个消息将被传递给任何队列。
相反,生产者只能发送消息到一个exchange。一个exchange是一件非常简单的事情。一边它从生产者中接收消息另一边它把消息推送到队列中。这个exchange必须知道它接收到的这个消息是做什么的。它应该被追加到一个特别的队列中吗?它应该被追加到许多队列中吗?或者它应该被取消吗?这些规则通过exchange的type被定义。
有几个exchange有效的type:direct,topic,headers和fanout。我们将聚焦于最后一个--fanout。我们来创建一个这种类型的exchange,并且调用它的日志:
channel.exchange_declare(exchange=‘logs‘,type=‘fanout‘)
fanout 的exchange是非常简单的。正如你可能从名字中猜到的,它仅仅是广播所有的它接收到的消息给所有它知道的队列。而且那就是我们的日志器需要的。
Listing exchanges 为了列出在服务器上的exchange你可以使用rabbitmqctl运行: $ sudo rabbitmqctl list_exchanges Listing exchanges... logs fanout amq.direct direct amq.topic topic amq.fanout fanout amq.headers headers ... done. 在这个列表中有一些amq.*的exchange和默认的exchange。那些事默认被创建的,但是你不可能在同时使用它们。 Nameless exchange 在这个教程的前面部分我们关于exchange完全不了解,但是它任何能给队列发送消息。那是可能的因为我们正在使用一个默认的exchange,这个默认的exchange是我们用空字符串标记的。 在我们发布一个消息以前回调: channel.basic_publish(exchange=‘‘,routing_key=‘hello‘, body=message) 这个exchange参数是以exchange命名的。空字符串表示默认或者未命名的exchange:如果消息存在,消息被指定的routing_key名字路由到队列。
现在,我们可以发布到一个被命名的exchange:
channel.basic_publish(exchange=‘log‘,routing_key=‘‘,body=message)
临时队列
像你也许还记得我们前面使用的用一个指定的名字(记得hello和task_queue)队列。能够命名一个队列对我们是非常重要的--我们需要把工作者指向相同的队列。当你想在生产者和消费者之间共享这个队列的时候,为一个队列取一个名字是重要的。
但是对我们的日志器那不是重要的。我们想接听到关于所有日志的消息,不仅仅是一个它们的子集。我们也对下面的消息而不是在老的队列里的感兴趣。解决这个问题我们需要两件事情。
首先,无论我们什么时候连接到Rabbit我们需要一个最新的空的队列。为了做这个我们需要用一个随机的名字创建一个队列或者甚至更好的-让服务器选择一个随机队列为我们命名。我们能够通过给queue_declare不应用queue参数来做这件事情:
result=channel.queue_declare()
在这时,result.method.queue包含了一个随机队列名字。例如它也许看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg.
其次,一旦我们跟消费者连接断掉了那就删除这个队列。exclusive是为这个的标识:
result=channel.queue_declare(exclusive=True)
绑定
我们已经创建了fanout类型的exchange和一个队列。现在我们需要告诉这个exchange给我们的队列发送消息。在exchange和队列之间的关系被叫做binding。
channel.queue_bind(exchange=‘logs‘,queue=result.method.queue)
从现在起这个日志exchange将给我们的队列追加消息。
Listing Bindings 你可以列出已经存的绑定,你猜到了,rabbitmqctl list_bindings。
把代码合在一起
生产者程序,它发出日志消息,跟前面的教程看起来没有很大程度的不同。最终要的改变是我们现在想发布消息给我们的日志exhcnage代替没有名字的exchange。我们需要在发送的时候提供一个routing_key,但是它的值会因为fanout类型exchange而被忽略。这是emit_log.py脚本的代码:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘logs‘, type=‘fanout‘) message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange=‘logs‘, routing_key=‘‘, body=message) print " [x] Sent %r" % (message,) connection.close()
正如你看到的,在建立了连接之后我们定义了这个exchange。这一步是必须的,同样地发布到一个不存在的exchange是被拒绝的。
如果还没有队列绑定到这个exchange,那么消息将被丢失,但是对我们那是没问题的;如果到现在还没有消费者监听我们可以安全的取消这个消息。
receive_logs.pyt的代码:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host=‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘logs‘, type=‘fanout‘) result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange=‘logs‘, queue=queue_name) print ‘ [*] Waiting for logs. To exit press CTRL+C‘ def callback(ch, method, properties, body): print " [x] %r" % (body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
?
RabbitMQ官网教程---发布/订阅