首页 > 代码库 > 一个winform带你玩转rabbitMQ(二)
一个winform带你玩转rabbitMQ(二)
接上一篇内容 安装,简介和初探 下面我们接着来学习下RabbitMQ
一. exchange属性
Type
前一章我们说了exchange的类型分为fanout,direct,topic.还有一种不常用的headers。
headers这种类型的exchange绑定的时候会忽略掉routingkey,Headers是一个键值对,可以定义成成字典等。发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。之前的几种exchange的routingKey都需要要字符串形式的,而headers exchange则没有这个要求,因为键值对的值可以是任何类型
举个例子,发送端定义2个键值{k1,1},{k2,2},接收端绑定队列的时候定义{"x-match", "any"},那么接收端的键值属性里只要存在{k1,1}或{k2,2}都可以获取到消息。
这样的类型扩展的程度很大,适合非常复杂的业务场景。
Durability
持久性,这是exchange的可选属性,如果你Durability设置为false,那些当前会话结束的时候,该exchange也会被销毁。
新建一个transient exchange
关闭当前连接再查看一下
刚才我们新建的transient已经销毁了。
Auto delete
当没有队列或者其他exchange绑定到此exchange的时候,该exchange被销毁。这个很简单就不示例了。
Internal (比较简单 也不展示了)
表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定。
PS: 无法声明2个名称相同 但是类型却不同的exchange
二. Queue属性
Durability 和exchange相同,未持久化的队列,服务重启后销毁。
Auto delete 当没有消费者连接到该队列的时候,队列自动销毁。
Exclusive 使队列成为私有队列,只有当前应用程序可用,当你需要限制队列只有一个消费者,这是很有用的。
扩展属性如下对应源程序 RabbitMQ.Client.IModel.QueueDeclare(string, bool, bool, bool, System.Collections.Generic.IDictionary<string,object>)最后的参数
Message TTL 当一个消息被推送在该队列的时候 可以存在的时间 单位为ms,(对应扩展参数argument "x-message-ttl" )
Auto expire 在队列自动删除之前可以保留多长时间(对应扩展参数argument "x-expires")
Max length 一个队列可以容纳的已准备消息的数量(对应扩展参数argument "x-max-length")
... 更多参考 http://www.rabbitmq.com/extensions.html
ps:一旦创建了队列和交换机,就不能修改其标志了。例如,如果创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。
三. Message属性
Durability
消息的持久在代码中设置的方法与exchange和queue不同,有2种方法
1.
IBasicProperties properties = channel.CreateBasicProperties(); properties.SetPersistent(true); byte[] payload = Encoding.ASCII.GetBytes(message); channel.BasicPublish(exchange.name, txtMessageRoutingKey.Text.Trim(), properties, payload);
2.
IBasicProperties properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; byte[] payload = Encoding.ASCII.GetBytes(message); channel.BasicPublish(exchange.name, txtMessageRoutingKey.Text.Trim(), properties, payload);
contentType: 标识消息内容的MIME,例如JSON用application/json
replayTo: 标识回调的queue的地址
correlationId:用于request和response的关联,确保消息的请求和响应的同一性
Message的2种状态:
Ready
此状态的消息存在于队列中待处理。
Unacknowledged
此状态的消息表示已经在处理未确认。
说到Unacknowledged,这里需要了解一个ack的概念。当Consumer接收到消息、处理任务完成之后,会发送带有这个消息标示符的ack,来告诉server这个消息接收到并处理完成。RabbitMQ会一直等到处理某个消息的Consumer的链接失去之后,才确定这个消息没有正确处理,从而RabbitMQ重发这个消息。
Message acknowledgment是默认关闭的。初始化Consumer时有个noAck参数,如果设置为true,这个Consumer在收到消息之后会马上返回ack。
string BasicConsume(string queue, bool noAck, RabbitMQ.Client.IBasicConsumer consumer)
一般来说,常用的场景noack一般就是设置成true,但是对于风险要求比较高的项目,例如支付。对于每一条消息我们都需要保证他的完整性和正确性。就需要获取消息后确认执行完正确的业务逻辑后再主动返回一个ack给server。可以通过rabbitmqctl list_queues name message_rady message_unacknowleded 命令来查看队列中的消息情况,也可以通过后台管理界面。
我们先hold住一条消息
然后我们再关闭链接或者重启服务
数据还是完整的。
ps:message的消费还分为consume和baseget 下面讲到集群的时候再介绍。
四. binding相关
如果你绑定了一个durable的队列和一个durable的交换机,RabbitMQ会自动保留这个绑定。类似的,如果删除了某个队列或交换机(无论是不是 durable),依赖它的绑定都会自动删除。
在声明一个队列的同时,server会默认让此队列绑定在默认的exchange上,这个exchange的名称为空。
五. 发布订阅
我们上一章的demo中实际上已经使用了发布订阅模式。
RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。发布者(producer)只需要把消息发送给一个exchange。exchange非常简单,它一边从发布者方接收消息,一边把消息推入队列。exchange必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过exchange type来定义的。
发布订阅其实很简单,例如上章我所示例,假设我们一开始没有任何消息,现在有一个生产者P1,他是一个天气预报播放者。然后我们有2个消费者来订阅他的消息。
P1通过广播类型的交换机fEx来发布他的天气消息,c1,c2分别建立一个队列为Q1,Q2. 并且订阅P1的fEx.
基本可以如图所示
我们P1利用fEx生成一条消息的时候,c1,c2通过Q1,Q2都可以获取到p1所发布的消息
我们发布3条消息
查看队列情况
Q1:
Q2:
Q1,Q2都拿到了广播的消息,至于C1,C2如何消费这些消息,互相之间完全没有干扰。
ps:简单一句话 发布订阅中发布者所产生的消息通过exchange对所有绑定他的队列队形消息推送,每个队列获取绑定所对应的消息
六. 消费者集群
区分于发布订阅,消费者集群主要解决横向服务器扩展问题,如果一个队列积压太多,如何均与的让不同的消费者来承担。
默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。
我们开3个程序,1个生产 2个消费。
如图所示绑定关系如下
2个消费者用同样的程序,这里记录进程pid以区分,实际项目中可以用不同服务器来区分
启动消息消费,使消费者处理work状态
然后我们不停的通过生产者这发布消息
然后我们看下2个消费者的消费情况
1.
2.
3.
4.
5.
默认地,RabbitMQ会逐一地向下一个Consumer发放消息,每一个Consumer会得到数目相同的消息。文中所示之所以是按照1条一条的轮询,是因为程序中控制了一个队列单次消费的数量。
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)
本篇先到此,希望对大家有帮助,下周一发布第三篇。
一个winform带你玩转rabbitMQ(二)