首页 > 代码库 > 利用TaskScheduler处理Queue、Stack等类型的操作队列(生产者消费者场景)

利用TaskScheduler处理Queue、Stack等类型的操作队列(生产者消费者场景)

我们经常会遇到生产者消费者模式,比如前端各种UI操作事件触发后台逻辑等。在这种典型的应用场景中,我们可能会有4种业务处理逻辑(下文以P代表生产者,C代表消费者):

1. FIFO(先进先出)
     P产生1,2,3,4,5,6,3,2
     C处理顺序应为1,2,3,4,5,6,3,2
2.LIFO(后进先出)
     P产生1,2,3,4,5,6,3,2
     C处理顺序应为2,3,6,5,4,3,2,1
3. Dynamic FIFO(我定义为:去掉相同数据的FIFO。如果队列里已经有相同数据,则移除已有的旧数据,只保留后进的数据,即后进的数据优先级高)
     P产生1,2,3,4,5,6,3,2
     C处理顺序为1,4,5,6,3,2

4. Dynamic LIFO(我定义为:去掉相同数据的LIFO。如果栈里已经有相同数据,则移除已有的旧数据,只保留后进的数据,即后进的数据优先级高)
     P产生1,2,3,4,5,6,3,2
     C处理顺序为2,3,6,5,4,1
     

1,2情况为基本处理逻辑,3,4可能和我们实际场景有关系(包括:判断相同的逻辑可能不同、已存在和后续数据哪个优先级高)

C#中可以用Task类进行异步操作,并且可以通过继承TaskScheduler类来自定义任务调度,从而实现上述的4种基本场景。

定义上述4种场景的通用接口以及其遍历类
/// <summary>
/// Ordered collection of pending <see cref="Task"/> instances that a task
/// scheduler uses as its backing store. The implementation decides the
/// processing order (index 0 is the next task to run) and what happens when
/// an added task has the same <c>AsyncState</c> as one already pending.
/// </summary>
public interface IScheduler : IEnumerable<Task>
{
    /// <summary>Adds a task to the collection.</summary>
    /// <param name="t">The task to store.</param>
    void Add(Task t);

    /// <summary>Removes a task from the collection.</summary>
    /// <param name="t">The task to remove.</param>
    void Remove(Task t);

    /// <summary>Gets the number of tasks currently held.</summary>
    int Count { get; }

    /// <summary>Gets or sets the task at the given zero-based position.</summary>
    /// <param name="index">Zero-based position in processing order.</param>
    Task this[int index] { get; set; }
}

   publicclassSchedulerEnumerator:IEnumerator<Task>
    {
       privateIScheduler_collection;
       privateint_currentIndex;
       privateTask_currentTask;


       publicSchedulerEnumerator(ISchedulercollection)
        {
           _collection=collection;
           _currentIndex= -1;
           _currentTask=default(Task);
        }

       publicboolMoveNext()
        {
           //Avoids going beyond the end of the collection.
           if(++_currentIndex>=_collection.Count)
            {
               returnfalse;
            }
           else
            {
               // Set current box to next item in collection.
               _currentTask=_collection[_currentIndex];
            }
           returntrue;
        }

       publicvoidReset() {_currentIndex= -1; }

       voidIDisposable.Dispose() { }

       publicTaskCurrent
        {
           get{return_currentTask; }
        }


       objectIEnumerator.Current
        {
           get{returnCurrent; }
        }

    }


