首页 > 代码库 > 【DDD-Apworks框架】事件总线和事件聚合器

【DDD-Apworks框架】事件总线和事件聚合器

第一步:事件总线和事件聚合器

   【1】事件总线 IEventBus

   IUnitOfWork.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Keasy5.Infrastructure
{
    /// <summary>
    /// Marks every type that implements this interface as an implementation
    /// of the Unit of Work pattern.
    /// </summary>
    /// <remarks>
    /// For details about the pattern, see
    /// http://martinfowler.com/eaaCatalog/unitOfWork.html
    /// </remarks>
    public interface IUnitOfWork
    {
        /// <summary>
        /// Gets a <see cref="System.Boolean"/> value indicating whether this
        /// Unit of Work supports the Microsoft distributed transaction mechanism.
        /// </summary>
        bool DistributedTransactionSupported { get; }

        /// <summary>
        /// Gets a <see cref="System.Boolean"/> value indicating whether the
        /// current Unit of Work transaction has been committed.
        /// </summary>
        bool Committed { get; }

        /// <summary>
        /// Commits the current Unit of Work transaction.
        /// </summary>
        void Commit();

        /// <summary>
        /// Rolls back the current Unit of Work transaction.
        /// </summary>
        void Rollback();
    }
}
View Code

   IBus.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Keasy5.Infrastructure;

namespace Keasy5.Events.Bus
{
    /// <summary>
    /// Contract of a message bus. A bus is also an <see cref="IUnitOfWork"/>
    /// and is disposable.
    /// </summary>
    public interface IBus : IUnitOfWork, IDisposable
    {
        /// <summary>
        /// Gets the <see cref="Guid"/> that identifies this bus.
        /// </summary>
        Guid ID { get; }

        /// <summary>
        /// Publishes a single message to the bus.
        /// </summary>
        /// <typeparam name="TMessage">The type of the message; must be a reference type implementing <c>IEvent</c>.</typeparam>
        /// <param name="message">The message to publish.</param>
        void Publish<TMessage>(TMessage message) where TMessage : class, IEvent;

        /// <summary>
        /// Publishes a sequence of messages to the bus.
        /// </summary>
        /// <typeparam name="TMessage">The type of the messages; must be a reference type implementing <c>IEvent</c>.</typeparam>
        /// <param name="messages">The messages to publish.</param>
        void Publish<TMessage>(IEnumerable<TMessage> messages) where TMessage : class, IEvent;

        /// <summary>
        /// Discards the published messages that are still waiting for commit.
        /// </summary>
        void Clear();
    }
}
View Code

  接口: IEventBus.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Keasy5.Events.Bus
{
    /// <summary>
    /// Marker interface derived from <see cref="IBus"/>; it declares no
    /// members of its own.
    /// </summary>
    public interface IEventBus : IBus
    {
    }
}
View Code

 

实现类:EventBus

EventBus.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Keasy5.Infrastructure;

namespace Keasy5.Events.Bus
{
    /// <summary>
    /// An <see cref="IEventBus"/> that buffers published events in a per-thread
    /// queue and hands them to an <see cref="IEventAggregator"/> when
    /// <see cref="Commit"/> is called.
    /// </summary>
    /// <remarks>
    /// Both the queue and the committed flag are held in <see cref="ThreadLocal{T}"/>
    /// storage, so each thread sees only the events it published itself.
    /// </remarks>
    public class EventBus : DisposableObject, IEventBus
    {
        private readonly Guid id = Guid.NewGuid();
        private readonly ThreadLocal<Queue<object>> messageQueue = new ThreadLocal<Queue<object>>(() => new Queue<object>());
        private readonly IEventAggregator aggregator;
        private readonly ThreadLocal<bool> committed = new ThreadLocal<bool>(() => true);
        private readonly MethodInfo publishMethod;

