首页 > 代码库 > ActiveMq C#客户端 消息队列的使用(存和取)
ActiveMq C#客户端 消息队列的使用(存和取)
1、准备工具
VS2013
Apache.NMS.ActiveMQ-1.7.2-bin.zip
apache-activemq-5.14.0-bin.zip
apache-activemq-5.14.0-bin.zip
2、开始项目
VS2013新建一个C#控制台应用程序,项目中添加两个dll引用,一个是D:\Apache.NMS.ActiveMQ-1.7.2-bin\lib\Apache.NMS\net-4.0目录下的Apache.NMS.dll,另一个是D:\Apache.NMS.ActiveMQ-1.7.2-bin\build\net-4.0\debug目录下的Apache.NMS.ActiveMQ.dll。
新建一个类,MyActiveMq.cs,用于对activemq消息队列接口的封装,实现如下:
?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace NmsProducerClasses
{
    /// <summary>
    /// Small wrapper around an Apache.NMS ActiveMQ connection that owns one session,
    /// one producer and one consumer bound to the same queue or topic.
    /// </summary>
    /// <remarks>
    /// Typical use: construct, call <see cref="InitQueueOrTopic"/>, then
    /// <see cref="SendMessage"/> / <see cref="GetMessage"/>, and finally
    /// <see cref="ShutDown"/> (or dispose the instance).
    /// </remarks>
    public class MyActiveMq : IDisposable
    {
        private const string ClientID = "clientid";

        // Selector expression applied to consumers when filtering is requested.
        // The string literal must use plain ASCII single quotes.
        private const string Selector = "filter='demo'";

        private IConnectionFactory factory;
        private IConnection connection;
        private ISession session;
        private IMessageProducer prod;
        private IMessageConsumer consumer;
        private ITextMessage msg;
        private bool isTopic = false;
        private bool hasSelector = false;
        private bool sendSuccess = true;
        private bool receiveSuccess = true;
        private bool closed = false;

        /// <summary>
        /// Connects to the broker on port 61616, starts the connection and opens a session.
        /// </summary>
        /// <param name="isLocalMachine">True to connect to <c>localhost</c>; false to use <paramref name="remoteAddress"/>.</param>
        /// <param name="remoteAddress">Host name or IP of the broker (e.g. 192.168.1.111); ignored when <paramref name="isLocalMachine"/> is true.</param>
        public MyActiveMq(bool isLocalMachine, string remoteAddress)
        {
            try
            {
                // Build the connection factory for either the local or the remote broker.
                if (isLocalMachine)
                {
                    factory = new ConnectionFactory("tcp://localhost:61616/");
                }
                else
                {
                    factory = new ConnectionFactory("tcp://" + remoteAddress + ":61616/");
                }

                // Factory -> connection -> session.
                connection = factory.CreateConnection();
                connection.ClientId = ClientID;
                connection.Start();
                session = connection.CreateSession();
            }
            catch (System.Exception e)
            {
                sendSuccess = false;
                receiveSuccess = false;
                ReportFailure(e);
                throw; // rethrow without resetting the stack trace
            }

            Console.WriteLine("Begin connection...");
        }

        /// <summary>
        /// Creates the producer, the consumer and the reusable outgoing text message
        /// for the given destination.
        /// </summary>
        /// <param name="topic">True to use a topic (with a durable consumer); false to use a queue.</param>
        /// <param name="name">Queue or topic name.</param>
        /// <param name="selector">True to attach the built-in selector to the consumer.</param>
        /// <returns>True when initialization succeeded; failures are rethrown after being reported.</returns>
        public bool InitQueueOrTopic(bool topic, string name, bool selector = false)
        {
            try
            {
                if (topic)
                {
                    var destination = new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic(name);
                    prod = session.CreateProducer(destination);
                    consumer = session.CreateDurableConsumer(destination, ClientID, selector ? Selector : null, false);
                }
                else
                {
                    var destination = new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(name);
                    prod = session.CreateProducer(destination);
                    consumer = selector
                        ? session.CreateConsumer(destination, Selector)
                        : session.CreateConsumer(destination);
                }

                isTopic = topic;
                hasSelector = selector;

                // Single message object that SendMessage refills on every call.
                msg = prod.CreateTextMessage();

                // A successful (re-)initialization clears any earlier failure state.
                sendSuccess = true;
                receiveSuccess = true;
            }
            catch (System.Exception e)
            {
                sendSuccess = false;
                receiveSuccess = false;
                ReportFailure(e);
                throw;
            }

            return sendSuccess;
        }

        /// <summary>
        /// Sends <paramref name="message"/> as a persistent text message.
        /// </summary>
        /// <param name="message">Text payload.</param>
        /// <param name="msgId">Value stored as correlation id, as the "MyID" property and as message id.</param>
        /// <param name="priority">Message priority.</param>
        /// <returns>True when the message was handed to the producer; false if <see cref="InitQueueOrTopic"/> has not been called.</returns>
        public bool SendMessage(string message, string msgId = "defult", MsgPriority priority = MsgPriority.Normal)
        {
            if (prod == null)
            {
                sendSuccess = false;
                Console.WriteLine("call InitQueueOrTopic() first!!");
                return false;
            }

            Console.WriteLine("Begin send messages...");

            // Fill the reusable message object with this call's data.
            msg.NMSCorrelationID = msgId;
            msg.Properties["MyID"] = msgId;
            msg.NMSMessageId = msgId;
            msg.Text = message;
            Console.WriteLine(message);

            sendSuccess = isTopic
                ? ProducerSubcriber(message, priority)
                : P2P(message, priority);
            return sendSuccess;
        }

        /// <summary>
        /// Waits up to 10 ms for the next message and returns its text.
        /// </summary>
        /// <returns>
        /// The message text, or null when nothing arrived in time, when the received
        /// message is not a text message, or when <see cref="InitQueueOrTopic"/> has
        /// not been called.
        /// </returns>
        public string GetMessage()
        {
            // Guard on the object that is actually dereferenced below.
            if (consumer == null)
            {
                Console.WriteLine("call InitQueueOrTopic() first!!");
                return null;
            }

            Console.WriteLine("Begin receive messages...");
            ITextMessage revMessage = null;
            try
            {
                // Blocks for at most 10 ms. Do not make this too short, otherwise the
                // call returns null before a pending message can be fetched.
                revMessage = consumer.Receive(TimeSpan.FromMilliseconds(10)) as ITextMessage;
            }
            catch (System.Exception e)
            {
                receiveSuccess = false;
                ReportFailure(e);
                throw;
            }

            if (revMessage == null)
            {
                Console.WriteLine("No message received!");
                return null;
            }

            Console.WriteLine("Received message with Correlation ID: " + revMessage.NMSCorrelationID);
            Console.WriteLine("Received message with text: " + revMessage.Text);
            return revMessage.Text;
        }

        /// <summary>
        /// Point-to-point (queue) send: one producer, one consumer.
        /// </summary>
        private bool P2P(string message, MsgPriority priority)
        {
            if (hasSelector)
            {
                // This property is what the queue consumer's selector matches on.
                msg.Properties.SetString("filter", "demo");
            }

            return SendPersistent(priority);
        }

        /// <summary>
        /// Publish/subscribe (topic) send: one producer, many consumers.
        /// </summary>
        private bool ProducerSubcriber(string message, MsgPriority priority)
        {
            return SendPersistent(priority);
        }

        /// <summary>
        /// Sends the prepared message with persistent delivery and the given priority.
        /// </summary>
        private bool SendPersistent(MsgPriority priority)
        {
            try
            {
                prod.Priority = priority;

                // Persistent delivery: with the non-persistent mode, messages are lost
                // when the broker restarts.
                prod.DeliveryMode = MsgDeliveryMode.Persistent;

                // NOTE(review): TimeSpan.MinValue is passed as time-to-live, presumably
                // meaning "never expires" - confirm against the NMS documentation.
                prod.Send(msg, MsgDeliveryMode.Persistent, priority, TimeSpan.MinValue);
                sendSuccess = true;
            }
            catch (System.Exception e)
            {
                sendSuccess = false;
                ReportFailure(e);
                throw;
            }

            return sendSuccess;
        }

        /// <summary>
        /// Prints the failure and waits for Enter so the console output stays visible.
        /// </summary>
        private static void ReportFailure(System.Exception e)
        {
            Console.WriteLine("Exception:{0}", e.Message);
            Console.ReadLine();
        }

        /// <summary>
        /// Closes consumer, producer, session and connection. Safe to call more than once.
        /// </summary>
        public void ShutDown()
        {
            if (closed)
            {
                return;
            }

            closed = true;
            Console.WriteLine("Close connection and session...");
            try
            {
                if (consumer != null)
                {
                    consumer.Close();
                }

                if (prod != null)
                {
                    prod.Close();
                }

                if (session != null)
                {
                    session.Close();
                }
            }
            finally
            {
                // Always attempt to close the connection, even if an earlier Close failed.
                if (connection != null)
                {
                    connection.Close();
                }

                consumer = null;
                prod = null;
                session = null;
                connection = null;
            }
        }

        /// <summary>
        /// Releases the broker resources; equivalent to <see cref="ShutDown"/>.
        /// </summary>
        public void Dispose()
        {
            ShutDown();
        }
    }
}
Program.cs代码如下:
?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.IO;
using System.Threading;

