首页 > 代码库 > WCF基于MSMQ的事件代理服务

WCF基于MSMQ的事件代理服务

前言

楼主目前在公司负责的项目正在改版升级,之前的服务也在做调整。项目里有个操作日志模块,于是决定把日志单独提取出来,做成一个日志服务,所以就有了这篇文章。

正文

整体思路是用MSMQ作为消息队列:B/S项目调用日志服务,日志服务往消息队列发送消息,事件代理服务负责处理消息队列里的消息。下面贴一下核心代码。

事件代理服务契约

using System;
using System.Collections.Generic;
using System.Linq;
using System.ServiceModel;
using System.Text;
using TestService.Contract.Faults;

namespace TestService.Contract
{
    /// <summary>
    /// Event broker service contract. It is session-based and duplex: clients
    /// receive events through <see cref="IEventBrokerCallback"/>.
    /// </summary>
    [ServiceContract(
        CallbackContract = typeof(IEventBrokerCallback),
        SessionMode = SessionMode.Required,
        Namespace = "http://TestService.Contract")]
    public interface IEventBroker
    {
        /// <summary>
        /// Subscribes the caller to the given events. This is a request/reply
        /// operation, so failures can be reported to the client as an
        /// <see cref="EventBrokerException"/> fault.
        /// </summary>
        /// <param name="subscriptionId">Identifier chosen by the subscriber; used later to end the subscription.</param>
        /// <param name="eventNames">Names of the events the subscriber wants to receive.</param>
        [OperationContract, FaultContract(typeof(EventBrokerException))]
        void Subscribe(Guid subscriptionId, string[] eventNames);

        /// <summary>
        /// Ends the subscription identified by <paramref name="subscriptionId"/>.
        /// One-way: the caller does not wait for a reply.
        /// </summary>
        /// <param name="subscriptionId">Identifier that was passed to <see cref="Subscribe"/>.</param>
        [OperationContract(IsOneWay = true)]
        void EndSubscription(Guid subscriptionId);
    }
}

事件代理服务回调处理契约

using System;
using System.Collections.Generic;
using System.Linq;
using System.ServiceModel;
using System.Text;
using TestService.Contract.Data;

namespace TestService.Contract
{
    /// <summary>
    /// Callback contract of the event broker: the channel through which the
    /// broker pushes events back to a subscribed client.
    /// </summary>
    public interface IEventBrokerCallback
    {
        /// <summary>
        /// Delivers one event to the subscriber. One-way: the broker does not
        /// wait for the client to finish handling the event.
        /// </summary>
        /// <param name="streamingResult">The event being delivered.</param>
        [OperationContract(IsOneWay = true)]
        void ReceiveStreamingResult(RealTimeEventMessage streamingResult);
    }
}

事件代理服务异常实体

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;

namespace TestService.Contract.Faults
{
    /// <summary>
    /// Fault detail returned to clients when an event broker operation fails.
    /// Despite the name this is a plain data contract, not a CLR exception type.
    /// </summary>
    [DataContract]
    public class EventBrokerException
    {
        /// <summary>
        /// Creates a fault detail carrying the given message.
        /// </summary>
        /// <param name="message">Description of the failure.</param>
        public EventBrokerException(string message)
        {
            this.Message = message;
        }

        /// <summary>
        /// Description of the failure.
        /// </summary>
        [DataMember]
        public string Message { get; set; }
    }
}

消息处理实体

using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;

namespace TestService.Contract.Data
{
    /// <summary>
    /// Event payload that the broker pushes to subscribers.
    /// </summary>
    [DataContract]
    public class RealTimeEventMessage
    {
        /// <summary>
        /// Creates an empty message; all properties keep their default values.
        /// </summary>
        public RealTimeEventMessage()
        {
        }

