首页 > 代码库 > 接上文,可设置并发数的版本

接上文,可设置并发数的版本

做了一些小优化,并增加了并发数控制:

    /// <summary>
    /// Base class for a message-queue handler that hands each queued message to
    /// <see cref="MainProcess"/> on a thread-pool thread, while a semaphore limits how many
    /// messages are being handled at the same time.
    /// </summary>
    /// <typeparam name="T">Type the message body is cast to before it is passed to <see cref="MainProcess"/>.</typeparam>
    public abstract class MessageQueueConcurrentHandlerBase<T> : IMessageQueueHandler
    {
        /// <summary>
        /// Creates a handler bound to an existing queue.
        /// </summary>
        /// <param name="queueName">Path of the queue to read; it must already exist.</param>
        /// <param name="maxConcurrency">Maximum number of messages handled concurrently; must be at least 1.</param>
        /// <param name="logDelegate">Receives diagnostic text. When null, log output is discarded.</param>
        /// <exception cref="Exception">The queue named by <paramref name="queueName"/> does not exist.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxConcurrency"/> is less than 1.</exception>
        public MessageQueueConcurrentHandlerBase(string queueName, int maxConcurrency, Action<string> logDelegate)
        {
            if (!MessageQueue.Exists(queueName))
            {
                throw new Exception(string.Format("No such a queue: {0}", queueName));
            }

            if (maxConcurrency < 1)
            {
                throw new ArgumentOutOfRangeException("maxConcurrency");
            }

            if (logDelegate == null)
            {
                // Every consumer logs unconditionally; a no-op delegate avoids a
                // NullReferenceException on the first received message.
                logDelegate = delegate(string message) { };
            }

            this._queueName = queueName;
            // The semaphore starts empty; StartRead releases all slots.
            this._poolForConsumer = new Semaphore(0, maxConcurrency);
            this._producerAutoResetEvent = new AutoResetEvent(false);
            this._maxConcurrency = maxConcurrency;
            this._logDelegate = logDelegate;
        }

        /// <summary>
        /// Opens the queue, makes all consumer slots available and starts the asynchronous peek loop.
        /// </summary>
        /// <remarks>
        /// NOTE(review): this looks intended to be called once per instance; a second call opens
        /// another queue object and releases the semaphore again — confirm with callers.
        /// </remarks>
        public void StartRead()
        {
            // NOTE(review): the formatter only targets long although the class is generic over T;
            // presumably T is long (or a type a boxed long can be cast to) in practice — confirm.
            this._queue = new MessageQueue(this._queueName) { Formatter = new XmlMessageFormatter(new Type[] { typeof(long) }) };
            this._queue.PeekCompleted += new PeekCompletedEventHandler(Produce);

            // Let the first Produce callback through and open every consumer slot.
            this._producerAutoResetEvent.Set();
            this._poolForConsumer.Release(this._maxConcurrency);
            this._queue.BeginPeek();
        }

        /// <summary>Returns "<c>queueName_ProcessName</c>".</summary>
        public override string ToString()
        {
            return string.Format("{0}_{1}", this._queueName, this.ProcessName);
        }

        /// <summary>Number of messages currently between the start and end of <see cref="MainProcess"/>.</summary>
        public int WorkerCount { get { return Thread.VolatileRead(ref this._workerCount); } }

        /// <summary>The concurrency limit passed to the constructor.</summary>
        public int MaxConcurrency { get { return _maxConcurrency; } }

        /// <summary>Name used to identify this handler in log lines and in <see cref="ToString"/>.</summary>
        protected abstract string ProcessName { get; }

        /// <summary>
        /// Handles one message body. Exceptions thrown here are caught and written to the log delegate.
        /// </summary>
        /// <param name="backThreadId">The body of the received message, cast to <typeparamref name="T"/>.</param>
        protected abstract void MainProcess(T backThreadId);

        /// <summary>The delegate that receives diagnostic text; never null.</summary>
        protected Action<string> LogDelegate { get { return _logDelegate; } }

        #region private

        /// <summary>
        /// Peek-completed callback: queues one consumer work item and re-arms the peek.
        /// </summary>
        private void Produce(object sender, PeekCompletedEventArgs e)
        {
            // Blocks until StartRead or the previous consumer's Receive has signalled the event.
            this._producerAutoResetEvent.WaitOne();

            // Completes the asynchronous peek; the peeked message itself is not used because
            // the consumer performs its own Receive.
            this._queue.EndPeek(e.AsyncResult);

            ThreadPool.QueueUserWorkItem(new WaitCallback(this.Consume));
            this._queue.BeginPeek();
        }

        /// <summary>
        /// Thread-pool work item: takes a consumer slot, receives one message and runs
        /// <see cref="MainProcess"/> on its body.
        /// </summary>
        /// <remarks>
        /// Exceptions from receiving or casting the message are not caught here, but the consumer
        /// slot is always returned and the producer is always signalled.
        /// </remarks>
        private void Consume(object stateInfo)
        {
            this._poolForConsumer.WaitOne();
            try
            {
                Message message;
                try
                {
                    message = this._queue.Receive();
                }
                finally
                {
                    // Signal even when Receive fails; otherwise the next Produce callback
                    // would wait on the event forever.
                    this._producerAutoResetEvent.Set();
                }

                T messageItem = (T)message.Body;
                this.LogDelegate(string.Format("{0} - Received a message, MessageItem = {1}", this.ProcessName, messageItem));

                // Log the value returned by the atomic operation rather than re-reading the
                // field, which other consumers may have changed in the meantime.
                int runningCount = Interlocked.Increment(ref this._workerCount);
                try
                {
                    this.LogDelegate(string.Format("{0} - Running - {1}, WorkerCount = {2}", this.ProcessName, messageItem, runningCount));
                    MainProcess(messageItem);
                }
                catch (Exception ex)
                {
                    this.HandleException(ex, messageItem);
                }
                finally
                {
                    int remainingCount = Interlocked.Decrement(ref this._workerCount);
                    this.LogDelegate(string.Format("{0} - Over - {1}, WorkerCount = {2}", this.ProcessName, messageItem, remainingCount));
                }
            }
            finally
            {
                // Return the slot on every path so a failing message cannot shrink the pool.
                this._poolForConsumer.Release();
            }
        }

        /// <summary>Writes the details of an exception raised while handling <paramref name="messageItem"/> to the log delegate.</summary>
        private void HandleException(Exception ex, T messageItem)
        {
            this.LogDelegate(string.Format("Exception in {0}:[Message]={1},[StackTrace]={2},[Type]={3},[_workerCount]={4},[backThreadId]={5}", this.ProcessName, ex.Message, ex.StackTrace, ex.GetType(), this.WorkerCount, messageItem));
        }

        private readonly string _queueName;
        private MessageQueue _queue;
        private int _workerCount;
        // Counts free consumer slots; at most _maxConcurrency.
        private readonly Semaphore _poolForConsumer;
        // Signalled by StartRead and after each Receive attempt; awaited by Produce.
        private readonly AutoResetEvent _producerAutoResetEvent;
        private readonly int _maxConcurrency;
        private readonly Action<string> _logDelegate;

        #endregion
    }

觉得好就表扬一下啊:)

接上文,可设置并发数的版本