namespace NmsProducerClasses
{
    /// <summary>
    /// Console demo: sends ten JSON-serialized users with different priorities to
    /// "myqueue" and then polls the queue twenty times.
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            MyActiveMq mymq = new MyActiveMq(isLocalMachine: true, remoteAddress: "");
            try
            {
                mymq.InitQueueOrTopic(topic: false, name: "myqueue", selector: false);
                // Alternative destinations to try:
                //mymq.InitQueueOrTopic(topic: false, name: "seletorqueue", selector: true);
                //mymq.InitQueueOrTopic(topic: true, name: "noselectortopic", selector: false);
                //mymq.InitQueueOrTopic(topic: true, name: "selectortopic", selector: true);

                // The full range of priority values (0-9) are supported by the JDBC message store.
                // For KahaDB three priority categories are supported, Low (< 4), Default (= 4) and High (> 4).
                SendUser(mymq, "0000", "Lowest", Apache.NMS.MsgPriority.Lowest);
                SendUser(mymq, "1111", "AboveLow", Apache.NMS.MsgPriority.AboveLow);
                SendUser(mymq, "2222", "AboveNormal", Apache.NMS.MsgPriority.AboveNormal);
                SendUser(mymq, "0000", "BelowNormal", Apache.NMS.MsgPriority.BelowNormal);
                SendUser(mymq, "1111", "High", Apache.NMS.MsgPriority.High);
                SendUser(mymq, "2222", "Highest", Apache.NMS.MsgPriority.Highest);
                SendUser(mymq, "0000", "Low", Apache.NMS.MsgPriority.Low);
                SendUser(mymq, "1111", "Normal", Apache.NMS.MsgPriority.Normal);
                SendUser(mymq, "2222", "VeryHigh", Apache.NMS.MsgPriority.VeryHigh);
                // Sends the "VeryLow" user itself, not the previous "VeryHigh" one again.
                SendUser(mymq, "2222", "VeryLow", Apache.NMS.MsgPriority.VeryLow);

                int num = 20;
                while (num-- > 0)
                {
                    mymq.GetMessage();
                    //Thread.Sleep(1000);
                }
            }
            finally
            {
                // Close the connection even when a send or receive throws.
                mymq.ShutDown();
            }