        /// <summary>
        /// Creates a bus that dispatches committed events through <paramref name="aggregator"/>.
        /// </summary>
        /// <param name="aggregator">
        /// The aggregator that receives the events; <see cref="EventAggregator"/> is one implementation.
        /// </param>
        /// <remarks>
        /// Background reading on reflection over generic methods:
        /// http://www.cnblogs.com/easy5weikai/p/3790589.html
        /// </remarks>
        /// <exception cref="ArgumentNullException"><paramref name="aggregator"/> is null.</exception>
        /// <exception cref="InvalidOperationException">
        /// The aggregator type exposes no generic single-parameter method named "Publish".
        /// </exception>
        public EventBus(IEventAggregator aggregator)
        {
            if (aggregator == null)
            {
                throw new ArgumentNullException("aggregator");
            }

            this.aggregator = aggregator;

            // Locate the open generic Publish<TEvent>(TEvent) method on the aggregator.
            // Commit() closes it over each event's runtime type, which is why it must be
            // a generic method definition.
            publishMethod = (from m in aggregator.GetType().GetMethods()
                             where m.Name == "Publish" &&
                                   m.IsGenericMethodDefinition &&
                                   m.GetParameters().Length == 1
                             select m).First();
        }

        /// <summary>
        /// Releases the thread-local queue and committed flag when disposing.
        /// </summary>
        /// <param name="disposing">True when called from an explicit dispose.</param>
        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                messageQueue.Dispose();
                committed.Dispose();
            }
        }

        #region IBus Members

        /// <summary>
        /// Enqueues <paramref name="message"/> on the calling thread's queue and marks
        /// the bus as not committed. Nothing is dispatched until <see cref="Commit"/>.
        /// </summary>
        /// <typeparam name="TMessage">The type of the message.</typeparam>
        /// <param name="message">The message to enqueue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        public void Publish<TMessage>(TMessage message)
            where TMessage : class, IEvent
        {
            // Fail fast: a null entry would only blow up later inside Commit(),
            // far away from the caller that actually supplied it.
            if (message == null)
            {
                throw new ArgumentNullException("message");
            }

            messageQueue.Value.Enqueue(message);
            committed.Value = false;
        }

        /// <summary>
        /// Enqueues every message of <paramref name="messages"/> in order.
        /// </summary>
        /// <typeparam name="TMessage">The type of the messages.</typeparam>
        /// <param name="messages">The messages to enqueue.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="messages"/> is null, or one of its elements is null.
        /// </exception>
        public void Publish<TMessage>(IEnumerable<TMessage> messages)
            where TMessage : class, IEvent
        {
            if (messages == null)
            {
                throw new ArgumentNullException("messages");
            }

            foreach (var message in messages)
            {
                Publish(message);
            }
        }

        /// <summary>
        /// Drops all events queued by the calling thread and marks the bus as committed.
        /// </summary>
        public void Clear()
        {
            messageQueue.Value.Clear();
            committed.Value = true;
        }

        #endregion

        #region IUnitOfWork Members

        /// <summary>
        /// Always false.
        /// </summary>
        public bool DistributedTransactionSupported
        {
            get { return false; }
        }

        /// <summary>
        /// Gets whether the calling thread has no pending, uncommitted events.
        /// </summary>
        public bool Committed
        {
            get { return committed.Value; }
        }

        /// <summary>
        /// Dequeues the calling thread's events one by one and passes each of them to the
        /// aggregator's Publish method, closed over the event's runtime type.
        /// </summary>
        /// <exception cref="TargetInvocationException">
        /// The aggregator's Publish method threw; the original error is the inner exception
        /// (this is how <see cref="MethodBase.Invoke(object, object[])"/> reports failures).
        /// Events not yet dequeued stay in the queue and the bus remains uncommitted.
        /// </exception>
        public void Commit()
        {
            var queue = messageQueue.Value;
            while (queue.Count > 0)
            {
                var evnt = queue.Dequeue();

                // Bind the generic Publish<TEvent> to the concrete event type so that the
                // aggregator looks up handlers registered for exactly that type.
                var method = publishMethod.MakeGenericMethod(evnt.GetType());
                method.Invoke(aggregator, new object[] { evnt });
            }

            committed.Value = true;
        }

        /// <summary>
        /// Discards the pending events; equivalent to <see cref="Clear"/>.
        /// </summary>
        public void Rollback()
        {
            Clear();
        }

        /// <summary>
        /// Gets the identifier generated for this bus instance at construction.
        /// </summary>
        public Guid ID
        {
            get { return id; }
        }

        #endregion
    }
}
View Code

 

   【2】事件聚合器IEventAggregator