实现我们自己的任务调度类模板,可以通过T传递我们想要的队列类型
 publicclassTaskSchedulerBase<T> :TaskScheduler
       whereT :IScheduler,new()

    {
       privateThread_processThread;
       privatereadonlyobject_lock =newobject();

       publicTaskSchedulerBase()
        {
            _processThread =newThread(this.Process);
        }

       privatevoidProcess()
        {
           lock(_lock)
            {
               vartasks = GetScheduledTasks();
               if(null!= tasks)
                {
                   foreach(vartintasks)
                    {
                        TryExecuteTask(t);

                        TryDequeue(t);
                    }
                }
            }
        }

       protectedoverridevoidQueueTask(Tasktask)
        {
           lock(_lock)
            {
                Scheduler.Add(task);

               if(_processThread.ThreadState.Equals(ThreadState.Stopped))
                {
                    _processThread =newThread(Process);
                }

               if(!_processThread.IsAlive
                    && !_processThread.ThreadState.Equals(ThreadState.Running))
                {
                   try
                    {
                        _processThread.Start();
                    }
                   catch(System.Exception)
                    {
                       if(!_processThread.ThreadState.Equals(ThreadState.Running))
                        {
                            _processThread =newThread(Process);
                            _processThread.Start();
                        }
                    }
                }
            }
        }

       protectedoverrideboolTryDequeue(Tasktask)
        {
            Scheduler.Remove(task);

           returntrue;
        }

       protectedoverrideIEnumerable<Task> GetScheduledTasks()
        {
           returnScheduler.ToArray();
        }

       protectedoverrideboolTryExecuteTaskInline(Tasktask,booltaskWasPreviouslyQueued)
        {
           if(taskWasPreviouslyQueued)
            {
               if(TryDequeue(task))
                {
                   returnbase.TryExecuteTask(task);
                }
               else
                {
                   returnfalse;
                }
            }
           else
            {
               returnbase.TryExecuteTask(task);
            }
        }

       privatereadonlyT _scheduler =newT();
       publicT Scheduler
        {
           get
            {
               return_scheduler;
            }
        }
    }

实现4种队列
     1.FIFO
     
 publicclassQueueScheduler:IScheduler
    {
       protectedQueue<Task>_queue;

       publicQueueScheduler()
        {
           _queue=newQueue<Task>();
        }

       publicvoidAdd(Taskt)
        {
           if(!Contains(t))
            {
               _queue.Enqueue(t);
            }
        }

       publicvoidRemove(Taskt)
        {
           _queue.Dequeue();
        }

       publicboolContains(Taskt)
        {
           boolfound=false;

           foreach(vartaskin_queue)
            {
               if(t.AsyncState!=null&&t.AsyncState.Equals(task.AsyncState))
                {
                   found=true;
                   break;
                }
            }

           returnfound;
        }

       publicboolContains(Taskt,EqualityComparer<Task>comp)
        {
           thrownewNotImplementedException();
        }

       publicIEnumerator<Task>GetEnumerator()
        {
           returnnewSchedulerEnumerator(this);
        }

       IEnumeratorIEnumerable.GetEnumerator()
        {
           returnnewSchedulerEnumerator(this);
        }

       publicintCount
        {
           get{return_queue.Count; }
        }

       publicTaskthis[intindex]
        {
           get{return(Task)_queue.ToArray()[index]; }
           set{_queue.ToArray()[index] = value; }
        }
    }

     2.LIFO
/// <summary>
/// Plain LIFO task collection: the most recently added task is processed
/// first, and every added task is kept, including tasks whose
/// <c>AsyncState</c> equals that of a pending task.
/// </summary>
/// <remarks>Not synchronized; callers must serialize access.</remarks>
public class StackScheduler : IScheduler
{
    protected Stack<Task> _stack;

    /// <summary>Creates an empty LIFO collection.</summary>
    public StackScheduler()
    {
        _stack = new Stack<Task>();
    }

    /// <summary>Pushes a task on top of the stack.</summary>
    /// <param name="t">The task to push.</param>
    /// <exception cref="ArgumentNullException"><paramref name="t"/> is null.</exception>
    public void Add(Task t)
    {
        if (t == null)
        {
            throw new ArgumentNullException("t");
        }

        // No de-duplication here: a plain LIFO must replay every produced
        // item. Use DynamicStackScheduler to collapse duplicates.
        _stack.Push(t);
    }

