首页 > 代码库 > RabbitMQ Queue分发多个Consumer
RabbitMQ Queue分发多个Consumer
多个Consumer的消息分发
之前讲过一个queue对应一个consumer的小例子, 但是在实际项目中,一个consumber肯定是不够的,queue中的消息过多。一个consumber明显会处理过慢,等待时间过长。这时候就需要多个consumber来缓解压力。
消息发布端
无论是创建connection还是创建channel与之前的步骤都是一样的,在上面我们使用的是默认的交换机。在这里可以自己声明一个交换机
这里是与上个例子不同的地方,创建了exchange,把queue绑定到了交换机上。然后去发布一百个消息
//声明一个direct类型的交换机channel.ExchangeDeclare("firstExchange", "direct", true, false, null);//声明队列 channel.QueueDeclare("firstTest", true, false, false, null); //绑定队列 channel.QueueBind("firstTest", "firstExchange", "firstExchange_Demo_firstTest", null);//发布一百个消息for (var i = 0; i < 100; i++) { var msg = Encoding.UTF8.GetBytes($"{i} :Hello RabbitMQ"); channel.BasicPublish("firstExchange", routingKey: "firstExchange_Demo_firstTest", basicProperties: null, body: msg);}
Consumer
这边大部分也与上个例子的代码一致。
//使用订阅的方式//这里的创建队列,是为了防止 消费 在 生产 之前channel.QueueDeclare("firstTest", true, false, false, null);//绑定队列
channel.ExchangeDeclare("firstExchange", "direct", true, false, null);
channel.QueueBind("firstTest", "firstExchange", "firstExchange_Demo_firstTest", null); var consumer = new EventingBasicConsumer(channel);consumer.Received += (sender, e) =>{ var msg = Encoding.UTF8.GetString(e.Body); Console.WriteLine(msg);};//进行消费 channel.BasicConsume("firstTest", true, consumer);
然后可以运行多个consumer,然后再打开消息发布程序。观察我们不同的consumer窗口
可以看到queue是把第n个消息发送给了第n个consumer。如果这时候有三个consumber。那么它们收到的消息顺序分别是
consumer1 consumer2 consumer3
1 2 3
4 5 6
这会带来一个问题,当consumber过多的时候,消息就会分配的不均匀,导致某些concumer非常忙,有些特别闲。而且consumber也会有掉线的情况,甚至queue和rabbitmq也会有崩溃的情况,这时候应该如此保持我们的消息的有效性、持久性、以及准确性呢。这些在下一篇博文中会详细说到
RabbitMQ Queue分发多个Consumer
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。