        /// <summary>
        /// Creates a fully populated message.
        /// </summary>
        /// <param name="msg">The message carried by the event.</param>
        /// <param name="eventName">Name of the event; the broker uses it to look up subscribers.</param>
        /// <param name="entityIdType">Value for <see cref="EntityIdType"/>.</param>
        /// <param name="description">Value for <see cref="Description"/>.</param>
        /// <param name="additionalData">Value for <see cref="AdditionalData"/>.</param>
        /// <param name="date">Value for <see cref="Date"/>.</param>
        public RealTimeEventMessage(MessageModel msg, string eventName, string entityIdType,
            string description, string additionalData, DateTime date)
        {
            this.Msg = msg;
            this.EventName = eventName;
            this.EntityIdType = entityIdType;
            this.Description = description;
            // The published listing had a stray URL injected into this assignment,
            // which commented out the rest of the statement; it is removed here.
            this.AdditionalData = additionalData;
            this.Date = date;
        }

        /// <summary>The message carried by the event.</summary>
        [DataMember]
        public MessageModel Msg { get; set; }

        /// <summary>Name of the event; the broker uses it to look up subscribers.</summary>
        [DataMember]
        public string EventName { get; set; }

        [DataMember]
        public string EntityIdType { get; set; }

        [DataMember]
        public string Description { get; set; }

        [DataMember]
        public string AdditionalData { get; set; }

        /// <summary>Optional date; null when not supplied.</summary>
        [DataMember]
        public DateTime? Date { get; set; }
    }
}

以上是事件代理服务的契约部分,下面看看实现,先看EventBroker的实现

using System;
using System.Collections.Generic;
using System.Linq;
using System.Messaging;
using System.ServiceModel;
using System.Threading.Tasks;
using TestService.ApplicationService.Data;
using TestService.ApplicationService.Services.Interface;
using TestService.Common.IoC;
using TestService.Contract;
using TestService.Contract.Data;
using TestService.Contract.Faults;

