首页 > 代码库 > 利用TaskScheduler处理Queue、Stack等类型的操作队列(生产者消费者场景)
利用TaskScheduler处理Queue、Stack等类型的操作队列(生产者消费者场景)
我们经常会遇到生产者消费者模式,比如前端各种UI操作事件触发后台逻辑等。在这种典型的应用场景中,我们可能会有4个业务处理逻辑(下文以P代表生产者,C代表消费者):
1. FIFO(先进先出)
P产生1,2,3,4,5,6,3,2
C处理顺序应为1,2,3,4,5,6,3,2
2.LIFO(后进先出)
P产生1,2,3,4,5,6,3,2
C处理顺序应为2,3,6,5,4,3,2,1
3.Dynamic FIFO(我定义为:去掉相同数据的FIFO, 如果产生的数据队列里已经有相同数据,后进的数据优先级高)
P产生1,2,3,4,5,6,3,2
C处理顺序为1,4,5,6,3,2
4.Dynamic LIFO(我定义为:去掉相同数据的LIFO, 如果产生的数据栈里已经有相同数据,后进的数据优先级高)
P产生1,2,3,4,5,6,3,2
C处理顺序为2,3,6,5,4,1
1,2情况为基本处理逻辑,3,4可能和我们实际场景有关系(包括:判断相同的逻辑可能不同、已存在和后续数据哪个优先级高)
C#中有个Task类进行异步操作,我们可以通过TaskScheduler类进行任务调度,实现上述的4种基本场景。
定义上述4种场景的通用接口以及其遍历类
publicinterfaceIScheduler:IEnumerable<Task>
{
voidAdd(Taskt);
voidRemove(Taskt);
intCount{get; }
Taskthis[intindex] {get;set; }
}
publicclassSchedulerEnumerator:IEnumerator<Task>
{
privateIScheduler_collection;
privateint_currentIndex;
privateTask_currentTask;
publicSchedulerEnumerator(ISchedulercollection)
{
_collection=collection;
_currentIndex= -1;
_currentTask=default(Task);
}
publicboolMoveNext()
{
//Avoids going beyond the end of the collection.
if(++_currentIndex>=_collection.Count)
{
returnfalse;
}
else
{
// Set current box to next item in collection.
_currentTask=_collection[_currentIndex];
}
returntrue;
}
publicvoidReset() {_currentIndex= -1; }
voidIDisposable.Dispose() { }
publicTaskCurrent
{
get{return_currentTask; }
}
objectIEnumerator.Current
{
get{returnCurrent; }
}
}
实现我们自己的任务调度类模板,可以通过T传递我们想要的队列类型
publicclassTaskSchedulerBase<T> :TaskScheduler
whereT :IScheduler,new()
{
privateThread_processThread;
privatereadonlyobject_lock =newobject();
publicTaskSchedulerBase()
{
_processThread =newThread(this.Process);
}
privatevoidProcess()
{
lock(_lock)
{
vartasks = GetScheduledTasks();
if(null!= tasks)
{
foreach(vartintasks)
{
TryExecuteTask(t);
TryDequeue(t);
}
}
}
}
protectedoverridevoidQueueTask(Tasktask)
{
lock(_lock)
{
Scheduler.Add(task);
if(_processThread.ThreadState.Equals(ThreadState.Stopped))
{
_processThread =newThread(Process);
}
if(!_processThread.IsAlive
&& !_processThread.ThreadState.Equals(ThreadState.Running))
{
try
{
_processThread.Start();
}
catch(System.Exception)
{
if(!_processThread.ThreadState.Equals(ThreadState.Running))
{
_processThread =newThread(Process);
_processThread.Start();
}
}
}
}
}
protectedoverrideboolTryDequeue(Tasktask)
{
Scheduler.Remove(task);
returntrue;
}
protectedoverrideIEnumerable<Task> GetScheduledTasks()
{
returnScheduler.ToArray();
}
protectedoverrideboolTryExecuteTaskInline(Tasktask,booltaskWasPreviouslyQueued)
{
if(taskWasPreviouslyQueued)
{
if(TryDequeue(task))
{
returnbase.TryExecuteTask(task);
}
else
{
returnfalse;
}
}
else
{
returnbase.TryExecuteTask(task);
}
}
privatereadonlyT _scheduler =newT();
publicT Scheduler
{
get
{
return_scheduler;
}
}
}
实现4种队列
1.FIFO
publicclassQueueScheduler:IScheduler
{
protectedQueue<Task>_queue;
publicQueueScheduler()
{
_queue=newQueue<Task>();
}
publicvoidAdd(Taskt)
{
if(!Contains(t))
{
_queue.Enqueue(t);
}
}
publicvoidRemove(Taskt)
{
_queue.Dequeue();
}
publicboolContains(Taskt)
{
boolfound=false;
foreach(vartaskin_queue)
{
if(t.AsyncState!=null&&t.AsyncState.Equals(task.AsyncState))
{
found=true;
break;
}
}
returnfound;
}
publicboolContains(Taskt,EqualityComparer<Task>comp)
{
thrownewNotImplementedException();
}
publicIEnumerator<Task>GetEnumerator()
{
returnnewSchedulerEnumerator(this);
}
IEnumeratorIEnumerable.GetEnumerator()
{
returnnewSchedulerEnumerator(this);
}
publicintCount
{
get{return_queue.Count; }
}
publicTaskthis[intindex]
{
get{return(Task)_queue.ToArray()[index]; }
set{_queue.ToArray()[index] = value; }
}
}
2.LIFO
publicclassStackScheduler:IScheduler
{
protectedStack<Task>_stack;
publicStackScheduler()
{
_stack=newStack<Task>();
}
publicvoidAdd(Taskt)
{
if(!Contains(t))
{
_stack.Push(t);
}
}
publicvoidRemove(Taskt)
{
_stack.Pop();
}
publicboolContains(Taskt)
{
boolfound=false;
foreach(vartaskin_stack)
{
if(t.AsyncState!=null&&t.AsyncState.Equals(task.AsyncState))
{
found=true;
break;
}
}
returnfound;
}
publicboolContains(Taskt,EqualityComparer<Task>comp)
{
thrownewNotImplementedException();
}
publicIEnumerator<Task>GetEnumerator()
{
returnnewSchedulerEnumerator(this);
}
IEnumeratorIEnumerable.GetEnumerator()
{
returnnewSchedulerEnumerator(this);
}
publicintCount
{
get{return_stack.Count; }
}
publicTaskthis[intindex]
{
get{return(Task)_stack.ToArray()[index]; }
set{_stack.ToArray()[index] = value; }
}
}
3.Dynamic FIFO
publicclassDynamicQueueScheduler:IScheduler
{
protectedList<Task>_queue;
publicDynamicQueueScheduler()
{
_queue=newList<Task>();
}
publicvirtualvoidAdd(Taskt)
{
TaskoldTask=null;
if(Contains(t,outoldTask))
{
_queue.Remove(oldTask);
}
_queue.Add(t);
}
publicvirtualvoidRemove(Taskt)
{
_queue.Remove(t);
}
publicvirtualboolContains(Taskt)
{
TaskoldTask=null;
boolfound=Contains(t,outoldTask);
returnfound;
}
publicvirtualboolContains(Taskt,outTaskoldTask)
{
boolfound=false;
oldTask=null;
foreach(vartaskin_queue)
{
if(t.AsyncState!=null&&t.AsyncState.Equals(task.AsyncState))
{
oldTask=task;
found=true;
break;
}
}
returnfound;
}
publicvirtualboolContains(Taskt,EqualityComparer<Task>comp)
{
thrownewNotImplementedException();
}
publicIEnumerator<Task>GetEnumerator()
{
returnnewSchedulerEnumerator(this);
}
IEnumeratorIEnumerable.GetEnumerator()
{
returnnewSchedulerEnumerator(this);
}
publicintCount
{
get{return_queue.Count; }
}
publicTaskthis[intindex]
{
get{return(Task)_queue[index]; }
set{_queue[index] =value; }
}
}
4.Dynamic LIFO
publicclassDynamicStackScheduler:IScheduler
{
protectedList<Task>_queue;
publicDynamicStackScheduler()
{
_queue=newList<Task>();
}
publicvoidAdd(Taskt)
{
TaskoldTask=null;
if(Contains(t,outoldTask))
{
_queue.Remove(oldTask);
}
_queue.Insert(0,t);
}
publicvoidRemove(Taskt)
{
_queue.Remove(t);
}
publicboolContains(Taskt)
{
TaskoldTask=null;
boolfound=Contains(t,outoldTask);
returnfound;
}
publicboolContains(Taskt,outTaskoldTask)
{
boolfound=false;
oldTask=null;
foreach(vartaskin_queue)
{
if(t.AsyncState!=null&&t.AsyncState.Equals(task.AsyncState))
{
oldTask=task;
found=true;
break;
}
}
returnfound;
}
publicboolContains(Taskt,EqualityComparer<Task>comp)
{
thrownewNotImplementedException();
}
publicIEnumerator<Task>GetEnumerator()
{
returnnewSchedulerEnumerator(this);
}
IEnumeratorIEnumerable.GetEnumerator()
{
returnnewSchedulerEnumerator(this);
}
publicintCount
{
get{return_queue.Count; }
}
publicTaskthis[intindex]
{
get{return(Task)_queue[index]; }
set{_queue[index] =value; }
}
}
测试代码
classProgram
{
staticQueue<int>_queue=newQueue<int>();
//static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<QueueScheduler>());
//static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<StackScheduler>());
//static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicQueueScheduler>());
//static TaskFactory _factory = new TaskFactory(new TaskSchedulerBase<DynamicStackScheduler>());
staticTaskFactory_factory=newTaskFactory(newTaskSchedulerBase<DynamicQueueScheduler>());
staticvoidMain(string[]args)
{
varthread1=newThread(Producer);
varthread2=newThread(Consumer);
thread1.Start();
thread2.Start();
Console.ReadKey();
}
staticvoidProducer()
{
for(inti= 0;i< 7;i++)
{
_queue.Enqueue(i);
}
_queue.Enqueue(3);
_queue.Enqueue(2);
}
staticvoidConsumer()
{
while(true)
{
if(_queue.Count> 0)
{
foreach(variin_queue)
{
_factory.StartNew((s) =>
{
Console.Write("{0}on thread{1}{2}\n", s, Thread.CurrentThread.ManagedThreadId,
DateTime.Now.ToLongTimeString());
},i);
}
_queue.Clear();
}
else
{
Thread.Sleep(1);
}
}
}
}
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。