首页 > 代码库 > 消息队列_RabbitMQ-0002.深入MQ生产者/信道/交换机/队列/消费者?
消息队列_RabbitMQ-0002.深入MQ生产者/信道/交换机/队列/消费者?
形象说明:
比喻: RabbitMQ提供的消息投递服务类似于现实生活中的快递公司,双11我们可能会买很多东西,自然会陆续收到很多寄自淘宝店主由快递公司发来的快件,但是可能很多时候买回来的东西并不合心意,自然会陆续通过快递公司退回快件,所以回归到架构,这里的快件就相当于消息,我们相当于应用程序,淘宝店主相当于服务器,而快递公司相当于路由器,应用程序可以发送和接收消息,服务器也可以发送和接收消息,所以当应用程序连接到RabbitMQ时,就必须做一个决定:我是发送还是接收哪?
现实: 生产者(Producer)创建消息,然后发布(发送)到消息代理服务器(RabbitMQ),消息包含两部分内容:有效载荷(想要传输的数据,支持任何内容)和标签(描述有效载荷,最终由RabbitMQ来决定谁将获得消息的拷贝),消费者(Consumer)启动时连接消息代理服务器上,并订阅指定队列,每当消息达到此队列时,RabbitMQ会将其发送给订阅的消费者,当消费者接收到消息时,它只是得到了有效载荷,因为消息在路由的过程中,消息的标签并没有随着有效载荷一起传递,RabbitMQ甚至不会告诉你生产者是谁?当然如果觉得有必要,也可以将身份信息加入有效载荷一起传递~
信道连接:
说明: 使用消息代理服务器RabbitMQ的前提是建立AMQP信道,应用程序可以基于一条TCP连接快速创建销毁无数信道来减少传统TCP连接消耗,每个信道有唯一ID(由AMQP库维护),AMQP命令都是通过信道发送
消息路由:
# 消费消息
1. 消费者通过AMQP的basic.consume命令订阅,这样做会将信道置为接收模式,订阅消息后,消息一到达队列时就自动接收,直到取消队列的订阅为止
2. 消费者通过AMQP的basic.get命令订阅,这样做会将信道置为接收模式,订阅消息后,获得单条消息后,然后自动取消订阅,千万不要妄想放在循环里代替basic.consume,否则无法发挥其高吞吐量特性
3. 如果消息到达了无人订阅的队列,消息会在队列中等待,一旦有消费者订阅该队列,队列的消息会发送给消费者
4. 如果队列拥有多个消费者时,队列的消息以轮询的方式发送给消费者,每条消息只会发送给一个订阅的消费者,且每个消费者接收到的每一条消息都必须进行确认,消费者必须通过AMQP的basic.ack命令显式地向RabbitMQ发送一个确认,或者在订阅到队列的时候将auto_ack参数设置为true,此时一旦消费者接收消息,RabbitMQ会自动认为其确认了消息,一旦消息被确认,RabbitMQ才会安全的把消息从队列中删除,主要是防止确认之前RabbitMQ断开连接或取消订阅或程序崩溃,RabbitMQ会认为这条消息没有分发,然后重新分发给下一个订阅的消费者,RabbitMQ会认为没有确认的消费者并没有准备好接收下一条消息,所以可以好好利用这一点,如果处理消息内容非常耗时,则你的应用程序可以延迟确认消息,直到消息处理完成再确认,这样可防止RabbitMQ持续不断的消息导致过载
5. 如果收到消息后想要明确拒绝而不是确认收到消息的话,可使用AMQP的basic.reject,当把其basic.reject参数设置为true时,RabbitMQ会将消息重新发送给下一个订阅的消费者,如果设置为false,则RabbitMQ会把消息从队列中移除,而不会把它发送给新的消费者,当然也可以通过对消息确认的方式来简单地忽略该消息,如当你检测到一条格式错误的消息而任何一个消费者都无法处理的时候,此时就非常有用了.
# 队列创建
1. 消费者和生产者都能使用AMQP的queue.declare命令来创建队列,但是如果消费者在同一条信道上订阅了另一个队列的话,就无法再声明队列,必须首先取消订阅,将信道设置为"传输"模式,
2. 创建队列时,最好指定一个队列名称,消费者订阅队列时需要队列名称,并在创建绑定时也需要队列名称,如果不指定,RabbitMQ会随机分配一个名称作为queue.declare的返回值(常用于构建在AMQP上的RPC应用,此时零时匿名队列很有用),创建队列时exclusive为true时,队列会变为私有,此时只有你的应用程序才能消费队列消息,当你想要限制一个队列只有一个消费者时很有有,auto-delete为true时,当最后一个消费者取消订阅的时候,队列就会自动移除,当你需要零时队列只为一个消费者服务的话,可结合auto-delete和exclusive,当消费者断开连接时,队列就被移除了.
3. 如果尝试声明一个已经存在的队列时,RabbitMQ就什么都不做,并成功返回,如果你只是为了检测队列是否存在,可设置queue.declare的passive为true,如果存在会成功返回,否则会直接返回一个错误
4. 由于生产者和消费者都可以通过queue.declare创建队列,但是由于如果消息路由到了不存在的队列RabbitMQ会直接忽略它们,所以最好是生产者和消费者都建队列
#交换绑定
1. 如果你想要将消息投递到队列时,首先得把消息发送给交换机,然后根据确定的规则,RabbitMQ会将决定消息该投递到哪个队列,这些规则被称为路由键(Routing Key),队列通过路由键绑定到交换机,当你把消息发送到消息代理服务器时,消息将拥有一个路由键,即便为空,RabbitMQ也会将其和绑定使用的路由键进行匹配,如果匹配成功,消息会被投递到该队列,如果不匹配将进入"黑洞"
2. Direct直接交换机(channel->basic_publish(message, exchange, routingkey)),非常简单,如果路由键匹配的话,消息就被投递到对应的队列,当声明队列时,会自动绑定到默认交换机,并以队列名称作为路由键,所以发送消息时exchange为空则会发送到默认交换机,routingkey直接填写对应的队列名即可,如果默认交换机无法满足应用程序需求时,可通过exchange.declare创建其它交换机
3. Fanout扇形交换机,非常简单,当你发送一条消息到fanout交换机时,它会把消息投递给所有附加在此交换机上的队列,这允许你对单条消息做不同方式的反应,如一个WEB应用程序可能需要在用户上传新的图片时,用户相册必须清除缓存,同时用户应该得到些积分奖励,你可以将两个队列绑定到图片上传交换机上,一个用于清除缓存,另一个用于增加用户积分,后期如果有其它需求只需要为新的消费者写段代码,然后声明新的队列并将其绑定到fanout交换机上,这样就可以实现生产者和消费者完全解耦,允许你轻而易举的添加应用程序的功能.
4. Topic主题交换机,非常简单,当你发送一条消息到topic交换机时,它会把消息投递给以点号分割的路由键,匹配模式中*匹配特定位置的任意文本,"#"匹配所有的规则,是没有类似"*"以点号特定块儿匹配的概念的,它匹配包括点号在内的所有规则.
总结: 从上面几种模式可以看出其实RabbitMQ在开发中的角色可以非常灵活,既可以作为队列服务器使用,也可以作为RPC服务器使用,完全取决于你如何组织这些功能.
虚机隔离:
说明: RabbitMQ还支持Vhost"虚拟主机",每个Vhost本质上是一个迷你版拥有自己的队列/交换机/绑定以及权限机制的RabbitMQ服务器,这样就可以通过一个RabbitMQ服务众多应用程序,Vhost之间相互隔离,有效的避免了队列/交换机的命名冲突,否则你不得不运行多个RabbitMQ,默认Vhost为vhost: "/"可通过guest/guest访问,但是为了安全起见,应该及时更改
添加虚机: /xm-workspace/xm-apps/rabbitmq/sbin/rabbitmqctl add_vhost xmzoomeye
查看虚机: /xm-workspace/xm-apps/rabbitmq/sbin/rabbitmqctl list_vhosts
删除虚机: /xm-workspace/xm-apps/rabbitmq/sbin/rabbitmqctl delete_vhost xmzoomeye
说明: 一旦Vhost创建成功之后,就可以连接上去开始添加队列和交换机,如果想连接远程RabbitMQ节点可通过rabbitmqctl -n rabbit@hostname list_vhosts,需要注意的是rabbit@hostname中rabbit@是固定的,而hostname必须正确的是远程主机名
持久存储:
1. 默认重启RabbitMQ后,之前定义的交换机/队列都会消失,但是如果设置队列和交换机的durable属性为true,则在崩溃重启之后会重建队列和交换机,但是消息并不会重建,如果要实现持久化消息,则需要首先将"投递模式"设置为2将消息标记成持久化,然后发布到持久化的交换机并到达持久化的队列,这样才可以保证消息的持久化.
2. RabbitMQ确保持久化消息能从服务器重启中恢复其实是将它们写入磁盘上的一个持久化日志文件,当发布一条持久化消息到持久化交换机时,RabbitMQ会在消息提交到日志文件后才发送响应,如果消息后来被路由到非持久化队列,它会自动从持久化日志中删除,并且无法从服务器重启中恢复,如果消息后来被路由到持久化队列且被消费者消费并确认,则RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集,但是并不是所有的消息都需要启用持久化,不然会严重影响RabbitMQ每秒处理的消息总数
3. 从业务分析性能需求,如果要单台RabbitMQ服务器每秒处理10万条消息则[可以考虑更快的存储系统]或[通过在生产者单独信道上监听应答队列,发送消息时有效载荷带上此队列名,消费者就可以回答应答确认接收返回给生产者]或[分开建立持久化热备非集群负载均衡和非持久化集群],这样持久化消息通信负载不会减慢非持久化消息的处理.
4. AMQP中,一旦把信道设置成事务模式后,通过信道发送需要确认的消息,如果第一个消息失败则后续命令会忽略,虽然可以借助它确认消息是否持久化到磁盘,但是事务不但会降低消息吞吐量,而且会使生产者应用程序产生同步,而你使用消息通信就是想要避免同步,其实还有另一种发送确认模式和事务相仿,只需要将信道设置为confirm模式,所有信道上发布的消息都会被指派一个唯一的ID,一旦消息被投递给匹配的队列后,信道会发送一个发送方确认模式给生产者应用程序(包含唯一ID),使得生产者知道消息已经安全到达目的队列,如果消息和队列是可持久化的,那么确认消息只会在队列将消息写入磁盘后才会发出,相比于事务来说,最大的好处在于都是异步的,一旦发布了一条消息,生产者应用程序就可以在等待确认的同时继续发送下一条,当确认消息最终收到的时候,生产者应用的回调方法就会触发来处理该确认消息,如果RabbitMQ发生内部错误而导致消息丢失,会发送一条nack未确认消息,只是这次说明消息确实丢失了,此方式更加轻量级对于RabbitMQ消息代理服务器的性能影响几乎不记.
贯穿实例:
说明: 如上讲述了RabbitMQ的所有组件以及架构,但要结合起来理解一条真实消息的生命周期的最好方法是实践出真知,下面会使用PY的pika模块来演示Hello Word消息传递过程.
发布: 连接RabbitMQ->获取信道->声明交换机->创建消息->发布消息->关闭信道->关闭连接
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import sys import pika # 说明: 导入其它模块 if __name__ == ‘__main__‘: # 创建凭证对象 credentials = pika.PlainCredentials(‘guest‘, ‘guest‘) # 创建参数对象 conn_params = pika.ConnectionParameters( # RabbitMQ服务地址 host=‘127.0.0.1‘, # RabbitMQ服务端口 port=5672, # RabbitMQ登录凭证 credentials=credentials, # RabbitMQ虚拟主机 virtual_host=‘/‘ ) # 创建连接对象 conn_broker = pika.BlockingConnection(conn_params) # 获取信道对象 channel = conn_broker.channel() # 创建交换机 channel.exchange_declare( # 交换机名称 exchange="salt-exchange", # 交换机类型 type="direct", # 如果同名交换机已存在依然返回成功,否则创建 passive=False, # 声明为非持久化交换机 durable=False, # 交换机闲置也不会自动删除 auto_delete=False ) msg = sys.argv[1] # 创建配置对象 msg_props = pika.BasicProperties() # 设置内容类型 msg_props.content_type = ‘text/plain‘ # 尝试发布消息 channel.basic_publish( # 发布消息内容 body=msg, # 发布到交换机 exchange=‘salt-exchange‘, # 发布信息属性 properties=msg_props, # 发布信息时携带的路由键 routing_key=‘salt‘ )
说明: 首先用使用默认帐号密码guest,默认端口5672,默认虚拟主机/连接RabbitMQ Vhost,然后建立信道,利用信道和rabbitMQ进行通信,然后声明交换机,需要指定交换机名称,交换机类型,是否passive模式,如果非passive模式则表示想要声明交换机而非获取交换机信息,还可以指定是否持久化以及是否删除,最后通过命令行创建一条携带salt路由键类型为text/plain的消息通过basic_publish发送到salt-exchange交换机,但是此时由于并没有任何队列绑定在此交换机,所以消息必然会进入"黑洞"丢失掉.~
接收: 连接RabbitMQ->获得信道->声明交换机->声明队列->绑定队列到交换机->消费信息->关闭信道->关闭连接
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import pika # 说明: 导入其它模块 if __name__ == ‘__main__‘: # 创建凭证对象 credentials = pika.PlainCredentials(‘guest‘, ‘guest‘) # 创建参数对象 conn_params = pika.ConnectionParameters( # RabbitMQ服务地址 host=‘127.0.0.1‘, # RabbitMQ服务端口 port=5672, # RabbitMQ服务凭证 credentials=credentials, # RabbitMQ虚拟主机 virtual_host=‘/‘ ) # 创建连接对象 conn_broker = pika.BlockingConnection(conn_params) # 获取信道对象 channel = conn_broker.channel() # 创建交换机 channel.exchange_declare( # 交换机名称 exchange="salt-exchange", # 交换机类型 type="direct", # 如果同名交换机已存在依然返回成功 passive=False, # 声明为持久化交换机 durable=False, # 交换机闲置也不会自动删除 auto_delete=False ) # 创建队列 channel.queue_declare(queue="salt") # 绑定队列 channel.queue_bind( # 队列名称 queue="salt", # 交换机名称 exchange="salt-exchange", # 路由键名称 routing_key="salt" ) # 消息回调处理函数 def msg_consumer(channel, method, header, body): # 发送消息确认 channel.basic_ack(delivery_tag=method.delivery_tag) # 退出监听循环 if body == ‘exit‘: channel.basic_cancel(consumer_tag="salt-consumer") channel.stop_consuming() else: print ‘found notice: recive queue message {0}‘.format(body) return # 作为指定队列消费者 channel.basic_consume(msg_consumer, queue="salt", consumer_tag="salt-consumer") # 循环调用回调函数接收处理消息 channel.start_consuming()
说明: 首先用使用默认帐号密码guest,默认端口5672,默认虚拟主机/连接RabbitMQ Vhost,然后建立信道,利用信道和rabbitMQ进行通信,然后再次声明交换机,防止由于生产者没有声明交换机导致后面绑定队列失败,然后就是创建队列,创建队列时需要指定队列名称,然后就是绑定交换机,绑定的时候需要指定队列名称,交换机名称,绑定路由键,最后就是订阅指定队列,订阅时需要传递一个回调函数来处理消息,一个队列名称来指明要订阅的队列,一个标识进程的消费者标记,一旦开始读取消息则会开始一个阻塞的循环等待从信道进来的数据,如果要停止,则需要先使用basic_cancel结束消费(关闭信道和连接),注意需要提供进程标识,然后再stop_consuming停止消费者
确认: 连接RabbitMQ->获取信道->设置确认模式->声明交换机->创建消息->发布消息->关闭信道->关闭连接
#!/usr/bin/env python # -*- coding: utf-8 -*- """ # # Authors: limanman # OsChina: http://xmdevops.blog.51cto.com/ # Purpose: # """ # 说明: 导入公共模块 import sys import pika # 说明: 导入其它模块 if __name__ == ‘__main__‘: # 创建凭证对象 credentials = pika.PlainCredentials(‘guest‘, ‘guest‘) # 创建参数对象 conn_params = pika.ConnectionParameters( # RabbitMQ服务地址 host=‘127.0.0.1‘, # RabbitMQ服务端口 port=5672, # RabbitMQ登录凭证 credentials=credentials, # RabbitMQ虚拟主机 virtual_host=‘/‘ ) # 创建连接对象 conn_broker = pika.BlockingConnection(conn_params) # 获取信道对象 channel = conn_broker.channel() msg_ids = [] msg_ids.append(len(msg_ids)+1) # 确认模式回调函数 def confirm_handler(frame): # 第一次信道被设置为确认模式时会触发一次确认回调 if type(frame.method) == pika.spec.Confirm.SelectOk: print ‘found notice: channel in confirm mode‘ # 如果发送的消息达到队列后没有回应则说明消息丢失,需要重发 elif type(frame.method) == pika.spec.Basic.Nack: # 如果丢的消息确实是msg_ids里面的,则说明刚刚发的消息确实是丢失了~ if frame.method.delivery_tag in msg_ids: print ‘found errors: message may be lost‘ # 如果发送的消息到达队列后发回响应 elif type(frame.method) == pika.spec.Basic.Ack: # 如果确认消息id确实是msg_ids里面的,则从msg_ids里面删除 if frame.method.delivery_tag in msg_ids: print ‘found notice: message confirm received‘ # 删除已经确认的消息 msg_ids.remove(frame.method.delivery_tag) # 设置信道为确认模式 channel.confirm_delivery(callback=confirm_handler) # 创建交换机 channel.exchange_declare( # 交换机名称 exchange="salt-exchange", # 交换机类型 type="direct", # 如果同名交换机已存在依然返回成功,否则创建 passive=False, # 声明为非持久化交换机 durable=False, # 交换机闲置也不会自动删除 auto_delete=False ) msg = sys.argv[1] # 创建配置对象 msg_props = pika.BasicProperties() # 设置内容类型 msg_props.content_type = ‘text/plain‘ # 尝试发布消息 channel.basic_publish( # 发布消息内容 body=msg, # 发布到交换机 exchange=‘salt-exchange‘, # 发布信息属性 properties=msg_props, # 发布信息时携带的路由键 routing_key=‘salt‘ ) channel.close()
说明: RabbitMQ任何一个信道上发布的第一条消息都将获得ID1,并且信道上接下来的每一条消息的ID都会步进1,对于信道来说,消息ID是唯一的,所以一旦信道关闭,你将无法追踪发布在该信道上任何未完成的发送方确认消息状态,所以RabbitMQ并不会在发布消息时返回消息对应的ID,而需要我们自己为每个信道单独维护一个消息计数器,在几乎不影响RabbitMQ性能的前提下在生产者端用回调来处理消息确认.
本文出自 “满满李 - 运维开发之路” 博客,请务必保留此出处http://xmdevops.blog.51cto.com/11144840/1877695
消息队列_RabbitMQ-0002.深入MQ生产者/信道/交换机/队列/消费者?