首页 > 代码库 > 利用TaskScheduler处理Queue、Stack等类型的操作队列(生产者消费者场景)

利用TaskScheduler处理Queue、Stack等类型的操作队列(生产者消费者场景)

<?xml version="1.0" encoding="UTF-8"?>我们经常会遇到生产者消费者模式,比如前端各种UI操作事件触发后台逻辑等。在这种典型的应用场景中,我们可能会有4个业务处理逻辑(下文以P代表生产者,C代表消费者):

1. FIFO(先进先出)
     P产生1,2,3,4,5,6,3,2
     C处理顺序应为1,2,3,4,5,6,3,2
2.LIFO(后进先出)
     P产生1,2,3,4,5,6,3,2
     C处理顺序应为2,3,6,5,4,3,2,1
3.Dynamic FIFO(我定义为:去掉相同数据的FIFO, 如果产生的数据队列里已经有相同数据,后进的数据优先级高)
     P产生1,2,3,4,5,6,3,2
     C处理顺序为1,4,5,6,3,2

4.Dynamic LIFO(我定义为:去掉相同数据的LIFO, 如果产生的数据栈里已经有相同数据,后进的数据优先级高)
     P产生1,2,3,4,5,6,3,2
     C处理顺序为2,3,6,5,4,1
     

1,2情况为基本处理逻辑,3,4可能和我们实际场景有关系(包括:判断相同的逻辑可能不同、已存在和后续数据哪个优先级高)

C#中有个Task类进行异步操作,我们可以通过TaskScheduler类进行任务调度,实现上述的4种基本场景。

定义上述4种场景的通用接口以及其遍历类
public interface IScheduler : IEnumerable<Task >
    {
        void Add (Task t);
        void Remove (Task t);

        int Count { get; }
        Task this [int index] { get; set ; }
    }

    public class SchedulerEnumerator : IEnumerator< Task>
    {
        private IScheduler _collection;
        private int _currentIndex;
        private Task _currentTask;


        public SchedulerEnumerator (IScheduler collection)
        {
            _collection = collection ;
            _currentIndex = -1;
            _currentTask = default (Task);
        }

        public bool MoveNext()
        {
            //Avoids going beyond the end of the collection.
            if (++_currentIndex >= _collection. Count)
            {
                return false ;
            }
            else
            {
                // Set current box to next item in collection.
                _currentTask = _collection [_currentIndex];
            }
            return true ;
        }

        public void Reset() { _currentIndex = -1; }

        void IDisposable .Dispose() { }

        public Task Current
        {
            get { return _currentTask; }
        }


        object IEnumerator .Current
        {
            get { return Current; }
        }

    }


实现我们自己的任务调度类模板,可以通过T传递我们想要的队列类型
 public class TaskSchedulerBase <T> : TaskScheduler
        where T : IScheduler , new ()

    {
        private Thread _processThread;
        private readonly object _lock = new object ();

        public TaskSchedulerBase()
        {
            _processThread = new Thread (this.Process);
        }

        private void Process()
        {
            lock (_lock)
            {
                var tasks = GetScheduledTasks();
                if (null != tasks)
                {
                    foreach (var t in tasks)
                    {
                        TryExecuteTask(t);

                        TryDequeue(t);
                    }
                }
            }
        }

        protected override void QueueTask( Task task)
        {
            lock (_lock)
            {
                Scheduler.Add(task);

                if (_processThread.ThreadState.Equals(ThreadState .Stopped))
                {
                    _processThread = new Thread (Process);
                }

                if (!_processThread.IsAlive
                    && !_processThread.ThreadState.Equals( ThreadState.Running))
                {
                    try
                    {
                        _processThread.Start();
                    }
                    catch (System.Exception )
                    {
                        if (!_processThread.ThreadState.Equals(ThreadState .Running))
                        {
                            _processThread = new Thread (Process);
                            _processThread.Start();
                        }
                    }
                }
            }
        }

        protected override bool TryDequeue( Task task)
        {
            Scheduler.Remove(task);

            return true ;
        }

        protected override IEnumerable< Task> GetScheduledTasks()
        {
            return Scheduler.ToArray();
        }

        protected override bool TryExecuteTaskInline( Task task, bool taskWasPreviouslyQueued)
        {
            if (taskWasPreviouslyQueued)
            {
                if (TryDequeue(task))
                {
                    return base .TryExecuteTask(task);
                }
                else
                {
                    return false ;
                }
            }
            else
            {
                return base .TryExecuteTask(task);
            }
        }

        private readonly T _scheduler = new T();
        public T Scheduler
        {
            get
            {
                return _scheduler;
            }
        }
    }

实现4种队列
     1.FIFO
     
 public class QueueScheduler : IScheduler
    {
        protected Queue <Task> _queue;

        public QueueScheduler ()
        {
            _queue = new Queue< Task>();
        }

        public void Add( Task t )
        {
            if (!Contains (t))
            {
                _queue.Enqueue (t);
            }
        }

        public void Remove( Task t )
        {
            _queue.Dequeue ();
        }

        public bool Contains( Task t )
        {
            bool found = false;

            foreach (var task in _queue )
            {
                if (t .AsyncState != null && t .AsyncState. Equals(task .AsyncState))
                {
                    found = true ;
                    break;
                }
            }

            return found ;
        }

        public bool Contains( Task t , EqualityComparer< Task> comp )
        {
            throw new NotImplementedException();
        }

        public IEnumerator <Task> GetEnumerator()
        {
            return new SchedulerEnumerator( this);
        }

        IEnumerator IEnumerable .GetEnumerator()
        {
            return new SchedulerEnumerator( this);
        }

        public int Count
        {
            get { return _queue. Count; }
        }

        public Task this[ int index]
        {
            get { return (Task) _queue.ToArray ()[index]; }
            set { _queue .ToArray()[index] = value; }
        }
    }

     2.LIFO