接口:IEventAggregator.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Keasy5.Events
{
    /// <summary>
    /// Contract of an event aggregator: handlers are subscribed per event type
    /// and published events are passed to the handlers of that type.
    /// </summary>
    public interface IEventAggregator
    {
        #region Subscribe

        /// <summary>Subscribes one handler object for <typeparamref name="TEvent"/>.</summary>
        void Subscribe<TEvent>(IEventHandler<TEvent> domainEventHandler) where TEvent : class, IEvent;

        /// <summary>Subscribes a sequence of handler objects for <typeparamref name="TEvent"/>.</summary>
        void Subscribe<TEvent>(IEnumerable<IEventHandler<TEvent>> domainEventHandlers) where TEvent : class, IEvent;

        /// <summary>Subscribes the given handler objects for <typeparamref name="TEvent"/>.</summary>
        void Subscribe<TEvent>(params IEventHandler<TEvent>[] domainEventHandlers) where TEvent : class, IEvent;

        /// <summary>Subscribes a delegate as a handler for <typeparamref name="TEvent"/>.</summary>
        void Subscribe<TEvent>(Action<TEvent> domainEventHandlerFunc) where TEvent : class, IEvent;

        /// <summary>
        /// Subscribes a sequence of delegates as handlers for <typeparamref name="TEvent"/>.
        /// NOTE(review): the meaning of the Boolean result is not defined here; confirm with the implementation.
        /// </summary>
        void Subscribe<TEvent>(IEnumerable<Func<TEvent, bool>> domainEventHandlerFuncs) where TEvent : class, IEvent;

        /// <summary>
        /// Subscribes the given delegates as handlers for <typeparamref name="TEvent"/>.
        /// NOTE(review): the meaning of the Boolean result is not defined here; confirm with the implementation.
        /// </summary>
        void Subscribe<TEvent>(params Func<TEvent, bool>[] domainEventHandlerFuncs) where TEvent : class, IEvent;

        #endregion

        #region Unsubscribe

        /// <summary>Removes one handler object for <typeparamref name="TEvent"/>.</summary>
        void Unsubscribe<TEvent>(IEventHandler<TEvent> domainEventHandler) where TEvent : class, IEvent;

        /// <summary>Removes a sequence of handler objects for <typeparamref name="TEvent"/>.</summary>
        void Unsubscribe<TEvent>(IEnumerable<IEventHandler<TEvent>> domainEventHandlers) where TEvent : class, IEvent;

        /// <summary>Removes the given handler objects for <typeparamref name="TEvent"/>.</summary>
        void Unsubscribe<TEvent>(params IEventHandler<TEvent>[] domainEventHandlers) where TEvent : class, IEvent;

        /// <summary>Removes a delegate handler for <typeparamref name="TEvent"/>.</summary>
        void Unsubscribe<TEvent>(Action<TEvent> domainEventHandlerFunc) where TEvent : class, IEvent;

        /// <summary>Removes a sequence of delegate handlers for <typeparamref name="TEvent"/>.</summary>
        void Unsubscribe<TEvent>(IEnumerable<Func<TEvent, bool>> domainEventHandlerFuncs) where TEvent : class, IEvent;

        /// <summary>Removes the given delegate handlers for <typeparamref name="TEvent"/>.</summary>
        void Unsubscribe<TEvent>(params Func<TEvent, bool>[] domainEventHandlerFuncs) where TEvent : class, IEvent;

        /// <summary>Removes every handler subscribed for <typeparamref name="TEvent"/>.</summary>
        void UnsubscribeAll<TEvent>() where TEvent : class, IEvent;

        /// <summary>Removes every handler for every event type.</summary>
        void UnsubscribeAll();

        #endregion

        #region Query and publish

        /// <summary>Returns the handlers currently subscribed for <typeparamref name="TEvent"/>.</summary>
        IEnumerable<IEventHandler<TEvent>> GetSubscriptions<TEvent>() where TEvent : class, IEvent;

        /// <summary>Publishes <paramref name="domainEvent"/> to its subscribed handlers.</summary>
        void Publish<TEvent>(TEvent domainEvent) where TEvent : class, IEvent;

        /// <summary>
        /// Publishes <paramref name="domainEvent"/> and reports the outcome through
        /// <paramref name="callback"/>, which receives the event, a success flag and
        /// the exception, if any.
        /// </summary>
        /// <param name="domainEvent">The event to publish.</param>
        /// <param name="callback">The delegate that receives the outcome.</param>
        /// <param name="timeout">Optional time limit; null means no limit is supplied.</param>
        void Publish<TEvent>(TEvent domainEvent, Action<TEvent, bool, Exception> callback, TimeSpan? timeout = null) where TEvent : class, IEvent;

        #endregion
    }
}
View Code

 

