首页 > 代码库 > 发布订阅
发布订阅
"发布订阅"
三、”发布订阅”
上一节的练习中我们创建了一个工作队列。队列中的每条消息都会被发送至一个工作进程。这节,我们将做些完全不同的事情——我们将发送单个消息发送至多个消费者。这种模式就是广为人知的“发布订阅”模式。
为了说明这种模式,我们将构建一个简单的日志系统。包括2个应用程序,一个传送日志消息另一个接收并打印这些消息。
我们的日志系统中每一个运作的接收端程序都会收到这些消息。这种方式下,我们就可以运行一个接收端发送日志消息至硬盘,同时可以运行另一个接收端将日志打印到屏幕上。
理论上讲,已发布的日志消息将会被广播到所有的接收者。
交换器(Exchange)
之前的几节练习中我们发送接收消息都是在队列中进行,是时候介绍下RabbitMQ完整的消息传递模式了。
先来迅速的回顾下我们之前章节:
- 一个生产者就是一个用来发送消息的应用程序
- 一个 队列好比存储消息的缓存buffer
- 一个消费者就是一个用户应用程序用来接收消息
RabbitMQ消息传递模型的核心思想是生产者从来不会直接发送消息至队列。事实上,生产者经常都不知道消息会被分发至哪个队列。
相反的是,生产者仅仅发送消息至交换器。交换器是非常简单的东西:一边从生产者那边接收消息一边发送这些消息至队列。交换器必须准确的知道这些被接收的消息该如何处理。它应该被添加到某个特定队列?或者添加到多个队列?甚至直接放弃。具体的传输规则就是通过交换器类型来定义的。
交换器类型有四种:direct、topic、headers、fanout。这节我们主要关注最后一种——fanout。让我们来创建一个fanout类型的交换器,命名为logs:
err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments )
正如你从名字中猜测的一样,它仅仅广播所有消息到所有已知的接收队列。实际上这正是我们需要的日志系统。
备注:之前的几节练习中我们并不知道交换器,但我们依然能够将消息发送至队列中,之所以可以实现是因为我们使用了默认的交换器,使用空字符串表示。
回顾下之前我们发送消息是这样子的:
err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })
这里我们可以使用默认也可以自己命名交换器:如果路由键存在的话,消息会被路由到加上路由键参数的地址,注意fanout类型会直接忽略路由键的存在。
以下是修改后的代码:
err = ch.ExchangeDeclare( "logs", // name 定义一个名为logs的交换器 "fanout", // type 交换器类型为fanout即广播类型 true, // durable 持久化 false, // auto-deleted 无队列绑定时是否自动删除 false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( "logs", // exchange 指定消息发送的交换器名称 "", // routing key 因为fanout类型会自动忽略路由键,所以这里的路由键参数任意,一般不填 false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })
临时队列
你可能记得之前我们声明队列的时候都会指定一个队列名称(记得hello和task_queue?)。队列的命名对我们来说至关重要——我们需要将工作进程指向同一个队列。当你需要在消费者和生产者之间共享队列的话声明队列就显得很重要。
但这对我们的日志系统来说无关重要。我们需要监听的是所有的日志消息,而不是他们中的某一类。我们只关注当前流中的消息而不关注旧的那些。解决这个我们需要做两件事。
首先,每当链接RabbitMQ的时候我们需要创建一个新的、空的队列。为做到这点,我们必须创建一个名称随机的队列,甚至更好的实现方式是——让服务端给我们自动生成一个随机的队列。
其次,一旦消费者链接断开,该队列便会自动删除。
在amqp客户端中,当我们给一个队列名称设定为空字符串时,我们就创建了一个非持久化的生成队列:
q, err := ch.QueueDeclare( "", // name 满足第一点:服务端自动产生随机队列 false, // durable false, // delete when usused true, // exclusive 满足第二点:连接断开立即删除 false, // no-wait nil, // arguments )
当该方法返回的时候,声明好的队列便包含一个由RabbitMQ生成的随机队列名称。举例来说,队列名称形如:amq.gen-JzTY20BRgKO-HjmUJj0wLg这种的。
当消费者的链接宣布关闭后,队列便像exclusive参数设置的那样,自动删除。
绑定
我们已经创建了一个fanout类型的交换器和一个队列,现在我们需要告诉交换器将消息发送至我们的队列。这种交换器和队列中的关联关系就叫做绑定。
err = ch.QueueBind( q.Name, // queue name 绑定的队列名称 "", // routing key 绑定的路由键 "logs", // exchange 绑定的交换器名称 false, nil )
从现在起,logs交换器便能发送消息至我们的队列。
糅合在一起
生产者的程序,也就是发送消息端,跟之前几节的发送代码差不多。最重要的是我们现在要发送消息到logs交换器而非默认的交换器。发送的时候我们可以设置一个路由键,但是对于fanout类型的交换器来说它将被忽略。下面就是发送日志方的代码:
// rabbitmq_3_emit_log.go project main.go package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s:%s", err, msg) panic(fmt.Sprintf("%s:%s", err, msg)) } } func bodyForm(args []string) string { var s string if len(args) < 2 || os.Args[1] == "" { s = "Hello World! This is a test!" } else { s = strings.Join(args[1:], " ") } return s } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "failed to dial rabbitmq server") defer conn.Close() ch, err := conn.Channel() failOnError(err, "failed to declare the channel") defer ch.Close() //声明一个交换器,交换器名称logs,类型fanout err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil) failOnError(err, "failed to declare the exchange") body := bodyForm(os.Args) //发送消息到交换器 err = ch.Publish("logs", "", false, false, amqp.Publishing{ Body: []byte(body), ContentType: "text/plain", }) failOnError(err, "failed to publish the message") }
备注:这里发送方并不需要声明队列之类的,不像之前的代码需要声明,这里的发送方唯一关联的是交换器,所以只需声明交换器并发送消息至交换器即可。
正如你想的那样,链接建立后我们声明交换器,这一步是必须的因为发送消息到一个不存在的交换器是完全禁止的。
如果该交换器上面没有队列绑定的话那么发送至该交换器的消息将全部丢失,但这对我们来时ok;如果没有消费者我们会安全地丢弃这些消息。
下面是日志接收方的代码:
// rabbitmq_3_receive_logs.go project main.go package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s:%s", err, msg) panic(fmt.Sprintf("%s:%s", err, msg)) } } func bodyForm(args []string) string { var s string if len(args) < 2 || os.Args[1] == "" { s = "Hello World! This is a test!" } else { s = strings.Join(args[1:], " ") } return s } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "failed to dial rabbitmq server") defer conn.Close() ch, err := conn.Channel() failOnError(err, "failed to declare the channel") defer ch.Close() //声明一个交换器,交换器名称logs,类型fanout err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil) failOnError(err, "failed to declare the exchange") //声明一个队列 q, err := ch.QueueDeclare("", false, false, true, false, nil) failOnError(err, "failed to declare the queue") //设置绑定(第二个参数为路由键,这里为空) err = ch.QueueBind(q.Name, "", "logs", false, nil) failOnError(err, "failed to bind the queue") //注册一个消费者 msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil) failOnError(err, "Failed to register a consumer") forever := make(<-chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }
如果你想将日志保存到文件,执行如下命令:
go run receive_logs.go > logs_from_rabbit.log
如果你仅仅想在屏幕上查看日志,开启一个新的控制台执行如下命令:
go run receive_logs.go
当然了,你最后还要发出日志才行:
go run emit_log.go
使用rabbitmqctl list_bindings命令可以直接查看所有的绑定,如运行2个receive_logs.go程序你就会看到如下输出:
rabbitmqctl list_bindings Listing bindings ... logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] ...done.
实际效果:
分别开启两个控制台,均监听相同队列,同时收到消息并打印了,说明两个随机的队列均收到了logs交换器发来的消息,发送方略。
发布订阅