public class StackScheduler : IScheduler
    {
        protected Stack <Task> _stack;

        public StackScheduler ()
        {
            _stack = new Stack< Task>();
        }

        public void Add( Task t )
        {
            if (!Contains (t))
            {
                _stack.Push (t);
            }
        }

        public void Remove( Task t )
        {
            _stack.Pop ();
        }

        public bool Contains( Task t )
        {
            bool found = false;

            foreach (var task in _stack )
            {
                if (t .AsyncState != null && t .AsyncState. Equals(task .AsyncState))
                {
                    found = true ;
                    break;
                }
            }

            return found ;
        }

        public bool Contains( Task t , EqualityComparer< Task> comp )
        {
            throw new NotImplementedException();
        }

        public IEnumerator <Task> GetEnumerator()
        {
            return new SchedulerEnumerator( this);
        }

        IEnumerator IEnumerable .GetEnumerator()
        {
            return new SchedulerEnumerator( this);
        }

        public int Count
        {
            get { return _stack. Count; }
        }

        public Task this[ int index]
        {
            get { return (Task) _stack.ToArray ()[index]; }
            set { _stack .ToArray()[index] = value; }
        }
    }

     3.Dynamic FIFO

public class DynamicQueueScheduler : IScheduler
    {
        protected List <Task> _queue;

        public DynamicQueueScheduler ()
        {
            _queue = new List< Task>();
        }

        public virtual void Add(Task t)
        {
            Task oldTask = null;
            if (Contains (t, out oldTask ))
            {
                _queue.Remove (oldTask);
            }
  
            _queue.Add (t);
        }

        public virtual void Remove(Task t)
        {
            _queue.Remove (t);
        }

        public virtual bool Contains(Task t)
        {
            Task oldTask = null;
            bool found = Contains( t, out oldTask);
            return found ;
        }

        public virtual bool Contains(Task t, out Task oldTask)
        {
            bool found = false;
            oldTask = null ;

            foreach (var task in _queue )
            {
                if (t .AsyncState != null && t .AsyncState. Equals(task .AsyncState))
                {
                    oldTask = task ;
                    found = true ;

                    break;
                }
            }

            return found ;
        }

        public virtual bool Contains(Task t, EqualityComparer<Task > comp)
        {
            throw new NotImplementedException();
        }

        public IEnumerator <Task> GetEnumerator()
        {
            return new SchedulerEnumerator( this);
        }

        IEnumerator IEnumerable .GetEnumerator()
        {
            return new SchedulerEnumerator( this);
        }

        public int Count
        {
            get { return _queue. Count; }
        }

        public Task this[ int index]
        {
            get { return (Task) _queue[index]; }
            set { _queue [index] = value; }
        }
    }

     4.Dynamic LIFO
 public class DynamicStackScheduler : IScheduler
    {
        protected List <Task> _queue;

        public DynamicStackScheduler ()
        {
            _queue = new List< Task>();
        }

        public void Add( Task t )
        {
            Task oldTask = null;
            if (Contains (t, out oldTask ))
            {
                _queue.Remove (oldTask);
            }
  
            _queue.Insert (0,t);
        }

        public void Remove( Task t )
        {
            _queue.Remove (t);
        }

        public bool Contains( Task t )
        {
            Task oldTask = null;
            bool found = Contains( t, out oldTask);
            return found ;
        }

        public bool Contains( Task t , out Task oldTask )
        {
            bool found = false;
            oldTask = null ;

            foreach (var task in _queue )
            {
                if (t .AsyncState != null && t .AsyncState. Equals(task .AsyncState))
                {
                    oldTask = task ;
                    found = true ;

                    break;
                }
            }

            return found ;
        }

        public bool Contains( Task t , EqualityComparer< Task> comp )
        {
            throw new NotImplementedException();
        }

        public IEnumerator <Task> GetEnumerator()
        {
            return new SchedulerEnumerator( this);
        }

        IEnumerator IEnumerable .GetEnumerator()
        {
            return new SchedulerEnumerator( this);
        }

        public int Count
        {
            get { return _queue. Count; }
        }

        public Task this[ int index]
        {
            get { return (Task) _queue[index]; }
            set { _queue [index] = value; }
        }
    }


测试代码
  class Program
    {
        static Queue <int> _queue = new Queue< int>();

        //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<QueueScheduler>());
        //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<StackScheduler>());
        //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicQueueScheduler>());
        //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicStackScheduler>());
        static TaskFactory _factory = new TaskFactory (new TaskSchedulerBase<DynamicQueueScheduler >());
        static void Main( string[] args )
        {
            var thread1 = new Thread(Producer );
            var thread2 = new Thread(Consumer );

            thread1.Start ();
            thread2.Start ();

            Console.ReadKey ();
        }

        static void Producer()
        {
            for (int i = 0; i < 7; i ++)
            {
                _queue.Enqueue (i);
            }

            _queue.Enqueue (3);
            _queue.Enqueue (2);
        }

        static void Consumer()
        {
            while (true )
            {
                if (_queue .Count > 0)
                {
                    foreach (var i in _queue )
                    {
                        _factory.StartNew ((s) =>
                        {
                            Console.Write ("{0} on thread {1} {2}\n", s, Thread.CurrentThread .ManagedThreadId,
                                          DateTime.Now .ToLongTimeString());
                        }, i);
                    }

                    _queue.Clear ();
                }
                else
                {
                    Thread.Sleep (1);
                }
            }
        }
    }