实现类:EventAggregator

EventAggregator.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;

namespace Keasy5.Events
{
    /// <summary>
    /// Default <see cref="IEventAggregator"/>: keeps a list of handlers per event type and
    /// passes each published event to the handlers registered for the event's runtime type.
    /// </summary>
    /// <remarks>
    /// All access to the handler table goes through a single lock. Publishing works on a
    /// snapshot of the handler list taken under that lock, so handlers may subscribe or
    /// unsubscribe while an event is being dispatched.
    /// </remarks>
    public class EventAggregator : IEventAggregator
    {
        #region private property

        private readonly object sync = new object();
        private readonly Dictionary<Type, List<object>> eventHandlers = new Dictionary<Type, List<object>>();

        // Maps every subscribed Func<TEvent, bool> to the Action<TEvent> adapter that was
        // registered on its behalf, so that the same Func can later be unsubscribed.
        private readonly Dictionary<Delegate, Delegate> funcAdapters = new Dictionary<Delegate, Delegate>();

        private readonly MethodInfo registerEventHandlerMethod;

        // Checks whether two event handlers are equal. If both are action-delegated handlers,
        // they are compared with their Equals override (which, per the original author,
        // compares the wrapped delegates). Otherwise the handler *types* are compared, because
        // the same type of event handler does not need to be registered more than once for
        // each specific event.
        private readonly Func<object, object, bool> eventHandlerEquals = (o1, o2) =>
        {
            var o1Type = o1.GetType();
            var o2Type = o2.GetType();
            if (o1Type.IsGenericType &&
                o1Type.GetGenericTypeDefinition() == typeof(ActionDelegatedEventHandler<>) &&
                o2Type.IsGenericType &&
                o2Type.GetGenericTypeDefinition() == typeof(ActionDelegatedEventHandler<>))
            {
                return o1.Equals(o2);
            }

            return o1Type == o2Type;
        };

        #endregion

        #region Ctor

        /// <summary>
        /// Creates an aggregator without any subscriptions.
        /// </summary>
        public EventAggregator()
        {
            // Find Subscribe<TEvent>(IEventHandler<TEvent>). The IsGenericType test must come
            // before GetGenericTypeDefinition(): the "params" overloads take array-typed
            // parameters, for which GetGenericTypeDefinition() throws, and GetMethods() does
            // not promise any particular ordering.
            registerEventHandlerMethod = (from p in this.GetType().GetMethods()
                                          let parameters = p.GetParameters()
                                          where p.Name == "Subscribe" &&
                                                parameters.Length == 1 &&
                                                parameters[0].ParameterType.IsGenericType &&
                                                parameters[0].ParameterType.GetGenericTypeDefinition() == typeof(IEventHandler<>)
                                          select p).First();
        }

