首页 > 代码库 > 一个winform带你玩转rabbitMQ(二)

一个winform带你玩转rabbitMQ(二)

接上一篇内容 安装,简介和初探 下面我们接着来学习下RabbitMQ


一.  exchange属性

  Type

  前一章我们说了exchange的类型分为fanout,direct,topic.还有一种不常用的headers。
  headers这种类型的exchange绑定的时候会忽略掉routingkey,Headers是一个键值对,可以定义成成字典等。发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。之前的几种exchange的routingKey都需要要字符串形式的,而headers exchange则没有这个要求,因为键值对的值可以是任何类型
  举个例子,发送端定义2个键值{k1,1},{k2,2},接收端绑定队列的时候定义{"x-match", "any"},那么接收端的键值属性里只要存在{k1,1}或{k2,2}都可以获取到消息。
  这样的类型扩展的程度很大,适合非常复杂的业务场景。

  Durability

  持久性,这是exchange的可选属性,如果你Durability设置为false,那些当前会话结束的时候,该exchange也会被销毁。 
  新建一个transient exchange 
  
  关闭当前连接再查看一下
  

  

  刚才我们新建的transient已经销毁了。

  Auto delete

  当没有队列或者其他exchange绑定到此exchange的时候,该exchange被销毁。这个很简单就不示例了。

  Internal (比较简单 也不展示了)

  表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定。

  PS: 无法声明2个名称相同 但是类型却不同的exchange

  


二.  Queue属性  

  Durability 和exchange相同,未持久化的队列,服务重启后销毁。

  Auto delete 当没有消费者连接到该队列的时候,队列自动销毁。

  Exclusive 使队列成为私有队列,只有当前应用程序可用,当你需要限制队列只有一个消费者,这是很有用的。

  扩展属性如下对应源程序 RabbitMQ.Client.IModel.QueueDeclare(string, bool, bool, bool, System.Collections.Generic.IDictionary<string,object>)最后的参数

  Message TTL 当一个消息被推送在该队列的时候 可以存在的时间 单位为ms,(对应扩展参数argument "x-message-ttl" )

  Auto expire 在队列自动删除之前可以保留多长时间(对应扩展参数argument "x-expires")

  Max length 一个队列可以容纳的已准备消息的数量(对应扩展参数argument "x-max-length")

  ... 更多参考 http://www.rabbitmq.com/extensions.html

  ps:一旦创建了队列和交换机,就不能修改其标志了。例如,如果创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。


三.  Message属性

  Durability 

  消息的持久在代码中设置的方法与exchange和queue不同,有2种方法

  1.

  IBasicProperties properties = channel.CreateBasicProperties();  properties.SetPersistent(true);  byte[] payload = Encoding.ASCII.GetBytes(message);  channel.BasicPublish(exchange.name, txtMessageRoutingKey.Text.Trim(), properties, payload);

  2.

  IBasicProperties properties = channel.CreateBasicProperties();  properties.DeliveryMode = 2;  byte[] payload = Encoding.ASCII.GetBytes(message);  channel.BasicPublish(exchange.name, txtMessageRoutingKey.Text.Trim(), properties, payload);

  contentType: 标识消息内容的MIME,例如JSON用application/json

  replayTo: 标识回调的queue的地址

  correlationId:用于request和response的关联,确保消息的请求和响应的同一性

  Message的2种状态:

  Ready

  此状态的消息存在于队列中待处理。

  Unacknowledged

  此状态的消息表示已经在处理未确认。

  说到Unacknowledged,这里需要了解一个ack的概念。当Consumer接收到消息、处理任务完成之后,会发送带有这个消息标示符的ack,来告诉server这个消息接收到并处理完成。RabbitMQ会一直等到处理某个消息的Consumer的链接失去之后,才确定这个消息没有正确处理,从而RabbitMQ重发这个消息。
  Message acknowledgment是默认关闭的。初始化Consumer时有个noAck参数,如果设置为true,这个Consumer在收到消息之后会马上返回ack。

  string BasicConsume(string queue, bool noAck, RabbitMQ.Client.IBasicConsumer consumer)

  一般来说,常用的场景noack一般就是设置成true,但是对于风险要求比较高的项目,例如支付。对于每一条消息我们都需要保证他的完整性和正确性。就需要获取消息后确认执行完正确的业务逻辑后再主动返回一个ack给server。可以通过rabbitmqctl list_queues name message_rady message_unacknowleded 命令来查看队列中的消息情况,也可以通过后台管理界面。

  我们先hold住一条消息

  

  然后我们再关闭链接或者重启服务

  
  数据还是完整的。 

  ps:message的消费还分为consume和baseget 下面讲到集群的时候再介绍。


四.  binding相关

  如果你绑定了一个durable的队列和一个durable的交换机,RabbitMQ会自动保留这个绑定。类似的,如果删除了某个队列或交换机(无论是不是 durable),依赖它的绑定都会自动删除。

  在声明一个队列的同时,server会默认让此队列绑定在默认的exchange上,这个exchange的名称为空。

   


 五.  发布订阅

  我们上一章的demo中实际上已经使用了发布订阅模式。

  RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。发布者(producer)只需要把消息发送给一个exchange。exchange非常简单,它一边从发布者方接收消息,一边把消息推入队列。exchange必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过exchange type来定义的。

  

  发布订阅其实很简单,例如上章我所示例,假设我们一开始没有任何消息,现在有一个生产者P1,他是一个天气预报播放者。然后我们有2个消费者来订阅他的消息。
  P1通过广播类型的交换机fEx来发布他的天气消息,c1,c2分别建立一个队列为Q1,Q2. 并且订阅P1的fEx.

  基本可以如图所示
  
  我们P1利用fEx生成一条消息的时候,c1,c2通过Q1,Q2都可以获取到p1所发布的消息

  我们发布3条消息
  
  查看队列情况
  Q1:
  
  Q2:

  

  Q1,Q2都拿到了广播的消息,至于C1,C2如何消费这些消息,互相之间完全没有干扰。

  ps:简单一句话 发布订阅中发布者所产生的消息通过exchange对所有绑定他的队列队形消息推送,每个队列获取绑定所对应的消息


六.  消费者集群

  区分于发布订阅,消费者集群主要解决横向服务器扩展问题,如果一个队列积压太多,如何均与的让不同的消费者来承担。

  

  默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。

  我们开3个程序,1个生产 2个消费。

  如图所示绑定关系如下

  

  2个消费者用同样的程序,这里记录进程pid以区分,实际项目中可以用不同服务器来区分

  

   启动消息消费,使消费者处理work状态

  

  然后我们不停的通过生产者这发布消息

  

  然后我们看下2个消费者的消费情况

  1.

  

  2.
  

  3.
  

  4.
  

  5.
  
  

  默认地,RabbitMQ会逐一地向下一个Consumer发放消息,每一个Consumer会得到数目相同的消息。文中所示之所以是按照1条一条的轮询,是因为程序中控制了一个队列单次消费的数量。

  void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)


   本篇先到此,希望对大家有帮助,下周一发布第三篇。  

一个winform带你玩转rabbitMQ(二)