namespace TestService.ApplicationService
{
    /// <summary>
    /// Singleton event broker. A background task reads raw XML messages from the
    /// MSMQ input queue, parses them and pushes the resulting event to every
    /// callback channel subscribed to that event name.
    /// </summary>
    [IocServiceBehavior]
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple)]
    public class EventBroker : ApplicationBaseService, IEventBroker
    {
        // Event name -> handles of the subscribers registered for it. Guarded by syncObj.
        Dictionary<string, List<UniqueCallbackHandle>> eventNameToCallbackLookups = new Dictionary<string, List<UniqueCallbackHandle>>();

        // readonly so the lock target can never be swapped out from under a waiter.
        private static readonly Object syncObj = new Object();
        private static string inputQueueName = "";

        // Read by the background reader task, hence volatile.
        private volatile bool shouldRun = true;
        private static readonly TimeSpan queueReadTimeOut = TimeSpan.FromSeconds(500);
        private static readonly TimeSpan queuePeekTimeOut = TimeSpan.FromSeconds(30);
        private IXmlParserService _xmlParserService;

        /// <summary>
        /// Creates the broker and starts the background queue reader.
        /// </summary>
        /// <param name="xmlParserService">Parser used to turn raw queue messages into events.</param>
        public EventBroker(IXmlParserService xmlParserService)
        {
            // Assign the parser BEFORE starting the reader: the reader runs on another
            // thread and may call ProcessMessage (which uses the parser) immediately.
            _xmlParserService = xmlParserService;
            inputQueueName = AppSettings.InputQueueName;
            StartCollectingMessage();
        }

        /// <summary>
        /// Starts the background task that drains the input queue.
        /// </summary>
        /// <exception cref="FaultException{EventBrokerException}">The reader could not be started.</exception>
        public void StartCollectingMessage()
        {
            try
            {
                GetMessageFromQueue();
            }
            catch (Exception ex)
            {
                throw new FaultException<EventBrokerException>(new EventBrokerException(ex.Message), new FaultReason(ex.Message));
            }
        }

        /// <summary>
        /// Registers the caller's callback channel for each of the given event names.
        /// </summary>
        /// <param name="subscriptionId">Identifier chosen by the subscriber.</param>
        /// <param name="eventNames">Events the subscriber wants to receive.</param>
        /// <exception cref="FaultException{EventBrokerException}">The subscription could not be created.</exception>
        public void Subscribe(Guid subscriptionId, string[] eventNames)
        {
            try
            {
                CreateSubscription(subscriptionId, eventNames);
            }
            catch (Exception ex)
            {
                throw new FaultException<EventBrokerException>(new EventBrokerException(ex.Message), new FaultReason(ex.Message));
            }
        }

        /// <summary>
        /// Removes every registration made with the given subscription id.
        /// </summary>
        /// <param name="subscriptionId">Identifier that was passed to <see cref="Subscribe"/>.</param>
        public void EndSubscription(Guid subscriptionId)
        {
            lock (syncObj)
            {
                // Build a fresh lookup containing only the registrations that remain.
                Dictionary<string, List<UniqueCallbackHandle>> remainingEventNameToCallbackLookups =
                    new Dictionary<string, List<UniqueCallbackHandle>>();

                foreach (KeyValuePair<string, List<UniqueCallbackHandle>> kvp in eventNameToCallbackLookups)
                {
                    // Keep the subscribers whose id differs from the one being removed.
                    List<UniqueCallbackHandle> remainingMessageSubscriptions =
                        kvp.Value.Where(x => x.CallbackSessionId != subscriptionId).ToList();

                    // Event names left without subscribers are dropped entirely.
                    if (remainingMessageSubscriptions.Any())
                    {
                        remainingEventNameToCallbackLookups.Add(kvp.Key, remainingMessageSubscriptions);
                    }
                }

                eventNameToCallbackLookups = remainingEventNameToCallbackLookups;
            }
        }

        #region Private methods

        /// <summary>
        /// Starts a long-running task that receives messages from the input queue
        /// and hands each one to <see cref="ProcessMessage"/>.
        /// </summary>
        private void GetMessageFromQueue()
        {
            Task.Factory.StartNew(() =>
            {
                try
                {
                    using (MessageQueue queue = new MessageQueue(inputQueueName, QueueAccessMode.Receive))
                    {
                        queue.Formatter = new XmlMessageFormatter(new[] { typeof(string) });
                        while (shouldRun)
                        {
                            try
                            {
                                if (queue.IsEmpty())
                                {
                                    // Block until a message arrives (or the peek times out)
                                    // instead of spinning on an empty queue. A timeout surfaces
                                    // as an IOTimeout MessageQueueException, ignored below.
                                    queue.Peek(queuePeekTimeOut);
                                    continue;
                                }

                                Message message = queue.Receive(queueReadTimeOut);
                                ProcessMessage(message);
                            }
                            catch (MessageQueueException e)
                            {
                                // Timeouts are expected while the queue is idle.
                                if (e.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
                                {
                                    Log.Warn("消息队列出现异常:", e);
                                }
                            }
                            catch (Exception e)
                            {
                                // A failure on one message must not stop the reader.
                                Log.Warn("操作异常:", e);
                            }
                        }
                    }
                }
                catch (Exception e)
                {
                    // Failures outside the loop (e.g. the queue cannot be opened) would
                    // otherwise fault the task unobserved and stop delivery silently.
                    Log.Warn("操作异常:", e);
                }
            }, TaskCreationOptions.LongRunning);
        }

        /// <summary>
        /// Parses one queue message and pushes the resulting event to all
        /// subscribers of its event name. Subscribers whose callback channel
        /// fails are unsubscribed.
        /// </summary>
        /// <param name="msmqMessage">Message received from the input queue; its body is read as a string.</param>
        private void ProcessMessage(Message msmqMessage)
        {
            string messageBody = (string)msmqMessage.Body;
#if DEBUG
            Log.Info(string.Format("接受消息 : {0}", messageBody));
#endif
            RealTimeEventMessage messageToSendToSubscribers = _xmlParserService.ParseRawMsmqXml(messageBody);
            if (messageToSendToSubscribers == null)
            {
                return;
            }

            lock (syncObj)
            {
                if (messageToSendToSubscribers.Msg != null)
                {
                    // Save to the database (left empty in this listing).
                }

                List<Guid> deadSubscribers = new List<Guid>();
                List<UniqueCallbackHandle> uniqueCallbackHandles;

                // A dictionary key must not be null, so guard before the lookup.
                string eventName = messageToSendToSubscribers.EventName;
                if (eventName != null && eventNameToCallbackLookups.TryGetValue(eventName, out uniqueCallbackHandles))
                {
                    foreach (UniqueCallbackHandle uniqueCallbackHandle in uniqueCallbackHandles)
                    {
                        try
                        {
                            uniqueCallbackHandle.Callback.ReceiveStreamingResult(messageToSendToSubscribers);
                        }
                        catch (CommunicationException)
                        {
                            // Includes CommunicationObjectAbortedException / -FaultedException:
                            // the channel is unusable, so drop the subscriber and keep going.
                            deadSubscribers.Add(uniqueCallbackHandle.CallbackSessionId);
                        }
                        catch (TimeoutException)
                        {
                            deadSubscribers.Add(uniqueCallbackHandle.CallbackSessionId);
                        }
                        catch (ObjectDisposedException)
                        {
                            deadSubscribers.Add(uniqueCallbackHandle.CallbackSessionId);
                        }
                    }
                }

                // End all subscriptions of dead subscribers. This runs after the
                // enumeration above, so the lookup is not modified while iterated.
                foreach (Guid deadSubscriberId in deadSubscribers)
                {
                    EndSubscription(deadSubscriberId);
                }
            }
        }

        /// <summary>
        /// Adds a handle for the current caller's callback channel under each event name.
        /// </summary>
        /// <param name="subscriptionId">Identifier chosen by the subscriber.</param>
        /// <param name="eventNames">Events the subscriber wants to receive.</param>
        private void CreateSubscription(Guid subscriptionId, string[] eventNames)
        {
            if (eventNames == null)
            {
                throw new ArgumentNullException("eventNames");
            }

            // The callback channel is the same for every event name in this call.
            IEventBrokerCallback callback = OperationContext.Current.GetCallbackChannel<IEventBrokerCallback>();

            lock (syncObj)
            {
                foreach (string eventName in eventNames)
                {
                    List<UniqueCallbackHandle> currentCallbacks;
                    if (!eventNameToCallbackLookups.TryGetValue(eventName, out currentCallbacks))
                    {
                        currentCallbacks = new List<UniqueCallbackHandle>();
                        eventNameToCallbackLookups[eventName] = currentCallbacks;
                    }

                    currentCallbacks.Add(new UniqueCallbackHandle(subscriptionId, callback));
                }
            }
        }

        #endregion
    }
}