        // Dependency-injection example (Unity configuration) for the constructor below:
        //
        //   <!--Event Aggregator-->
        //   <register type="Keasy5.Events.IEventAggregator, Keasy5.Events" mapTo="Keasy5.Events.EventAggregator, Keasy5.Events">
        //     <constructor>
        //       <param name="handlers">
        //         <array>
        //           <dependency name="orderDispatchedSendEmailHandler" type="Keasy5.Events.IEventHandler`1[[Keasy5.Domain.Events.OrderDispatchedEvent, Keasy5.Domain]], Keasy5.Events" />
        //           <dependency name="orderConfirmedSendEmailHandler" type="Keasy5.Events.IEventHandler`1[[Keasy5.Domain.Events.OrderConfirmedEvent, Keasy5.Domain]], Keasy5.Events" />
        //         </array>
        //       </param>
        //     </constructor>
        //   </register>
        //
        // Background reading on reflection over generic methods:
        //   http://www.cnblogs.com/easy5weikai/p/3790589.html

        /// <summary>
        /// Creates an aggregator and subscribes every object of <paramref name="handlers"/>
        /// once for each closed <c>IEventHandler&lt;TEvent&gt;</c> interface it implements.
        /// </summary>
        /// <param name="handlers">The handler objects to register.</param>
        /// <exception cref="ArgumentNullException"><paramref name="handlers"/> is null.</exception>
        public EventAggregator(object[] handlers)
            : this()
        {
            if (handlers == null)
            {
                throw new ArgumentNullException("handlers");
            }

            foreach (var obj in handlers)
            {
                foreach (var implementedInterface in obj.GetType().GetInterfaces())
                {
                    if (implementedInterface.IsGenericType &&
                        implementedInterface.GetGenericTypeDefinition() == typeof(IEventHandler<>))
                    {
                        // Call Subscribe<TEvent>(IEventHandler<TEvent>) with TEvent taken
                        // from the interface the handler implements.
                        var eventType = implementedInterface.GetGenericArguments().First();
                        var method = registerEventHandlerMethod.MakeGenericMethod(eventType);
                        method.Invoke(this, new object[] { obj });
                    }
                }
            }
        }

        #endregion

        #region interface IEventAggregator members

        /// <summary>
        /// Registers <paramref name="eventHandler"/> for <typeparamref name="TEvent"/> unless
        /// an equal handler (see the equality rule on the handler comparer) is already present.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="eventHandler"/> is null.</exception>
        public void Subscribe<TEvent>(IEventHandler<TEvent> eventHandler)
            where TEvent : class, IEvent
        {
            if (eventHandler == null)
            {
                throw new ArgumentNullException("eventHandler");
            }

            lock (sync)
            {
                var eventType = typeof(TEvent);
                List<object> handlers;
                if (!this.eventHandlers.TryGetValue(eventType, out handlers) || handlers == null)
                {
                    // Store the new list in the table; a list that is only held in a local
                    // variable would silently drop the subscription.
                    handlers = new List<object>();
                    this.eventHandlers[eventType] = handlers;
                }

                if (!handlers.Exists(deh => eventHandlerEquals(deh, eventHandler)))
                {
                    handlers.Add(eventHandler);
                }
            }
        }

        /// <summary>Registers every handler of the sequence.</summary>
        public void Subscribe<TEvent>(IEnumerable<IEventHandler<TEvent>> eventHandlers)
            where TEvent : class, IEvent
        {
            foreach (var eventHandler in eventHandlers)
            {
                Subscribe<TEvent>(eventHandler);
            }
        }

