首页 > 代码库 > Azure Messaging-ServiceBus Messaging消息队列技术系列3-消息顺序保证

Azure Messaging-ServiceBus Messaging消息队列技术系列3-消息顺序保证

上一篇:Window Azure ServiceBus Messaging消息队列技术系列2-编程SDK入门  http://www.cnblogs.com/tianqing/p/5944573.html

介绍了Azure Service Bus的编程SDK(主要的编程接口)

本文中我们以实际的使用场景来说明Azure Messaging是否支持以及如何编码实现:消息的收发顺序保证

消息的收发在实际业务中往往是有顺序的:发送时1-2-3-4-5,接收时也必须是1-2-3-4-5,即FIFO特性。

在本文的Demo中,我们模拟销售订单消息队列异步处理场景,消息体是一条SalesOrder,顺序发送,顺序接收。

1. 我们还是使用上篇博客中在Windows Azure的Portal上建立好的NameSpaceservicebustest

销售订单队列名称:OrderQueue

2.简单封装一个Service Bus的工具类:ServiceBusUtils: 用于创建队列、删除队列、创建QueueClient、创建BrokerdMessage

using Microsoft.ServiceBus;using Microsoft.ServiceBus.Messaging;using System;using System.Collections.Generic;using System.Linq;using System.Runtime.Serialization;using System.Text;using System.Threading.Tasks;namespace AzureMessaging.FIFO{    /// <summary>    /// ServiceBus工具类    /// </summary>    class ServiceBusUtils    {        //Namespace名称        private static readonly string namespaceName = "servicebustest";        /// <summary>        /// 创建队列        /// </summary>        /// <param name="queueName">队列名称</param>        /// <param name="isSession">是否支持会话</param>        public void CreateQueue(string queueName, bool isSession = true)        {            var namespaceClient = NamespaceManager.Create();            if (namespaceClient.QueueExists(queueName))            {                namespaceClient.DeleteQueue(queueName);            }            var queue = new QueueDescription(queueName) { RequiresSession = isSession };            namespaceClient.CreateQueue(queue);        }        /// <summary>        /// 删除队列        /// </summary>        /// <param name="queueName">队列名称</param>                public void DeleteQueue(string queueName)        {            var namespaceClient = NamespaceManager.Create();            if (namespaceClient.QueueExists(queueName))            {                namespaceClient.DeleteQueue(queueName);            }        }        /// <summary>        /// 创建队列客户端        /// </summary>        /// <returns>队列客户端</returns>        public QueueClient GetQueueClient(string queueName, bool isSession = false, ReceiveMode mode = ReceiveMode.ReceiveAndDelete)        {            return QueueClient.Create(queueName, mode);        }        /// <summary>        /// 创建队列客户端        /// </summary>        /// <returns>队列客户端</returns>        public QueueClient GetReceiveQueueClient(string queueName, ReceiveMode mode = ReceiveMode.PeekLock)        {            var namespaceClient = NamespaceManager.Create();            return QueueClient.Create(queueName, mode);        }        /// <summary>        /// 构造消息        /// </summary>        /// <param name="serializableObject">可序列化的对象</param>        /// <returns>消息</returns>        public BrokeredMessage Create(Object serializableObject)        {            var serializer = new DataContractSerializer(serializableObject.GetType(),                 new DataContractSerializerSettings() { IgnoreExtensionDataObject = true, PreserveObjectReferences = false });            var message = new BrokeredMessage(serializableObject);            message.Properties.Add("Type", serializableObject.GetType().ToString());            return message;        }    }}

2. 示例SalesOrder实体类

using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading.Tasks;namespace AzureMessaging.FIFO{    /// <summary>    /// 销售订单类    /// </summary>    public class SalesOrder    {        /// <summary>        /// 订单ID        /// </summary>        public string OrderID { get; set; }        /// <summary>        /// 订单编号        /// </summary>        public string Code { get; set; }        /// <summary>        /// 创建时间        /// </summary>        public DateTime CreateTime { get; set; }        /// <summary>        /// 总价格        /// </summary>        public Decimal TotalPrice { get; set; }        /// <summary>        /// 产品ID        /// </summary>        public int ProductID { get; set; }    }}

3. 消息顺序发送

向OrderQueue发送10条消息订单消息,输出每条消息的顺序号以及MessageID

private static readonly string queueName = "OrderQueue";    /// <summary>        /// 发送消息        /// </summary>        private static void MessageSend()        {            var sbUtils = new ServiceBusUtils();            //创建队列            sbUtils.CreateQueue(queueName, false);            //顺序发送消息到OrderQueue            var queueSendClient = sbUtils.GetQueueClient(queueName);            for (int i = 0; i < 10; i++)            {                var order = new SalesOrder() { OrderID = i.ToString(), Code = "SalesOrder_" + i, CreateTime = DateTime.Now, ProductID = 17967, TotalPrice = new decimal(19999) };                var message = sbUtils.Create(order);                queueSendClient.Send(message);                Console.WriteLine(string.Format("Send {0} Message: {1}", i, message.MessageId));            }            Console.WriteLine("Send Completed!");        }

程序输出:

技术分享

4. 消息顺序接收

消费OrderQueue中的消息,验证消息的接收顺序

private static readonly string queueName = "OrderQueue"; /// <summary>        /// 接收消息        /// </summary>        private static void MessageReceive()        {            int index = 0;            BrokeredMessage msg = null;            var sbUtils = new ServiceBusUtils();            var queueReveiveClient = sbUtils.GetReceiveQueueClient(queueName, ReceiveMode.ReceiveAndDelete);            while ((msg = queueReveiveClient.Receive(TimeSpan.FromMilliseconds(3))) != null)            {                Console.WriteLine(string.Format("Received {0} Message: {1}", index, msg.MessageId));                index++;            }            ////删除队列            //sbUtils.DeleteQueue(queueName);            Console.WriteLine("Receive Completed!");        }

程序输出:

技术分享

可以看出,Azure Messaging中ServiceBus对消息的收发是有顺序保证的。

下一篇我们继续其他特性的验证和介绍。

 

周国庆

2017/3/9

 

Azure Messaging-ServiceBus Messaging消息队列技术系列3-消息顺序保证