事件代理实现里的回调处理实体

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using TestService.Contract;

namespace TestService.ApplicationService.Data
{
    /// <summary>
    /// Immutable pairing of a subscription id with the callback channel that
    /// events for that subscription are delivered on.
    /// </summary>
    public class UniqueCallbackHandle
    {
        private readonly Guid _callbackSessionId;
        private readonly IEventBrokerCallback _callback;

        /// <summary>
        /// Creates a handle for the given subscription and callback channel.
        /// </summary>
        /// <param name="callbackSessionId">Identifier of the subscription.</param>
        /// <param name="callback">Channel used to push events to the subscriber.</param>
        public UniqueCallbackHandle(Guid callbackSessionId, IEventBrokerCallback callback)
        {
            _callbackSessionId = callbackSessionId;
            _callback = callback;
        }

        /// <summary>Identifier of the subscription.</summary>
        public Guid CallbackSessionId
        {
            get { return _callbackSessionId; }
        }

        /// <summary>Channel used to push events to the subscriber.</summary>
        public IEventBrokerCallback Callback
        {
            get { return _callback; }
        }
    }
}

 

其中,事件代理服务构造方法里的AppSettings.InputQueueName是从配置文件读取的专用队列名称;构造方法的参数后面会使用Autofac进行注入。另外还有一个普通的业务服务IXmlParserService,主要负责解析消息队列里存放的xml消息:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using TestService.Contract.Data;

namespace TestService.ApplicationService.Services.Interface
{
    /// <summary>
    /// Converts the raw XML body of a queue message into an event.
    /// </summary>
    public interface IXmlParserService
    {
        /// <summary>
        /// Parses the XML body of one queue message.
        /// </summary>
        /// <param name="messageBody">Raw XML text taken from the queue message.</param>
        /// <returns>
        /// The parsed event. The event broker treats a null result as
        /// "nothing to deliver".
        /// </returns>
        RealTimeEventMessage ParseRawMsmqXml(string messageBody);
    }
}

