首页 > 代码库 > ActiveMQ发布订阅模式

ActiveMQ发布订阅模式

ActiveMQ的另一种模式就SUB/HUB即发布订阅模式,是SUB/hub就是一拖N的USB分线器的意思。意思就是一个来源分到N个出口。还是上节的例子,当一个订单产生后,后台N个系统需要联动,但有一个前提是都需要收到订单信息,那么我们就需要将一个生产者的消息发布到N个消费者。

生产者:

 

            try            {                //Create the Connection Factory                  IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");                using (IConnection connection = factory.CreateConnection())                {                    //Create the Session                      using (ISession session = connection.CreateSession())                    {                        //Create the Producer for the topic/queue                          IMessageProducer prod = session.CreateProducer(                            new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"));                        //Send Messages                          int i = 0;                        while (!Console.KeyAvailable)                        {                            ITextMessage msg = prod.CreateTextMessage();                            msg.Text = i.ToString();                            Console.WriteLine("Sending: " + i.ToString());                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);                            System.Threading.Thread.Sleep(5000);                            i++;                        }                    }                }                Console.ReadLine();            }            catch (System.Exception e)            {                Console.WriteLine("{0}", e.Message);                Console.ReadLine();            }

假设生产者每5秒发送一次消息:

技术分享

消费者:

        static void Main(string[] args)        {            try              {                  //Create the Connection factory                  IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");                                    //Create the connection                  using (IConnection connection = factory.CreateConnection())                  {                      connection.ClientId = "testing listener1";                      connection.Start();                        //Create the Session                      using (ISession session = connection.CreateSession())                      {                          //Create the Consumer                          IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener1", null, false);                                                    consumer.Listener += new MessageListener(consumer_Listener);                            Console.ReadLine();                      }                      connection.Stop();                      connection.Close();                  }              }              catch (System.Exception e)              {                  Console.WriteLine(e.Message);              }          }            static void consumer_Listener(IMessage message)          {              try              {                  ITextMessage msg = (ITextMessage)message;                  Console.WriteLine("Receive: " + msg.Text);             }              catch (System.Exception e)              {                  Console.WriteLine(e.Message);              }          }

 

启动一个消费者:

技术分享

我们发现他是从15开始的,而不是像上节一样从头开始,再启动另一个消费者:

技术分享

我们发现就是从启动时开始接受消息的,之前的消息就丢失了。

整体状态如下:

技术分享

我们观察管理界面:

技术分享

产生了一个testing的Topics,而订阅方有2个都订阅的是testing:

技术分享

这样只需要在需要获取消息的地方订阅即可及时获得。

源代码下载

ActiveMQ发布订阅模式