    /// <summary>
    /// Removes the given task (matched by reference), keeping the relative
    /// order of the remaining tasks. Does nothing if the task is not stacked.
    /// </summary>
    /// <param name="t">The task to remove.</param>
    public void Remove(Task t)
    {
        if (_stack.Count == 0)
        {
            return;
        }

        // Fast path: the scheduler normally removes the top.
        if (ReferenceEquals(_stack.Peek(), t))
        {
            _stack.Pop();
            return;
        }

        // ToArray() yields the items top-first.
        Task[] items = _stack.ToArray();
        int hit = -1;
        for (int i = 0; i < items.Length; i++)
        {
            if (ReferenceEquals(items[i], t))
            {
                hit = i;
                break;
            }
        }

        if (hit < 0)
        {
            return;
        }

        // Rebuild bottom-up so the original order is preserved.
        _stack.Clear();
        for (int i = items.Length - 1; i >= 0; i--)
        {
            if (i != hit)
            {
                _stack.Push(items[i]);
            }
        }
    }

    /// <summary>
    /// Tells whether a stacked task has an <c>AsyncState</c> equal to that of
    /// <paramref name="t"/>. A null task or null state never matches.
    /// </summary>
    public bool Contains(Task t)
    {
        if (t == null || t.AsyncState == null)
        {
            return false;
        }

        object state = t.AsyncState;
        foreach (Task task in _stack)
        {
            if (state.Equals(task.AsyncState))
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>
    /// Tells whether a stacked task equals <paramref name="t"/> according to
    /// the supplied comparer.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="comp"/> is null.</exception>
    public bool Contains(Task t, EqualityComparer<Task> comp)
    {
        if (comp == null)
        {
            throw new ArgumentNullException("comp");
        }

        foreach (Task task in _stack)
        {
            if (comp.Equals(t, task))
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>Returns an index-based enumerator over the stacked tasks.</summary>
    public IEnumerator<Task> GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }

    /// <summary>Gets the number of stacked tasks.</summary>
    public int Count
    {
        get { return _stack.Count; }
    }

    /// <summary>
    /// Gets or sets the task at a zero-based position, where 0 is the top
    /// (the next task to be popped).
    /// </summary>
    public Task this[int index]
    {
        get { return _stack.ToArray()[index]; }
        set
        {
            // Stack<T> has no positional setter: patch a top-first copy, then
            // rebuild bottom-up. The array write throws for a bad index before
            // the stack is touched.
            Task[] items = _stack.ToArray();
            items[index] = value;

            _stack.Clear();
            for (int i = items.Length - 1; i >= 0; i--)
            {
                _stack.Push(items[i]);
            }
        }
    }
}

     3.Dynamic FIFO

/// <summary>
/// De-duplicating FIFO task collection. When a task is added whose
/// <c>AsyncState</c> equals that of a pending task, the pending task is
/// removed and the new task is appended at the end.
/// </summary>
/// <remarks>
/// A displaced task is simply dropped from the list; this class neither runs
/// nor cancels it. Not synchronized; callers must serialize access.
/// </remarks>
public class DynamicQueueScheduler : IScheduler
{
    protected List<Task> _queue;

    /// <summary>Creates an empty collection.</summary>
    public DynamicQueueScheduler()
    {
        _queue = new List<Task>();
    }

    /// <summary>
    /// Appends a task, first removing a pending task with an equal
    /// <c>AsyncState</c> if there is one.
    /// </summary>
    /// <param name="t">The task to add.</param>
    public virtual void Add(Task t)
    {
        Task oldTask;
        if (Contains(t, out oldTask))
        {
            _queue.Remove(oldTask);
        }

        _queue.Add(t);
    }

    /// <summary>Removes the given task if it is pending.</summary>
    /// <param name="t">The task to remove.</param>
    public virtual void Remove(Task t)
    {
        _queue.Remove(t);
    }

    /// <summary>
    /// Tells whether a pending task has an <c>AsyncState</c> equal to that of
    /// <paramref name="t"/>.
    /// </summary>
    public virtual bool Contains(Task t)
    {
        Task oldTask;
        return Contains(t, out oldTask);
    }

    /// <summary>
    /// Looks for the first pending task whose <c>AsyncState</c> equals that of
    /// <paramref name="t"/>. A null task or null state never matches.
    /// </summary>
    /// <param name="t">The task whose state is searched for.</param>
    /// <param name="oldTask">The matching pending task, or null when none.</param>
    /// <returns><c>true</c> when a match was found.</returns>
    public virtual bool Contains(Task t, out Task oldTask)
    {
        oldTask = null;

        if (t == null || t.AsyncState == null)
        {
            return false;
        }

        object state = t.AsyncState;
        foreach (Task task in _queue)
        {
            if (state.Equals(task.AsyncState))
            {
                oldTask = task;
                return true;
            }
        }

        return false;
    }

    /// <summary>
    /// Tells whether a pending task equals <paramref name="t"/> according to
    /// the supplied comparer.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="comp"/> is null.</exception>
    public virtual bool Contains(Task t, EqualityComparer<Task> comp)
    {
        if (comp == null)
        {
            throw new ArgumentNullException("comp");
        }

        foreach (Task task in _queue)
        {
            if (comp.Equals(t, task))
            {
                return true;
            }
        }

        return false;
    }

    /// <summary>Returns an index-based enumerator over the pending tasks.</summary>
    public IEnumerator<Task> GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return new SchedulerEnumerator(this);
    }

    /// <summary>Gets the number of pending tasks.</summary>
    public int Count
    {
        get { return _queue.Count; }
    }

    /// <summary>
    /// Gets or sets the task at a zero-based position, where 0 is the oldest
    /// pending task.
    /// </summary>
    public Task this[int index]
    {
        get { return _queue[index]; }
        set { _queue[index] = value; }
    }
}

     4.Dynamic LIFO
 publicclassDynamicStackScheduler:IScheduler
    {
       protectedList<Task>_queue;

       publicDynamicStackScheduler()
        {
           _queue=newList<Task>();
        }

       publicvoidAdd(Taskt)
        {
           TaskoldTask=null;
           if(Contains(t,outoldTask))
            {
               _queue.Remove(oldTask);
            }
  
           _queue.Insert(0,t);
        }

       publicvoidRemove(Taskt)
        {
           _queue.Remove(t);
        }

       publicboolContains(Taskt)
        {
           TaskoldTask=null;
           boolfound=Contains(t,outoldTask);
           returnfound;
        }

       publicboolContains(Taskt,outTaskoldTask)
        {
           boolfound=false;
           oldTask=null;

           foreach(vartaskin_queue)
            {
               if(t.AsyncState!=null&&t.AsyncState.Equals(task.AsyncState))
                {
                   oldTask=task;
                   found=true;

                   break;
                }
            }

           returnfound;
        }

       publicboolContains(Taskt,EqualityComparer<Task>comp)
        {
           thrownewNotImplementedException();
        }

       publicIEnumerator<Task>GetEnumerator()
        {
           returnnewSchedulerEnumerator(this);
        }

       IEnumeratorIEnumerable.GetEnumerator()
        {
           returnnewSchedulerEnumerator(this);
        }

       publicintCount
        {
           get{return_queue.Count; }
        }

       publicTaskthis[intindex]
        {
           get{return(Task)_queue[index]; }
           set{_queue[index] =value; }
        }
    }


测试代码
  classProgram
    {
       staticQueue<int>_queue=newQueue<int>();

       //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<QueueScheduler>());
       //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<StackScheduler>());
       //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicQueueScheduler>());
       //static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicStackScheduler>());
       staticTaskFactory_factory=newTaskFactory(newTaskSchedulerBase<DynamicQueueScheduler>());
       staticvoidMain(string[]args)
        {
           varthread1=newThread(Producer);
           varthread2=newThread(Consumer);

           thread1.Start();
           thread2.Start();

           Console.ReadKey();
        }

       staticvoidProducer()
        {
           for(inti= 0;i< 7;i++)
            {
               _queue.Enqueue(i);
            }

           _queue.Enqueue(3);
           _queue.Enqueue(2);
        }

       staticvoidConsumer()
        {
           while(true)
            {
               if(_queue.Count> 0)
                {
                   foreach(variin_queue)
                    {
                       _factory.StartNew((s) =>
                        {
                           Console.Write("{0}on thread{1}{2}\n", s, Thread.CurrentThread.ManagedThreadId,
                                         DateTime.Now.ToLongTimeString());
                        },i);
                    }

                   _queue.Clear();
                }
               else
                {
                   Thread.Sleep(1);
                }
            }
        }
    }