首页 > 代码库 > RabbitMQ随笔

RabbitMQ随笔

不管是官方还是能搜到的文章,使用MQ的基本思路都是这样:

        static void Main(string[] args)        {           //通过工厂建立连接            using (IConnection connection = factory.CreateConnection())            {                //通过连接创建会话,这里还有可能是Channel                using (ISession session = connection.CreateSession())                {                    while (true)或者一个for 循环发送100万个消息                    {                          //创建一个msg                      string  message = "Hello World";                         //发送                       xxx.Send(message);                    }                }            }        }

那么问题来了:

这个"Hello World"怎么传进去?如何对外提供服务?

然后会发现有些客户端SDK是这么处理的:

       public static void SendMsg()        {            MQAPI("Hello World");        }        private static void MQAPI(string message)        {            IConnectionFactory factory;            //通过工厂建立连接            using (IConnection connection = factory.CreateConnection())            {                //通过连接创建Session会话                using (ISession session = connection.CreateSession())                {                       //创建一个msg                      string  message = message;                      //发送                      xxx.Send(message);                }            }        }

去公开一个接口调用SendMsg吧。

看起来似乎解决了这个问题,但是实际测试下会吓一跳:后者的QPS仅为前者的40%左右,这是不能容忍的。

那么接下来大家肯定会从SQLConnection的经验得出一个解决方案:

将connection抽出来,那么session或者Channel呢?

我们通过研究RabbitMQ的链接客户端和服务端链接过程(这个过程较为复杂,写了几遍都删了)得出如下结论:

  1. 先建立Connection链接,这个链接就是一个TCP链接,Producer和Consumer都是通过TCP连接到RabbitMQ Server 的。经过connection.start -> connection.start_ok -> connection.secure -> connection.secure_ok -> connection.tune -> connection.tune_ok(这时rabbit会建立一个心跳进程)-> connection.open -> connection.open_ok后,客户端与rabbit之间就认为已经建立 了连接。
  2. 再建立Channels: 虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。可以多路复用,1~65535为可用的channel编号,channel的索引不为0时(0是全局链接),rabbit认为这些数据从属于某个 channel。如果该channel进程不 存在,则会创建一个channel进程,并由此进程负责该channel上 的所有数据。根据AMQP协议,经过channel.open -> channel.open_ok后,客户端就可以开始在该channel上发送数据了。

那么,建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理 高并发的能力。但是,在TCP连接中建立 Channel是没有上述代价的。对于Producer或者Consumer来说,可以并发的使用多个 Channel进行 Publish或者Receive。

然而我研究了RabbitMQ.Client代码之后发现其并未维护一个连接池:

                if (AutomaticRecoveryEnabled)                {                    var autorecoveringConnection = new AutorecoveringConnection(this, clientProvidedName);                    autorecoveringConnection.Init(endpointResolver);                    conn = autorecoveringConnection;                }                else                {                    IProtocol protocol = Protocols.DefaultProtocol;                    conn = protocol.CreateConnection(this, false, endpointResolver.SelectOne(this.CreateFrameHandler), clientProvidedName);                }

只是有一个自动恢复功能,需要设置为true建立长TCP链接,然后根据在请求的时候再创建Channel。

RabbitMQ随笔