        /// <summary>Registers every handler of the array.</summary>
        public void Subscribe<TEvent>(params IEventHandler<TEvent>[] eventHandlers)
            where TEvent : class, IEvent
        {
            foreach (var eventHandler in eventHandlers)
            {
                Subscribe<TEvent>(eventHandler);
            }
        }

        /// <summary>Registers a delegate by wrapping it in an action-delegated handler.</summary>
        public void Subscribe<TEvent>(Action<TEvent> eventHandlerFunc)
            where TEvent : class, IEvent
        {
            Subscribe<TEvent>(new ActionDelegatedEventHandler<TEvent>(eventHandlerFunc));
        }

        /// <summary>
        /// Registers every delegate of the sequence; the Boolean result of each delegate is ignored.
        /// </summary>
        public void Subscribe<TEvent>(IEnumerable<Func<TEvent, bool>> eventHandlerFuncs)
            where TEvent : class, IEvent
        {
            foreach (var eventHandlerFunc in eventHandlerFuncs)
            {
                SubscribeFunc<TEvent>(eventHandlerFunc);
            }
        }

        /// <summary>
        /// Registers every delegate of the array; the Boolean result of each delegate is ignored.
        /// </summary>
        public void Subscribe<TEvent>(params Func<TEvent, bool>[] eventHandlerFuncs)
            where TEvent : class, IEvent
        {
            // Do NOT call Subscribe<TEvent>(func) here: a single Func<TEvent, bool> argument
            // binds to this very overload in its expanded "params" form, which recurses forever.
            foreach (var eventHandlerFunc in eventHandlerFuncs)
            {
                SubscribeFunc<TEvent>(eventHandlerFunc);
            }
        }

        /// <summary>
        /// Removes the first registered handler that equals <paramref name="eventHandler"/>.
        /// A null handler matches nothing and is ignored.
        /// </summary>
        public void Unsubscribe<TEvent>(IEventHandler<TEvent> eventHandler)
            where TEvent : class, IEvent
        {
            if (eventHandler == null)
            {
                return;
            }

            lock (sync)
            {
                List<object> handlers;
                if (this.eventHandlers.TryGetValue(typeof(TEvent), out handlers) && handlers != null)
                {
                    var index = handlers.FindIndex(deh => eventHandlerEquals(deh, eventHandler));
                    if (index >= 0)
                    {
                        handlers.RemoveAt(index);
                    }
                }
            }
        }

        /// <summary>Removes every handler of the sequence.</summary>
        public void Unsubscribe<TEvent>(IEnumerable<IEventHandler<TEvent>> eventHandlers)
            where TEvent : class, IEvent
        {
            foreach (var eventHandler in eventHandlers)
            {
                Unsubscribe<TEvent>(eventHandler);
            }
        }

        /// <summary>Removes every handler of the array.</summary>
        public void Unsubscribe<TEvent>(params IEventHandler<TEvent>[] eventHandlers)
            where TEvent : class, IEvent
        {
            foreach (var eventHandler in eventHandlers)
            {
                Unsubscribe<TEvent>(eventHandler);
            }
        }

        /// <summary>Removes a delegate previously registered through the Action overload.</summary>
        public void Unsubscribe<TEvent>(Action<TEvent> eventHandlerFunc)
            where TEvent : class, IEvent
        {
            Unsubscribe<TEvent>(new ActionDelegatedEventHandler<TEvent>(eventHandlerFunc));
        }

        /// <summary>Removes every delegate of the sequence.</summary>
        public void Unsubscribe<TEvent>(IEnumerable<Func<TEvent, bool>> eventHandlerFuncs)
            where TEvent : class, IEvent
        {
            foreach (var eventHandlerFunc in eventHandlerFuncs)
            {
                UnsubscribeFunc<TEvent>(eventHandlerFunc);
            }
        }

