首页 > 代码库 > ConcurrentAsyncQueue 2014-09-07
ConcurrentAsyncQueue 2014-09-07
namespace Microshaoft{ using System; using System.Collections.Concurrent; using System.Diagnostics; using System.Threading; internal static class QueuedObjectsPoolManager { public static readonly QueuedObjectsPool<Stopwatch> StopwatchsPool = new QueuedObjectsPool<Stopwatch>(0); } public class ConcurrentAsyncQueue<T> { public delegate void QueueEventHandler(T item); public event QueueEventHandler OnDequeue; public delegate void QueueLogEventHandler(string logMessage); public QueueLogEventHandler OnQueueLog , OnDequeueThreadStart , OnDequeueThreadEnd; public delegate bool ExceptionEventHandler(ConcurrentAsyncQueue<T> sender, Exception exception); public event ExceptionEventHandler OnCaughtException; private ConcurrentQueue<Tuple<Stopwatch, T>> _queue = new ConcurrentQueue<Tuple<Stopwatch, T>>(); public ConcurrentQueue<Tuple<Stopwatch, T>> InternalQueue { get { return _queue; } //set { _queue = value; } } private ConcurrentQueue<Action> _callbackProcessBreaksActions; private long _concurrentDequeueThreadsCount = 0; //Microshaoft 用于控制并发线程数 private ConcurrentQueue<ThreadProcessor> _dequeueThreadsProcessorsPool; private int _dequeueIdleSleepSeconds = 10; public QueuePerformanceCountersContainer PerformanceCounters { get; private set; } public int DequeueIdleSleepSeconds { set { _dequeueIdleSleepSeconds = value; } get { return _dequeueIdleSleepSeconds; } } private bool _isAttachedPerformanceCounters = false; private class ThreadProcessor { public bool Break { set; get; } public EventWaitHandle Wait { private set; get; } public ConcurrentAsyncQueue<T> Sender { private set; get; } public void StopOne() { Break = true; } public ThreadProcessor ( ConcurrentAsyncQueue<T> queue , EventWaitHandle wait ) { Wait = wait; Sender = queue; } public void ThreadProcess() { long l = 0; Interlocked.Increment(ref Sender._concurrentDequeueThreadsCount); bool counterEnabled = Sender._isAttachedPerformanceCounters; QueuePerformanceCountersContainer qpcc = Sender.PerformanceCounters; var queue = Sender.InternalQueue; var reThrowException = false; PerformanceCounter[] incrementCountersBeforeCountPerformanceForThread = null; PerformanceCounter[] decrementCountersAfterCountPerformanceForThread = null; PerformanceCounter[] incrementCountersAfterCountPerformanceForThread = null; if (counterEnabled && qpcc != null) { incrementCountersBeforeCountPerformanceForThread = new PerformanceCounter[] { qpcc .DequeueThreadStartPerformanceCounter , qpcc .DequeueThreadsCountPerformanceCounter }; decrementCountersAfterCountPerformanceForThread = new PerformanceCounter[] { qpcc.DequeueThreadsCountPerformanceCounter }; incrementCountersAfterCountPerformanceForThread = new PerformanceCounter[] { qpcc.DequeueThreadEndPerformanceCounter }; } PerformanceCountersHelper .TryCountPerformance ( counterEnabled , reThrowException , incrementCountersBeforeCountPerformanceForThread , null , null , () => { #region Try Process if (Sender.OnDequeueThreadStart != null) { l = Interlocked.Read(ref Sender._concurrentDequeueThreadsCount); Sender .OnDequeueThreadStart ( string .Format ( "{0} Threads Count {1},Queue Count {2},Current Thread: {3} at {4}" , "Threads ++ !" , l , queue.Count , Thread.CurrentThread.Name , DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff") ) ); } while (true) { #region while true loop if (Break) { break; } while (!queue.IsEmpty) { #region while queue.IsEmpty loop if (Break) { break; } Tuple<Stopwatch, T> item = null; if (queue.TryDequeue(out item)) { Stopwatch stopwatchDequeue = QueuedObjectsPoolManager.StopwatchsPool.Get(); PerformanceCounter[] incrementCountersBeforeCountPerformanceForDequeue = null; PerformanceCounter[] decrementCountersBeforeCountPerformanceForDequeue = null; PerformanceCounter[] incrementCountersAfterCountPerformanceForDequeue = null; Tuple < bool , Stopwatch , PerformanceCounter , PerformanceCounter >[] timerCounters = null; if (counterEnabled && qpcc != null) { incrementCountersBeforeCountPerformanceForDequeue = new PerformanceCounter[] { qpcc .DequeuePerformanceCounter }; decrementCountersBeforeCountPerformanceForDequeue = new PerformanceCounter[] { qpcc .QueueLengthPerformanceCounter }; timerCounters = new Tuple < bool , Stopwatch , PerformanceCounter , PerformanceCounter >[] { Tuple.Create < bool //before 时是否需要启动 , Stopwatch , PerformanceCounter , PerformanceCounter //base > ( false , item.Item1 , qpcc .QueuedWaitAverageTimerPerformanceCounter , qpcc .QueuedWaitAverageBasePerformanceCounter ) , Tuple.Create < bool , Stopwatch , PerformanceCounter , PerformanceCounter > ( true , stopwatchDequeue , qpcc .DequeueProcessedAverageTimerPerformanceCounter , qpcc .DequeueProcessedAverageBasePerformanceCounter ) }; incrementCountersAfterCountPerformanceForDequeue = new PerformanceCounter[] { qpcc .DequeueProcessedPerformanceCounter , qpcc .DequeueProcessedRateOfCountsPerSecondPerformanceCounter }; } PerformanceCountersHelper .TryCountPerformance ( counterEnabled , reThrowException , incrementCountersBeforeCountPerformanceForDequeue , decrementCountersBeforeCountPerformanceForDequeue , timerCounters , () => //try { if (Sender.OnDequeue != null) { var element = item.Item2; item = null; Sender.OnDequeue(element); } } , (x) => //catch { reThrowException = false; return reThrowException; } , null //finally , null , incrementCountersAfterCountPerformanceForDequeue ); //池化 stopwatchDequeue.Reset(); QueuedObjectsPoolManager.StopwatchsPool.Put(stopwatchDequeue); } #endregion while queue.IsEmpty loop } #region wait Sender ._dequeueThreadsProcessorsPool .Enqueue(this); if (Break) { } if (!Wait.WaitOne(Sender.DequeueIdleSleepSeconds * 1000)) { } #endregion wait #endregion while true loop } #endregion } , (x) => //catch { #region Catch Process if (Sender.OnCaughtException != null) { reThrowException = Sender.OnCaughtException(Sender, x); } return reThrowException; #endregion } , (x, y) => //finally { #region Finally Process l = Interlocked.Decrement(ref Sender._concurrentDequeueThreadsCount); if (l < 0) { Interlocked.Exchange(ref Sender._concurrentDequeueThreadsCount, 0); } if (Sender.OnDequeueThreadEnd != null) { Sender .OnDequeueThreadEnd ( string.Format ( "{0} Threads Count {1},Queue Count {2},Current Thread: {3} at {4}" , "Threads--" , l , Sender.InternalQueue.Count , Thread.CurrentThread.Name , DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fffff") ) ); } if (!Break) { Sender.StartIncreaseDequeueProcessThreads(1); } Break = false; #endregion } , decrementCountersAfterCountPerformanceForThread , incrementCountersAfterCountPerformanceForThread ); } } public void AttachPerformanceCounters ( string instanceNamePrefix , string categoryName , QueuePerformanceCountersContainer performanceCounters ) { var process = Process.GetCurrentProcess(); var processName = process.ProcessName; var instanceName = string.Format ( "{0}-{1}" , instanceNamePrefix , processName ); PerformanceCounters = performanceCounters; PerformanceCounters .AttachPerformanceCountersToProperties(instanceName, categoryName); _isAttachedPerformanceCounters = true; } public int Count { get { return _queue.Count; } } public long ConcurrentThreadsCount { get { return _concurrentDequeueThreadsCount; } } private void DecreaseDequeueProcessThreads(int count) { Action action; for (var i = 0; i < count; i++) { if (_callbackProcessBreaksActions.TryDequeue(out action)) { action(); action = null; } } } public void StartDecreaseDequeueProcessThreads(int count) { new Thread ( new ThreadStart ( () => { DecreaseDequeueProcessThreads(count); } ) ).Start(); } public void StartIncreaseDequeueProcessThreads(int count) { new Thread ( new ThreadStart ( () => { IncreaseDequeueProcessThreads(count); } ) ).Start(); } private void IncreaseDequeueProcessThreads(int count) { for (int i = 0; i < count; i++) { Interlocked.Increment(ref _concurrentDequeueThreadsCount); if (_dequeueThreadsProcessorsPool == null) { _dequeueThreadsProcessorsPool = new ConcurrentQueue<ThreadProcessor>(); } var processor = new ThreadProcessor ( this , new AutoResetEvent(false) ); var thread = new Thread ( new ThreadStart ( processor.ThreadProcess ) ); if (_callbackProcessBreaksActions == null) { _callbackProcessBreaksActions = new ConcurrentQueue<Action>(); } var callbackProcessBreakAction = new Action ( processor.StopOne ); _callbackProcessBreaksActions.Enqueue(callbackProcessBreakAction); _dequeueThreadsProcessorsPool.Enqueue(processor); thread.Start(); } } public bool Enqueue(T item) { var r = false; var reThrowException = false; var enableCount = _isAttachedPerformanceCounters; PerformanceCounter[] incrementCountersBeforeCountPerformance = null; var qpcc = PerformanceCounters; if (enableCount && qpcc != null) { incrementCountersBeforeCountPerformance = new PerformanceCounter[] { qpcc .EnqueuePerformanceCounter , qpcc .EnqueueRateOfCountsPerSecondPerformanceCounter , qpcc .QueueLengthPerformanceCounter }; } PerformanceCountersHelper .TryCountPerformance ( enableCount , reThrowException , incrementCountersBeforeCountPerformance , null , null , () => { Stopwatch stopwatch = null; if (_isAttachedPerformanceCounters) { stopwatch = QueuedObjectsPoolManager.StopwatchsPool.Get(); stopwatch = Stopwatch.StartNew(); } var element = Tuple.Create<Stopwatch, T>(stopwatch, item); _queue.Enqueue(element); r = true; } , (x) => { if (OnCaughtException != null) { reThrowException = OnCaughtException(this, x); } return reThrowException; } , (x, y) => { if ( _dequeueThreadsProcessorsPool != null && !_dequeueThreadsProcessorsPool.IsEmpty ) { ThreadProcessor processor; if (_dequeueThreadsProcessorsPool.TryDequeue(out processor)) { processor.Wait.Set(); processor = null; //Console.WriteLine("processor = null;"); } } } ); return r; } }}namespace Microshaoft{ using System; using System.Diagnostics; public class QueuePerformanceCountersContainer //: IPerformanceCountersContainer { #region PerformanceCounters private PerformanceCounter _caughtExceptionsPerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.NumberOfItems64 , CounterName = "99.捕获异常次数(次)" ) ] public PerformanceCounter CaughtExceptionsPerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> ( ref _caughtExceptionsPerformanceCounter , value , 2 ); } get { return _caughtExceptionsPerformanceCounter; } } private PerformanceCounter _enqueuePerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.NumberOfItems64 , CounterName = "01.入队列累计总数(笔)" ) ] public PerformanceCounter EnqueuePerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> (ref _enqueuePerformanceCounter, value, 2); } get { return _enqueuePerformanceCounter; } } private PerformanceCounter _enqueueRateOfCountsPerSecondPerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.RateOfCountsPerSecond64 , CounterName = "02.每秒入队列笔数(笔/秒)" ) ] public PerformanceCounter EnqueueRateOfCountsPerSecondPerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> (ref _enqueueRateOfCountsPerSecondPerformanceCounter, value, 2); } get { return _enqueueRateOfCountsPerSecondPerformanceCounter; } } private PerformanceCounter _queueLengthPerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.NumberOfItems64 , CounterName = "03.队列当前长度(笔)" ) ] public PerformanceCounter QueueLengthPerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> (ref _queueLengthPerformanceCounter, value, 2); } get { return _queueLengthPerformanceCounter; } } private PerformanceCounter _dequeuePerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.NumberOfItems64 , CounterName = "04.出队列累计总数(笔)" ) ] public PerformanceCounter DequeuePerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> (ref _dequeuePerformanceCounter, value, 2); } get { return _dequeuePerformanceCounter; } } private PerformanceCounter _dequeueProcessedRateOfCountsPerSecondPerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.RateOfCountsPerSecond64 , CounterName = "05.每秒出队列并完成处理笔数(笔/秒)" ) ] public PerformanceCounter DequeueProcessedRateOfCountsPerSecondPerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> (ref _dequeueProcessedRateOfCountsPerSecondPerformanceCounter, value, 2); } get { return _dequeueProcessedRateOfCountsPerSecondPerformanceCounter; } } private PerformanceCounter _dequeueProcessedPerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.NumberOfItems64 , CounterName = "06.已出队列并完成处理累计总笔数(笔)" ) ] public PerformanceCounter DequeueProcessedPerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> (ref _dequeueProcessedPerformanceCounter, value, 2); } get { return _dequeueProcessedPerformanceCounter; } } private PerformanceCounter _dequeueProcessedAverageTimerPerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.AverageTimer32 , CounterName = "07.每笔已出队列并完成处理平均耗时秒数(秒/笔)" ) ] public PerformanceCounter DequeueProcessedAverageTimerPerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> (ref _dequeueProcessedAverageTimerPerformanceCounter, value, 2); } get { return _dequeueProcessedAverageTimerPerformanceCounter; } } private PerformanceCounter _dequeueProcessedAverageBasePerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.AverageBase ) ] public PerformanceCounter DequeueProcessedAverageBasePerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> (ref _dequeueProcessedAverageBasePerformanceCounter, value, 2); } get { return _dequeueProcessedAverageBasePerformanceCounter; } } private PerformanceCounter _queuedWaitAverageTimerPerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.AverageTimer32 , CounterName = "08.每笔入出队列并完成处理平均耗时秒数(秒/笔)" ) ] public PerformanceCounter QueuedWaitAverageTimerPerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> (ref _queuedWaitAverageTimerPerformanceCounter, value, 2); } get { return _queuedWaitAverageTimerPerformanceCounter; } } private PerformanceCounter _queuedWaitAverageBasePerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.AverageBase ) ] public PerformanceCounter QueuedWaitAverageBasePerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> (ref _queuedWaitAverageBasePerformanceCounter, value, 2); } get { return _queuedWaitAverageBasePerformanceCounter; } } private PerformanceCounter _dequeueThreadStartPerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.NumberOfItems64 , CounterName = "09.新建出队列处理线程启动次数(次)" ) ] public PerformanceCounter DequeueThreadStartPerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> (ref _dequeueThreadStartPerformanceCounter, value, 2); } get { return _dequeueThreadStartPerformanceCounter; } } private PerformanceCounter _dequeueThreadsCountPerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.NumberOfItems64 , CounterName = "10.当前出队列并发处理线程数(个)" ) ] public PerformanceCounter DequeueThreadsCountPerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> (ref _dequeueThreadsCountPerformanceCounter, value, 2); } get { return _dequeueThreadsCountPerformanceCounter; } } private PerformanceCounter _dequeueThreadEndPerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.NumberOfItems64 , CounterName = "11.出队列处理线程退出次数(次)" ) ] public PerformanceCounter DequeueThreadEndPerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> (ref _dequeueThreadEndPerformanceCounter, value, 2); } get { return _dequeueThreadEndPerformanceCounter; } } #endregion // indexer declaration public PerformanceCounter this[string name] { get { throw new NotImplementedException(); //return null; } } private bool _isAttachedPerformanceCounters = false; public void AttachPerformanceCountersToProperties ( string instanceName , string categoryName ) { if (!_isAttachedPerformanceCounters) { var type = this.GetType(); PerformanceCountersHelper .AttachPerformanceCountersToProperties<QueuePerformanceCountersContainer> (instanceName, categoryName, this); } _isAttachedPerformanceCounters = true; } }}namespace Microshaoft{ using System; using System.Diagnostics; using System.Threading; using System.Linq; public static class PerformanceCountersHelper { public static void TryCountPerformance ( bool enableCount , bool reThrowException = false , PerformanceCounter[] IncrementCountersBeforeCountPerformance = null , PerformanceCounter[] DecrementCountersBeforeCountPerformance = null , Tuple < bool //before时是否已经启动 , Stopwatch , PerformanceCounter , PerformanceCounter //base计数器 >[] timerCounters = null , Action onTryCountPerformanceProcessAction = null , Func<Exception, bool> onCaughtExceptionCountPerformanceProcessFunc = null , Action<bool, Exception> onFinallyCountPerformanceProcessAction = null , PerformanceCounter[] DecrementCountersAfterCountPerformance = null , PerformanceCounter[] IncrementCountersAfterCountPerformance = null ) { if (onTryCountPerformanceProcessAction != null) { if (enableCount) { #region before if (IncrementCountersBeforeCountPerformance != null) { Array.ForEach ( IncrementCountersBeforeCountPerformance , (x) => { var l = x.Increment(); } ); } if (DecrementCountersBeforeCountPerformance != null) { Array.ForEach ( DecrementCountersBeforeCountPerformance , (x) => { var l = x.Decrement(); if (l < 0) { x.RawValue = 0; } } ); } if (timerCounters != null) { Array.ForEach ( timerCounters , (x) => { if ( x.Item1 && x.Item2 != null ) {#if NET45 x.Item2.Restart();#elif NET35 x.Item2.Reset(); x.Item2.Start();#endif } } ); } #endregion } var needTry = true; TryCatchFinallyProcessHelper .TryProcessCatchFinally ( needTry , () => { onTryCountPerformanceProcessAction(); } , reThrowException , (x, y) => { if (onCaughtExceptionCountPerformanceProcessFunc != null) { reThrowException = onCaughtExceptionCountPerformanceProcessFunc(x); } return reThrowException; } , (x, y) => { if (enableCount) { #region after#if NET45 if (timerCounters != null) { Array.ForEach ( timerCounters , (xx) => { if (xx.Item2 != null) { Stopwatch stopwatch = xx.Item2; stopwatch.Stop(); long elapsedTicks = stopwatch.ElapsedTicks; var counter = xx.Item3; counter.IncrementBy(elapsedTicks); //池化 //stopwatch = null; counter = xx.Item4; //base counter.Increment(); } } ); }#endif if (IncrementCountersAfterCountPerformance != null) { Array.ForEach ( IncrementCountersAfterCountPerformance , (xx) => { var l = xx.Increment(); } ); } if (DecrementCountersAfterCountPerformance != null) { Array.ForEach ( DecrementCountersAfterCountPerformance , (xx) => { var l = xx.Decrement(); if (l < 0) { xx.RawValue = 0; } } ); } #endregion } if (onFinallyCountPerformanceProcessAction != null) { onFinallyCountPerformanceProcessAction(x, y); } } ); } } public static void AttachPerformanceCountersToProperties<T> ( string performanceCounterInstanceName , string category , T target //= default(T) ) { var type = typeof(T); var propertiesList = type.GetProperties().ToList(); propertiesList = propertiesList .Where ( (pi) => { var parameters = pi.GetIndexParameters(); return ( pi.PropertyType == typeof(PerformanceCounter) && (parameters == null ? 0 : parameters.Length) <= 0 ); } ).ToList(); if (PerformanceCounterCategory.Exists(category)) { propertiesList .ForEach ( (pi) => { if (PerformanceCounterCategory.CounterExists(pi.Name, category)) { if (PerformanceCounterCategory.InstanceExists(performanceCounterInstanceName, category)) { //var pc = new PerformanceCounter(category, pi.Name, instanceName, false); //pc.InstanceName = instanceName; //pc.RemoveInstance(); } } } ); //PerformanceCounterCategory.Delete(category); } if (!PerformanceCounterCategory.Exists(category)) { var ccdc = new CounterCreationDataCollection(); propertiesList .ForEach ( (pi) => { var propertyName = pi.Name; var performanceCounterType = PerformanceCounterType.NumberOfItems64; var performanceCounterName = propertyName; var attribute = pi .GetCustomAttributes(false) .FirstOrDefault ( (x) => { return x as PerformanceCounterDefinitionAttribute != null; } ) as PerformanceCounterDefinitionAttribute; if (attribute != null) { var counterName = attribute.CounterName; if (!string.IsNullOrEmpty(counterName)) { performanceCounterName = counterName; } var counterType = attribute.CounterType; //if (counterType != null) { performanceCounterType = counterType; } } var ccd = PerformanceCountersHelper .GetCounterCreationData ( performanceCounterName , performanceCounterType ); ccdc.Add(ccd); } ); PerformanceCounterCategory .Create ( category , string.Format("{0} Category Help.", category) , PerformanceCounterCategoryType.MultiInstance , ccdc ); } propertiesList.ForEach ( (pi) => { var propertyName = pi.Name; var performanceCounterType = PerformanceCounterType.NumberOfItems64; var performanceCounterName = propertyName; var attribute = pi .GetCustomAttributes(false) .FirstOrDefault ( (x) => { return x as PerformanceCounterDefinitionAttribute != null; } ) as PerformanceCounterDefinitionAttribute; if (attribute != null) { var counterName = attribute.CounterName; if (!string.IsNullOrEmpty(counterName)) { performanceCounterName = counterName; } var counterType = attribute.CounterType; //if (counterType != null) { performanceCounterType = counterType; } } var pc = new PerformanceCounter() { CategoryName = category , CounterName = performanceCounterName , InstanceLifetime = PerformanceCounterInstanceLifetime.Process , InstanceName = performanceCounterInstanceName , ReadOnly = false , RawValue = 0 }; if (pi.GetGetMethod().IsStatic) { var setter = DynamicPropertyAccessor .CreateSetStaticPropertyValueAction<PerformanceCounter> ( type , propertyName ); setter(pc); } else { if (target != null) { var setter = DynamicPropertyAccessor .CreateSetPropertyValueAction<PerformanceCounter> ( type , propertyName ); setter(target, pc); } } } ); } public static CounterCreationData GetCounterCreationData ( string counterName , PerformanceCounterType performanceCounterType ) { return new CounterCreationData() { CounterName = counterName , CounterHelp = string.Format("{0} Help", counterName) , CounterType = performanceCounterType }; } }}namespace Microshaoft{ using System; using System.Diagnostics; public class CommonPerformanceCountersContainer : IPerformanceCountersContainer { #region PerformanceCounters private PerformanceCounter _caughtExceptionsPerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.NumberOfItems64 , CounterName = "99.捕获异常次数(次)" ) ] public PerformanceCounter CaughtExceptionsPerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> ( ref _caughtExceptionsPerformanceCounter , value , 2 ); } get { return _caughtExceptionsPerformanceCounter; } } private PerformanceCounter _processPerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.NumberOfItems64 , CounterName = "01.接收处理笔数(笔)" ) ] public PerformanceCounter PrcocessPerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> ( ref _processPerformanceCounter , value , 2 ); } get { return _processPerformanceCounter; } } private PerformanceCounter _processingPerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.NumberOfItems64 , CounterName = "02.正在处理笔数(笔)" ) ] public PerformanceCounter ProcessingPerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> ( ref _processingPerformanceCounter , value , 2 ); } get { return _processingPerformanceCounter; } } private PerformanceCounter _processedPerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.NumberOfItems64 , CounterName = "03.完成处理笔数(笔)" ) ] public PerformanceCounter ProcessedPerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> ( ref _processedPerformanceCounter , value , 2 ); } get { return _processedPerformanceCounter; } } private PerformanceCounter _processedRateOfCountsPerSecondPerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.RateOfCountsPerSecond64 , CounterName = "04.每秒完成处理笔数(笔/秒)" ) ] public PerformanceCounter ProcessedRateOfCountsPerSecondPerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> ( ref _processedRateOfCountsPerSecondPerformanceCounter , value , 2 ); } get { return _processedRateOfCountsPerSecondPerformanceCounter; } } private PerformanceCounter _ProcessedAverageTimerPerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.AverageTimer32 , CounterName = "05.平均每笔处理耗时秒数(秒/笔)" ) ] public PerformanceCounter ProcessedAverageTimerPerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> ( ref _ProcessedAverageTimerPerformanceCounter , value , 2 ); } get { return _ProcessedAverageTimerPerformanceCounter; } } private PerformanceCounter _processedAverageBasePerformanceCounter; [ PerformanceCounterDefinitionAttribute ( CounterType = PerformanceCounterType.AverageBase ) ] public PerformanceCounter ProcessedAverageBasePerformanceCounter { private set { ReaderWriterLockSlimHelper .TryEnterWriterLockSlimWrite<PerformanceCounter> ( ref _processedAverageBasePerformanceCounter , value , 2 ); } get { return _processedAverageBasePerformanceCounter; } } #endregion // indexer declaration public PerformanceCounter this[string name] { get { throw new NotImplementedException(); //return null; } } //private bool _isAttachedPerformanceCounters = false; public void AttachPerformanceCountersToProperties ( string instanceName , string categoryName ) { var type = this.GetType(); PerformanceCountersHelper .AttachPerformanceCountersToProperties<CommonPerformanceCountersContainer> ( instanceName , categoryName , this ); } }}namespace Microshaoft{ using System; using System.Diagnostics; public static class PerformanceCounterExtensionMethodsManager { public static void ChangeAverageTimerCounterValueWithTryCatchExceptionFinally ( this PerformanceCounter performanceCounter , bool enabled , PerformanceCounter basePerformanceCounter , Stopwatch stopwatch , Action onCountPerformanceInnerProcessAction = null , Func<PerformanceCounter, Exception, bool> onCaughtExceptionProcessFunc = null , Action<PerformanceCounter, PerformanceCounter, bool, Exception> onFinallyProcessAction = null ) { //Stopwatch stopwatch = null; if (enabled) { stopwatch.Reset(); stopwatch.Start(); } if (onCountPerformanceInnerProcessAction != null) { bool reThrowException = false; TryCatchFinallyProcessHelper .TryProcessCatchFinally ( true , () => { onCountPerformanceInnerProcessAction(); } , reThrowException , (x, y) => { var r = reThrowException; if (onCaughtExceptionProcessFunc != null) { r = onCaughtExceptionProcessFunc(performanceCounter, x); } return r; } , (x, y) => { if (enabled && stopwatch != null && stopwatch.IsRunning) { stopwatch.Stop(); performanceCounter.IncrementBy(stopwatch.ElapsedTicks); //stopwatch = null; basePerformanceCounter.Increment(); } if (onFinallyProcessAction != null) { onFinallyProcessAction ( performanceCounter , basePerformanceCounter , x , y ); } } ); } } }}namespace Microshaoft{ using System; using System.Diagnostics; [FlagsAttribute] public enum MultiPerformanceCountersTypeFlags : ushort { None = 0, ProcessCounter = 1, ProcessingCounter = 2, ProcessedCounter = 4, ProcessedAverageTimerCounter = 8, ProcessedRateOfCountsPerSecondCounter = 16 }; [AttributeUsage(AttributeTargets.Property, AllowMultiple = false, Inherited = false)] public class PerformanceCounterDefinitionAttribute : Attribute { public PerformanceCounterType CounterType; public string CounterName; }}namespace Microshaoft{ using System.Diagnostics; public interface IPerformanceCountersContainer { void AttachPerformanceCountersToProperties ( string performanceCountersCategoryInstanceName , string performanceCountersCategoryName ); PerformanceCounter PrcocessPerformanceCounter { get; } PerformanceCounter ProcessingPerformanceCounter { get; } PerformanceCounter ProcessedPerformanceCounter { get; } PerformanceCounter ProcessedRateOfCountsPerSecondPerformanceCounter { get; } PerformanceCounter ProcessedAverageTimerPerformanceCounter { get; } PerformanceCounter ProcessedAverageBasePerformanceCounter { get; } PerformanceCounter CaughtExceptionsPerformanceCounter { get; } }}namespace Microshaoft{ using System.Collections.Concurrent; public class QueuedObjectsPool<T> where T: new() { private ConcurrentQueue<T> _pool = new ConcurrentQueue<T>(); public QueuedObjectsPool(int capacity) { _pool = new ConcurrentQueue<T>(); for (int i = 0; i < capacity; i++) { PutNew(); } } public void PutNew() { var e = default(T); e = new T(); Put(e); } public bool Put(T target) { var r = false; if (target != null) { _pool.Enqueue(target); r = true; } return r; } public T Get() { T r; while (!_pool.TryDequeue(out r)) { PutNew(); } return r; } }}namespace Microshaoft{ using System; using System.Threading; public static class ReaderWriterLockSlimHelper { public static bool TryEnterWriterLockSlimWrite<T> ( ref T target , T newTarget , int enterTimeOutSeconds ) where T : class { bool r = false; var rwls = new ReaderWriterLockSlim(); int timeOut = Timeout.Infinite; if (enterTimeOutSeconds >= 0) { timeOut = enterTimeOutSeconds * 1000; } try { r = (rwls.TryEnterWriteLock(timeOut)); if (r) { Interlocked.Exchange<T>(ref target, newTarget); r = true; } } finally { if (r) { rwls.ExitWriteLock(); } } return r; } public static bool TryEnterWriterLockSlimWrite ( Action action , int enterTimeOutSeconds ) { bool r = false; if (action != null) { var rwls = new ReaderWriterLockSlim(); int timeOut = Timeout.Infinite; if (enterTimeOutSeconds >= 0) { timeOut = enterTimeOutSeconds * 1000; } try { r = (rwls.TryEnterWriteLock(timeOut)); if (r) { action(); r = true; } } finally { if (r) { rwls.ExitWriteLock(); } } } return r; } }}namespace Microshaoft{ using System; using System.Diagnostics; using System.Reflection;#if NET45//#endif using System.Threading.Tasks;//#if NET45#endif public static class TryCatchFinallyProcessHelper {#if NET45//#endif public static async Task<T> TryProcessCatchFinallyAsync<T> ( bool needTry , Func<Task<T>> onTryProcessFunc , bool reThrowException = false , Func<Exception, bool> onCaughtExceptionProcessFunc = null , Action<bool, Exception> onFinallyProcessAction = null ) { T r = default(T); //if (onTryProcessAction != null) { if (needTry) { Exception exception = null; var caughtException = false; try { r = await onTryProcessFunc(); return r; } catch (Exception e) { caughtException = true; exception = e; var currentCalleeMethod = MethodInfo.GetCurrentMethod(); var currentCalleeType = currentCalleeMethod.DeclaringType; StackTrace stackTrace = new StackTrace(); StackFrame stackFrame = stackTrace.GetFrame(1); var callerMethod = stackFrame.GetMethod(); var callerType = callerMethod.DeclaringType; var frame = (stackTrace.FrameCount > 1 ? stackTrace.FrameCount - 1 : 1); stackFrame = stackTrace.GetFrame(frame); var originalCallerMethod = stackFrame.GetMethod(); var originalCallerType = originalCallerMethod.DeclaringType; var innerExceptionMessage = string.Format ( "Rethrow caught [{1}] Exception{0} at Callee Method: [{2}]{0} at Caller Method: [{3}]{0} at Original Caller Method: [{4}]" , "\r\n\t" , e.Message , string.Format("{1}{0}{2}", "::", currentCalleeType, currentCalleeMethod) , string.Format("{1}{0}{2}", "::", callerType, callerMethod) , string.Format("{1}{0}{2}", "::", originalCallerType, originalCallerMethod) ); Console.WriteLine(innerExceptionMessage); if (onCaughtExceptionProcessFunc != null) { reThrowException = onCaughtExceptionProcessFunc(e); } if (reThrowException) { throw new Exception ( innerExceptionMessage , e ); } return r; } finally { if (onFinallyProcessAction != null) { onFinallyProcessAction(caughtException, exception); } } } else { return await onTryProcessFunc(); } } }//#if NET45#endif public static void TryProcessCatchFinally ( bool needTry , Action onTryProcessAction , bool reThrowException = false , Func<Exception, string, bool> onCaughtExceptionProcessFunc = null , Action<bool, Exception> onFinallyProcessAction = null ) { if (onTryProcessAction != null) { if (needTry) { Exception exception = null; var caughtException = false; try { onTryProcessAction(); } catch (Exception e) { caughtException = true; exception = e;#if NET45 if (e is AggregateException) { var aggregateException = e as AggregateException; if (aggregateException != null) { exception = aggregateException.Flatten(); } }#endif var currentCalleeMethod = MethodInfo.GetCurrentMethod(); var currentCalleeType = currentCalleeMethod.DeclaringType; StackTrace stackTrace = new StackTrace(e, true); StackFrame stackFrame = stackTrace.GetFrame(1); var callerMethod = stackFrame.GetMethod(); var callerType = callerMethod.DeclaringType; var frame = (stackTrace.FrameCount > 1 ? stackTrace.FrameCount - 1 : 1); stackFrame = stackTrace.GetFrame(frame); var originalCallerMethod = stackFrame.GetMethod(); var originalCallerType = originalCallerMethod.DeclaringType; var innerExceptionMessage = string.Format ( "Rethrow caught [{1}] Exception{0} at Callee Method: [{2}]{0} at Caller Method: [{3}]{0} at Original Caller Method: [{4}]" , "\r\n\t" , e.Message , string.Format("{1}{0}{2}", "::", currentCalleeType, currentCalleeMethod) , string.Format("{1}{0}{2}", "::", callerType, callerMethod) , string.Format("{1}{0}{2}", "::", originalCallerType, originalCallerMethod) ); //Console.WriteLine(innerExceptionMessage); if (onCaughtExceptionProcessFunc != null) { reThrowException = onCaughtExceptionProcessFunc(e, innerExceptionMessage); } if (reThrowException) { throw new Exception ( innerExceptionMessage , e ); } } finally { //Console.WriteLine("Finally"); if (onFinallyProcessAction != null) { onFinallyProcessAction(caughtException, exception); } } } else { onTryProcessAction(); } } } }}namespace Microshaoft{ using System; using System.Linq; using System.Linq.Expressions; using System.Reflection; public class DynamicPropertyAccessor { private static Assembly GetAssemblyByTypeName(string typeName) { return AppDomain .CurrentDomain .GetAssemblies() .First ( (a) => { return a .GetTypes() .Any ( (t) => { return ( t.FullName == typeName ); } ); } ); } public static Func<object, object> CreateGetPropertyValueFunc ( string typeName , string propertyName , bool isTypeFromAssembly = false ) { Type type; if (isTypeFromAssembly) { var assembly = GetAssemblyByTypeName(typeName); type = assembly.GetType(typeName); } else { type = Type.GetType(typeName); } return CreateGetPropertyValueFunc(type, propertyName); } public static Func<object, object> CreateGetPropertyValueFunc ( Type type , string propertyName ) { var target = Expression.Parameter(typeof(object), "P"); var castTarget = Expression.Convert(target, type); var getPropertyValue = http://www.mamicode.com/Expression.Property(castTarget, propertyName); var castPropertyValue = http://www.mamicode.com/Expression.Convert(getPropertyValue, typeof(object)); var lambda = Expression.Lambda<Func<object, object>>(castPropertyValue, target); return lambda.Compile(); } public static Func<object, TProperty> CreateGetPropertyValueFunc<TProperty> ( string typeName , string propertyName , bool isTypeFromAssembly = false ) { Type type; if (isTypeFromAssembly) { var assembly = GetAssemblyByTypeName(typeName); type = assembly.GetType(typeName); } else { type = Type.GetType(typeName); } return CreateGetPropertyValueFunc<TProperty>(type, propertyName); } public static Func<object, TProperty> CreateGetPropertyValueFunc<TProperty> ( Type type , string propertyName ) { var target = Expression.Parameter(typeof(object), "p"); var castTarget = Expression.Convert(target, type); var getPropertyValue = http://www.mamicode.com/Expression.Property(castTarget, propertyName); var lambda = Expression.Lambda<Func<object, TProperty>>(getPropertyValue, target); return lambda.Compile(); } public static Func<TProperty> CreateGetStaticPropertyValueFunc<TProperty> ( string typeName , string propertyName , bool isTypeFromAssembly = false ) { Type type; if (isTypeFromAssembly) { var assembly = GetAssemblyByTypeName(typeName); type = assembly.GetType(typeName); } else { type = Type.GetType(typeName); } return CreateGetStaticPropertyValueFunc<TProperty>(type, propertyName); } public static Func<TProperty> CreateGetStaticPropertyValueFunc<TProperty> ( Type type , string propertyName ) { Func<TProperty> func = null; var property = type.GetProperty(propertyName, typeof(TProperty)); if (property == null) { property = type .GetProperties() .ToList() .FirstOrDefault ( (x) => { return ( x.Name.ToLower() == propertyName.ToLower() ); } ); } if (property != null) { var getPropertyValue = http://www.mamicode.com/Expression.Property(null, property); var lambda = Expression.Lambda<Func<TProperty>>(getPropertyValue, null); func = lambda.Compile(); } return func; } public static Func<object> CreateGetStaticPropertyValueFunc ( Type type , string propertyName ) { Func<object> func = null; var property = type.GetProperty(propertyName); if (property == null) { property = type .GetProperties() .ToList() .FirstOrDefault ( (x) => { return ( x.Name.ToLower() == propertyName.ToLower() ); } ); } if (property != null) { var getPropertyValue = http://www.mamicode.com/Expression.Property(null, property); var castPropertyValue = http://www.mamicode.com/Expression.Convert(getPropertyValue, typeof(object)); var lambda = Expression.Lambda<Func<object>>(castPropertyValue, null); func = lambda.Compile(); } return func; } public static Func<object> CreateGetStaticPropertyValueFunc ( string typeName , string propertyName , bool isTypeFromAssembly = false ) { Type type; if (isTypeFromAssembly) { var assembly = GetAssemblyByTypeName(typeName); type = assembly.GetType(typeName); } else { type = Type.GetType(typeName); } return CreateGetStaticPropertyValueFunc(type, propertyName); } public static Action<object, object> CreateSetPropertyValueAction ( Type type , string propertyName ) { Action<object, object> action = null; var property = type.GetProperty(propertyName); if (property == null) { property = type .GetProperties() .ToList() .FirstOrDefault ( (x) => { return ( x.Name.ToLower() == propertyName.ToLower() ); } ); } if (property != null) { var target = Expression.Parameter(typeof(object), "p"); var propertyValue = http://www.mamicode.com/Expression.Parameter(typeof(object), "p"); var castTarget = Expression.Convert(target, type); var castPropertyValue = http://www.mamicode.com/Expression.Convert(propertyValue, property.PropertyType); var getSetMethod = property.GetSetMethod(); if (getSetMethod == null) { getSetMethod = property.GetSetMethod(true); } var call = Expression.Call(castTarget, getSetMethod, castPropertyValue); var lambda = Expression.Lambda<Action<object, object>>(call, target, propertyValue); action = lambda.Compile(); } return action; } public static Action<object, object> CreateSetPropertyValueAction ( string typeName , string propertyName , bool isTypeFromAssembly = false ) { Type type; if (isTypeFromAssembly) { var assembly = GetAssemblyByTypeName(typeName); type = assembly.GetType(typeName); } else { type = Type.GetType(typeName); } return CreateSetPropertyValueAction(type, propertyName); } public static Action<object, TProperty> CreateSetPropertyValueAction<TProperty> ( Type type , string propertyName ) { Action<object, TProperty> action = null; var property = type.GetProperty(propertyName); if (property == null) { property = type .GetProperties() .ToList() .FirstOrDefault ( (x) => { return ( x.Name.ToLower() == propertyName.ToLower() ); } ); } if (property != null) { var target = Expression.Parameter(typeof(object), "p"); var propertyValue = http://www.mamicode.com/Expression.Parameter(typeof(TProperty), "p"); var castTarget = Expression.Convert(target, type); var castPropertyValue = http://www.mamicode.com/Expression.Convert(propertyValue, property.PropertyType); var getSetMethod = property.GetSetMethod(); if (getSetMethod == null) { getSetMethod = property.GetSetMethod(true); } var call = Expression.Call(castTarget, getSetMethod, castPropertyValue); var lambda = Expression.Lambda<Action<object, TProperty>>(call, target, propertyValue); action = lambda.Compile(); } return action; } public static Action<object, TProperty> CreateSetPropertyValueAction<TProperty> ( string typeName , string propertyName , bool isTypeFromAssembly = false ) { Type type; if (isTypeFromAssembly) { var assembly = GetAssemblyByTypeName(typeName); type = assembly.GetType(typeName); } else { type = Type.GetType(typeName); } return CreateSetPropertyValueAction<TProperty>(type, propertyName); } public static Action<object> CreateSetStaticPropertyValueAction ( Type type , string propertyName ) { Action<object> action = null; var property = type.GetProperty(propertyName); if (property == null) { property = type .GetProperties() .ToList() .FirstOrDefault ( (x) => { return ( x.Name.ToLower() == propertyName.ToLower() ); } ); } if (property != null) { var propertyValue = http://www.mamicode.com/Expression.Parameter(typeof(object), "p"); var castPropertyValue = http://www.mamicode.com/Expression.Convert(propertyValue, property.PropertyType); var getSetMethod = property.GetSetMethod(); if (getSetMethod == null) { getSetMethod = property.GetSetMethod(true); } var call = Expression.Call(null, getSetMethod, castPropertyValue); var lambda = Expression.Lambda<Action<object>>(call, propertyValue); action = lambda.Compile(); } return action; } public static Action<object> CreateSetStaticPropertyValueAction ( string typeName , string propertyName , bool isTypeFromAssembly = false ) { Type type; if (isTypeFromAssembly) { var assembly = GetAssemblyByTypeName(typeName); type = assembly.GetType(typeName); } else { type = Type.GetType(typeName); } return CreateSetStaticPropertyValueAction(type, propertyName); } public static Action<TProperty> CreateSetStaticPropertyValueAction<TProperty> ( Type type , string propertyName ) { Action<TProperty> action = null; var property = type.GetProperty(propertyName); if (property == null) { property = type .GetProperties() .ToList() .FirstOrDefault ( (x) => { return ( x.Name.ToLower() == propertyName.ToLower() ); } ); } if (property != null) { var propertyValue = http://www.mamicode.com/Expression.Parameter(typeof(TProperty), "p"); //var castPropertyValue = http://www.mamicode.com/Expression.Convert(propertyValue, property.PropertyType); var getSetMethod = property.GetSetMethod(); if (getSetMethod == null) { getSetMethod = property.GetSetMethod(true); } var call = Expression.Call(null, getSetMethod, propertyValue); var lambda = Expression.Lambda<Action<TProperty>>(call, propertyValue); action = lambda.Compile(); } return action; } public static Action<TProperty> CreateSetStaticPropertyValueAction<TProperty> ( string typeName , string propertyName , bool isTypeFromAssembly = false ) { Type type; if (isTypeFromAssembly) { var assembly = GetAssemblyByTypeName(typeName); type = assembly.GetType(typeName); } else { type = Type.GetType(typeName); } return CreateSetStaticPropertyValueAction<TProperty>(type, propertyName); } }} |
ConcurrentAsyncQueue 2014-09-07
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。