实现

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Xml.Linq;
using TestService.ApplicationService.Services.Interface;
using TestService.Contract.Data;

namespace TestService.ApplicationService.Services.Impl
{
    /// <summary>
    /// Parses the XML messages stored in the queue into <see cref="RealTimeEventMessage"/> instances.
    /// </summary>
    public class XmlParserService : IXmlParserService
    {
        // Supplied through the constructor; not read by any code in this class.
        private readonly MessageModel msg;

        public XmlParserService(MessageModel msg)
        {
            this.msg = msg;
        }

        /// <summary>
        /// Parses one raw message body. Reads the <c>messageId</c>, <c>content</c>,
        /// <c>createTime</c> and <c>eventName</c> child elements of the root.
        /// </summary>
        /// <param name="messageBody">Raw XML text taken from the queue message.</param>
        /// <returns>The parsed event, or null when the body cannot be parsed (the error is logged).</returns>
        public RealTimeEventMessage ParseRawMsmqXml(string messageBody)
        {
            try
            {
                RealTimeEventMessage info = new RealTimeEventMessage();
                XElement xelement = XElement.Parse(messageBody);

                MessageModel model = new MessageModel();
                model.MessageId = GetSafeString(xelement, "messageId");
                model.Content = GetSafeString(xelement, "content");
                // Fall back to the current time when createTime is missing or unparseable.
                model.CreateTime = GetSafeDate(xelement, "createTime") ?? DateTime.Now;

                info.Msg = model;
                info.EventName = GetSafeString(xelement, "eventName");
                // The remaining fields are not populated by the current message format:
                //info.EntityIdType = GetSafeString(xelement, "entityIdType");
                //info.Description = GetSafeString(xelement, "description").Replace("\n\n", "\n\r");
                //info.Date = GetSafeDate(xelement, "date");
                //info.AdditionalData = GetSafeString(xelement, "additionalData");
                return info;
            }
            catch (Exception ex)
            {
                Log.Error("解析Xml消息出现异常:" + ex);
                return null;
            }
        }

        /// <summary>
        /// Reads a child element as an integer.
        /// </summary>
        /// <returns>The parsed value, or 0 when the element is missing, duplicated or not a valid integer.</returns>
        public static Int32 GetSafeInt32(XElement root, string elementName)
        {
            string text;
            int parsed;
            if (TryGetSingleValue(root, elementName, out text) && int.TryParse(text, out parsed))
            {
                return parsed;
            }

            return 0;
        }

        /// <summary>
        /// Reads a child element as a date.
        /// </summary>
        /// <returns>The parsed value, or null when the element is missing, duplicated or not a valid date.</returns>
        private static DateTime? GetSafeDate(XElement root, string elementName)
        {
            string text;
            DateTime parsed;
            if (TryGetSingleValue(root, elementName, out text) && DateTime.TryParse(text, out parsed))
            {
                return parsed;
            }

            return null;
        }

        /// <summary>
        /// Reads a child element as text.
        /// </summary>
        /// <returns>The element's value, or an empty string when the element is missing or duplicated.</returns>
        public static String GetSafeString(XElement root, string elementName)
        {
            string text;
            return TryGetSingleValue(root, elementName, out text) ? text : String.Empty;
        }

        /// <summary>
        /// Reads a child element as a boolean.
        /// </summary>
        /// <returns>The parsed value, or false when the element is missing, duplicated or not a valid boolean.</returns>
        public static bool GetSafeBool(XElement root, string elementName)
        {
            string text;
            bool parsed;
            return TryGetSingleValue(root, elementName, out text)
                && bool.TryParse(text, out parsed)
                && parsed;
        }

        /// <summary>
        /// Finds the one direct child of <paramref name="root"/> whose local name equals
        /// <paramref name="elementName"/>. Fails, without throwing, when the root is null
        /// or when zero or several children match.
        /// </summary>
        private static bool TryGetSingleValue(XElement root, string elementName, out string value)
        {
            value = null;
            if (root == null)
            {
                return false;
            }

            XElement match = null;
            foreach (XElement node in root.Elements())
            {
                if (node.Name.LocalName != elementName)
                {
                    continue;
                }

                if (match != null)
                {
                    // More than one matching element: ambiguous, treat as absent.
                    return false;
                }

                match = node;
            }

            if (match == null)
            {
                return false;
            }

            value = match.Value;
            return true;
        }
    }
}