        /// <summary>Removes every delegate of the array.</summary>
        public void Unsubscribe<TEvent>(params Func<TEvent, bool>[] eventHandlerFuncs)
            where TEvent : class, IEvent
        {
            // See Subscribe(params Func<TEvent, bool>[]): calling Unsubscribe<TEvent>(func)
            // would bind back to this overload and never terminate.
            foreach (var eventHandlerFunc in eventHandlerFuncs)
            {
                UnsubscribeFunc<TEvent>(eventHandlerFunc);
            }
        }

        /// <summary>Removes all handlers registered for <typeparamref name="TEvent"/>.</summary>
        public void UnsubscribeAll<TEvent>()
            where TEvent : class, IEvent
        {
            lock (sync)
            {
                List<object> handlers;
                if (eventHandlers.TryGetValue(typeof(TEvent), out handlers) && handlers != null)
                {
                    handlers.Clear();
                }

                // Forget the Func adapters that belonged to this event type as well.
                var staleKeys = funcAdapters.Where(pair => pair.Value is Action<TEvent>)
                                            .Select(pair => pair.Key)
                                            .ToList();
                foreach (var key in staleKeys)
                {
                    funcAdapters.Remove(key);
                }
            }
        }

        /// <summary>Removes all handlers for all event types.</summary>
        public void UnsubscribeAll()
        {
            lock (sync)
            {
                eventHandlers.Clear();
                funcAdapters.Clear();
            }
        }

        /// <summary>
        /// Returns a copy of the handlers registered for <typeparamref name="TEvent"/>,
        /// or null when the type has no entry in the handler table.
        /// </summary>
        public IEnumerable<IEventHandler<TEvent>> GetSubscriptions<TEvent>()
            where TEvent : class, IEvent
        {
            lock (sync)
            {
                List<object> handlers;
                if (eventHandlers.TryGetValue(typeof(TEvent), out handlers) && handlers != null)
                {
                    return handlers.Select(p => p as IEventHandler<TEvent>).ToList();
                }

                // Kept as null (not an empty sequence) for compatibility with existing callers.
                return null;
            }
        }

        /// <summary>
        /// Passes <paramref name="evnt"/> to every handler registered for its runtime type.
        /// Handlers whose type carries <see cref="HandlesAsynchronouslyAttribute"/> are started
        /// on a task and not awaited; all others run inline on the calling thread.
        /// </summary>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="evnt"/> is null, or a registered handler cannot be used as
        /// <c>IEventHandler&lt;TEvent&gt;</c> (reported with the name "eventHandler").
        /// </exception>
        public void Publish<TEvent>(TEvent evnt)
            where TEvent : class, IEvent
        {
            if (evnt == null)
            {
                throw new ArgumentNullException("evnt");
            }

            foreach (var handler in SnapshotHandlers(evnt.GetType()))
            {
                var eventHandler = handler as IEventHandler<TEvent>;
                if (eventHandler == null)
                {
                    throw new ArgumentNullException("eventHandler");
                }

                if (eventHandler.GetType().IsDefined(typeof(HandlesAsynchronouslyAttribute), false))
                {
                    Task.Factory.StartNew((o) => eventHandler.Handle((TEvent)o), evnt);
                }
                else
                {
                    eventHandler.Handle(evnt);
                }
            }
        }

