首页 > 代码库 > 高并发场景之RabbitMQ篇

高并发场景之RabbitMQ篇

上次我们介绍了在单机、集群下高并发场景可以选择的一些方案,传送门:高并发场景之一般解决方案

但是也发现了一些问题,比如集群下使用ConcurrentQueue或加锁都不能解决问题,后来采用Redis队列也不能完全解决问题,

因为使用Redis要自己实现分布式锁

 

这次我们来了解一下一个专门处理队列的组件:RabbitMQ,这个东西天生支持分布式队列。

下面我们来用RabbitMQ来实现上一篇的场景

 

一、新建RabbitMQ.Receive

private static ConnectionFactory factory = new ConnectionFactory 
{ HostName = "192.168.1.109", UserName = "ljr", Password = "root", VirtualHost = "/" };
 1         static void Main(string[] args) 2         { 3             using (var connection = factory.CreateConnection()) 4             { 5                 using (var channel = connection.CreateModel()) 6                 { 7                     var consumer = new EventingBasicConsumer(); 8                     consumer.Received += (model, ea) => 9                     {10                         var body = ea.Body;11                         var message = Encoding.UTF8.GetString(body);12                         Console.WriteLine(" [x] Received {0}", message);13 14                         var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString();15                         var value = http://www.mamicode.com/int.Parse(total) + 1;16 17                         DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null);18                     };19 20                     channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null);21                     channel.BasicConsume(queue: "queueName", noAck: true, consumer: consumer);22 23                     Console.WriteLine(" Press [enter] to exit.");24                     Console.ReadLine();25                 }26             }27         }

二、新建RabbitMQ.Send  

 1         static void Main(string[] args) 2         { 3             for (int i = 1; i <= 500; i++) 4             { 5                 Task.Run(async () => 6                 { 7                     await Produce(); 8                 }); 9 10                 Console.WriteLine(i);11             }12 13             Console.ReadKey();14         }15 16         public static Task Produce()17         {18             return Task.Factory.StartNew(() =>19             {20                 using (var connection = factory.CreateConnection())21                 {22                     using (var channel = connection.CreateModel())23                     {24                         var body = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());25                         channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null);26                         channel.BasicPublish(exchange: "", routingKey: "queueName", basicProperties: null, body: body);27                     }28                 }29             });30         }

 

这里是模拟500个用户请求,正常的话最后Total就等于500

我们来说试试看,运行程序

2.1、打开接收端

技术分享

2.2 运行客户端

技术分享

2.3、可以看到2边几乎是实时的,再去看看数据库

技术分享

三、我们在集群里执行 

技术分享技术分享

 

最后数据是1000

技术分享

 

 完全没有冲突,好了,就是这样 、。

 

高并发场景之RabbitMQ篇