首页 > 代码库 > 接上文,可设置并发数的版本
接上文,可设置并发数的版本
做了些微优化,并增加并发数控制:
public abstract class MessageQueueConcurrentHandlerBase<T> : IMessageQueueHandler { public MessageQueueConcurrentHandlerBase(string queueName, int maxConcurrency, Action<string> logDelegate) { if (!MessageQueue.Exists(queueName)) throw new Exception(string.Format("No such a queue: {0}", queueName)); if (maxConcurrency < 1) throw new ArgumentOutOfRangeException("maxConcurrency"); this._queueName = queueName; this._poolForConsumer = new Semaphore(0, maxConcurrency); this._producerAutoResetEvent = new AutoResetEvent(false); this._maxConcurrency = maxConcurrency; this._logDelegate = logDelegate; } public void StartRead() { this._queue = new MessageQueue(this._queueName) { Formatter = new XmlMessageFormatter(new Type[] { typeof(long) }) }; this._queue.PeekCompleted += new PeekCompletedEventHandler(Produce); this._producerAutoResetEvent.Set(); this._poolForConsumer.Release(this._maxConcurrency); this._queue.BeginPeek(); } public override string ToString() { return string.Format("{0}_{1}", this._queueName, this.ProcessName); } public int WorkerCount { get { return Thread.VolatileRead(ref this._workerCount); } } public int MaxConcurrency { get { return _maxConcurrency; } } protected abstract string ProcessName { get; } protected abstract void MainProcess(T backThreadId); protected Action<string> LogDelegate { get { return _logDelegate; } } #region private private void Produce(object sender, PeekCompletedEventArgs e) { this._producerAutoResetEvent.WaitOne(); var message = this._queue.EndPeek(e.AsyncResult); ThreadPool.QueueUserWorkItem(new WaitCallback(this.Consume)); this._queue.BeginPeek(); } private void Consume(object stateInfo) { this._poolForConsumer.WaitOne(); var message = this._queue.Receive(); this._producerAutoResetEvent.Set(); T messageItem = (T)message.Body; this.LogDelegate(string.Format("{0} - Received a message, MessageItem = {1}", this.ProcessName, messageItem)); Interlocked.Increment(ref this._workerCount); try { this.LogDelegate(string.Format("{0} - Running - {1}, WorkerCount = {2}", this.ProcessName, messageItem, this._workerCount)); MainProcess(messageItem); } catch (Exception ex) { this.HandleException(ex, messageItem); } finally { Interlocked.Decrement(ref this._workerCount); this.LogDelegate(string.Format("{0} - Over - {1}, WorkerCount = {2}", this.ProcessName, messageItem, this._workerCount)); } this._poolForConsumer.Release(); } private void HandleException(Exception ex, T messageItem) { this.LogDelegate(string.Format("Exception in {0}:[Message]={1},[StackTrace]={2},[Type]={3},[_workerCount]={4},[backThreadId]={5}", this.ProcessName, ex.Message, ex.StackTrace, ex.GetType(), this.WorkerCount, messageItem)); } private readonly string _queueName; private MessageQueue _queue; private int _workerCount; private Semaphore _poolForConsumer; private AutoResetEvent _producerAutoResetEvent; private int _maxConcurrency; private Action<string> _logDelegate; #endregion }
觉得好就表扬一下啊:)
接上文,可设置并发数的版本
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。