这里解析的xml节点,是根据消息服务发送消息时写入的xml节点来确定的。事件代理服务的内容就是上面这些,下面看看消息服务:

using System;
using System.Collections.Generic;
using System.Linq;
using System.ServiceModel;
using System.Text;
using TestService.Contract.Data;

namespace TestService.Contract
{
    /// <summary>
    /// Service contract for publishing messages to the message queue.
    /// </summary>
    [ServiceContract]
    public interface IMessageService
    {
        /// <summary>
        /// Sends one message.
        /// </summary>
        /// <param name="model">The message to send.</param>
        /// <returns>True when the message was sent; false otherwise.</returns>
        [OperationContract]
        bool Send(MessageModel model);
    }
}

实现

using System;
using System.Collections.Generic;
using System.Linq;
using System.Messaging;
using System.Text;
using TestService.Common;
using TestService.Common.IoC;
using TestService.Contract;
using TestService.Contract.Data;

namespace TestService.ApplicationService
{
    /// <summary>
    /// Message service: serializes a <see cref="MessageModel"/> to XML and sends it
    /// to the MSMQ input queue.
    /// </summary>
    [IocServiceBehavior]
    public class MessageService : ApplicationBaseService, IMessageService
    {
        private static string inputQueueName = "";

        public MessageService()
        {
            inputQueueName = AppSettings.InputQueueName;
        }

        /// <summary>
        /// Sends one message to the input queue.
        /// </summary>
        /// <param name="model">The message to send.</param>
        /// <returns>True when the message was queued; false when sending failed (the failure is logged).</returns>
        public bool Send(MessageModel model)
        {
            using (MessageQueue queue = new MessageQueue(inputQueueName, QueueAccessMode.Send))
            {
                queue.Formatter = new XmlMessageFormatter(new[] { typeof(string) });
                try
                {
                    Message message = new Message(GetXmlData(model));
                    Log.Info(string.Format("发送消息: {0}", message.Body.ToString()));
                    queue.Send(message);
                    return true;
                }
                catch (MessageQueueException mex)
                {
                    if (mex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
                    {
                        // The original format string had no placeholder, so the
                        // exception details never reached the log.
                        Log.Error(string.Format("Message queue exception occurred: {0}", mex));
                    }
                    return false;
                }
                catch (Exception ex)
                {
                    Log.Error(ex);
                    return false;
                }
            }
        }

        /// <summary>
        /// Builds the XML body for a message: a <c>realtimeEvent</c> root with
        /// <c>eventName</c>, <c>messageId</c>, <c>createTime</c> and <c>content</c> children.
        /// </summary>
        /// <param name="model">The message to serialize.</param>
        /// <returns>The XML text placed in the queue message body.</returns>
        private string GetXmlData(MessageModel model)
        {
            StringBuilder sb = new StringBuilder("<realtimeEvent>");
            sb.Append("<eventName>ClientDealEvent</eventName>");

            // Escape text values: an unescaped '<' or '&' would make the XML
            // malformed and the receiver would drop the message.
            sb.AppendFormat("<messageId>{0}</messageId>",
                System.Security.SecurityElement.Escape(model.MessageId));

            // Culture-invariant round-trip format, so parsing on the receiving side
            // does not depend on the sender's current culture.
            sb.AppendFormat(System.Globalization.CultureInfo.InvariantCulture,
                "<createTime>{0:o}</createTime>", model.CreateTime);

            sb.AppendFormat("<content>{0}</content>",
                System.Security.SecurityElement.Escape(model.Content));
            sb.AppendLine("</realtimeEvent>");
            return sb.ToString();
        }
    }
}

消息服务比较简单,就是往消息队列里发送消息。细心的人会发现,我在每个服务实现上都加了IocServiceBehavior特性,这个特性主要用来标识该服务启用了依赖注入。

核心代码就是上面介绍的那么多,有些操作没将代码贴出来,后面会将代码开源到码云上,今天就先写到这儿了

WCF基于MSMQ的事件代理服务