            // XML test
            //string xml = XmlTest.ObjToXml();
            //Console.WriteLine("ObjToXml: {0}", xml);

            // Json test
            //User u = new User() { Id="88", Imgurl="img/88.jpg", Name="haha88"};
            //string jsonstr = JsonUtil.ObjectToJson(u);
            //Console.WriteLine(jsonstr);
        }

        /// <summary>
        /// Builds a user, serializes it to JSON and sends it with the given priority.
        /// </summary>
        /// <param name="mq">Initialized message-queue wrapper.</param>
        /// <param name="id">First User constructor argument (presumably the id - verify against User).</param>
        /// <param name="name">Second User constructor argument (presumably the name - verify against User).</param>
        /// <param name="priority">Priority to send the message with.</param>
        private static void SendUser(MyActiveMq mq, string id, string name, Apache.NMS.MsgPriority priority)
        {
            User user = new User(id, name, "img/p.jpg");
            mq.SendMessage(JsonUtil.ObjectToJson(user), "newid", priority: priority);
        }
    }
}
3、测试
首先,需要启动消息队列,具体启动及测试消息队列步骤可见这边:点击打开链接
然后,运行项目,运行结果如下:
4、存在问题
priority并不能决定消息传送的严格顺序,具体原因可见
http://activemq.apache.org/how-can-i-support-priority-queues.html
http://shift-alt-ctrl.iteye.com/blog/2034440
ActiveMq C#客户端 消息队列的使用(存和取)
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。