        /// <summary>
        /// Passes <paramref name="evnt"/> to its handlers, waits for the asynchronous ones
        /// and reports the outcome exactly once through <paramref name="callback"/>.
        /// </summary>
        /// <param name="evnt">The event to publish.</param>
        /// <param name="callback">
        /// Receives (event, true, null) on success, (event, false, exception) when dispatching
        /// failed or timed out, and (event, false, null) when no handler is registered.
        /// </param>
        /// <param name="timeout">Maximum time to wait for asynchronous handlers; null waits indefinitely.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="evnt"/> or <paramref name="callback"/> is null.
        /// </exception>
        public void Publish<TEvent>(TEvent evnt,
            Action<TEvent, bool, Exception> callback,
            TimeSpan? timeout = null)
            where TEvent : class, IEvent
        {
            if (evnt == null)
            {
                throw new ArgumentNullException("evnt");
            }

            if (callback == null)
            {
                throw new ArgumentNullException("callback");
            }

            var handlers = SnapshotHandlers(evnt.GetType());
            if (handlers.Length == 0)
            {
                callback(evnt, false, null);
                return;
            }

            Exception failure = null;
            try
            {
                var tasks = new List<Task>();
                foreach (var handler in handlers)
                {
                    var eventHandler = handler as IEventHandler<TEvent>;
                    if (eventHandler == null)
                    {
                        throw new ArgumentNullException("eventHandler");
                    }

                    if (eventHandler.GetType().IsDefined(typeof(HandlesAsynchronouslyAttribute), false))
                    {
                        tasks.Add(Task.Factory.StartNew((o) => eventHandler.Handle((TEvent)o), evnt));
                    }
                    else
                    {
                        eventHandler.Handle(evnt);
                    }
                }

                if (tasks.Count > 0)
                {
                    if (timeout == null)
                    {
                        Task.WaitAll(tasks.ToArray());
                    }
                    else if (!Task.WaitAll(tasks.ToArray(), timeout.Value))
                    {
                        // WaitAll returns false when the time limit elapsed first; that must
                        // not be reported as a successful dispatch.
                        throw new TimeoutException("Not all asynchronous event handlers completed within the given timeout.");
                    }
                }
            }
            catch (Exception ex)
            {
                failure = ex;
            }

            // Invoked outside the try block so that an exception thrown by the callback itself
            // does not cause the callback to be invoked a second time.
            callback(evnt, failure == null, failure);
        }

        #endregion

        #region private helpers

        // Returns a copy of the handler list for eventType, taken under the lock, so that
        // dispatching never enumerates a list another caller may be modifying.
        private object[] SnapshotHandlers(Type eventType)
        {
            lock (sync)
            {
                List<object> handlers;
                if (eventHandlers.TryGetValue(eventType, out handlers) && handlers != null)
                {
                    return handlers.ToArray();
                }

                return new object[0];
            }
        }

        // Registers a Func<TEvent, bool> through an Action<TEvent> adapter that discards the
        // result; the adapter is remembered so that UnsubscribeFunc can find it again.
        private void SubscribeFunc<TEvent>(Func<TEvent, bool> eventHandlerFunc)
            where TEvent : class, IEvent
        {
            if (eventHandlerFunc == null)
            {
                throw new ArgumentNullException("eventHandlerFunc");
            }

            lock (sync)
            {
                Delegate adapter;
                if (!funcAdapters.TryGetValue(eventHandlerFunc, out adapter))
                {
                    var func = eventHandlerFunc;
                    adapter = new Action<TEvent>(e => { func(e); });
                    funcAdapters.Add(eventHandlerFunc, adapter);
                }

                Subscribe<TEvent>((Action<TEvent>)adapter);
            }
        }

        // Removes the adapter registered for a Func<TEvent, bool>, if there is one.
        private void UnsubscribeFunc<TEvent>(Func<TEvent, bool> eventHandlerFunc)
            where TEvent : class, IEvent
        {
            if (eventHandlerFunc == null)
            {
                return;
            }

            lock (sync)
            {
                Delegate adapter;
                if (funcAdapters.TryGetValue(eventHandlerFunc, out adapter))
                {
                    funcAdapters.Remove(eventHandlerFunc);
                    Unsubscribe<TEvent>((Action<TEvent>)adapter);
                }
            }
        }

        #endregion
    }
}
View Code

 

第二步:使用:

private readonly IEventBus eventBus;
// ……(此处省略其他代码)
eventBus.Publish<OrderDispatchedEvent>(evnt);
// 注意:Publish 只是把事件放入当前线程的队列,必须再调用 eventBus.Commit(),事件才会交给事件聚合器分发给各个处理器。