首页 > 代码库 > 线程队列(版本1+版本2)
线程队列(版本1+版本2)
版本1. using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace ClassLibrary1 { public class BaseThread { public event Action<object> ThreadAction; public event Action<object> CompeletedAction; public object Parameter { get; set; } public void Run() { ThreadAction?.Invoke(Parameter); CompeletedAction?.Invoke(Parameter); } } public class ThreadQueue { public readonly object lockObj = new object(); public Queue<BaseThread> Queue; public ThreadQueue() { Queue = new Queue<BaseThread>(); } public ThreadQueue(int poolSize) { Queue = new Queue<BaseThread>(poolSize); } public bool IsEmpty { get { return Queue.Count == 0; } } public void AddQueueItem(BaseThread item) { Queue.Enqueue(item); ThreadPool.QueueUserWorkItem(DoWork, null); } private void DoWork(object state) { while (true) { BaseThread item = null; lock (lockObj) { if (Queue.Count > 0) item = Queue.Dequeue(); else { return; } } ThreadPool.QueueUserWorkItem(obj => { (obj as BaseThread).Run(); }, item); } } } public class Program { public static void Main() { ThreadQueue workQueue = new ThreadQueue(100); for (int i = 1; i<=100; i++) { BaseThread th = new BaseThread(); th.ThreadAction += a => { }; th.CompeletedAction += b => { Console.Write("队列a执行完成:" + b.ToString() + "\r\n"); }; th.Parameter = i; workQueue.AddQueueItem(th); } while (!workQueue.IsEmpty) { Console.Write("队列正在执行"); Thread.Sleep(1000); } Console.Write("队列执行完必"); Console.ReadKey(); } } } 2.版本2 using System.Collections.Generic; using System; using System.Threading; /***********多线程的工作队列*************** * 此工作队列保证线程安全性 * * * * * *******/ namespace WorkQueue { public delegate void RunThread(object threaParameter); public delegate void QueueException(Exception ex); /// <summary> /// 队列用到的实体 /// </summary> public class QueueThreadEntity { /// <summary> /// 队列中每个元素执行的方法 /// </summary> public event RunThread StartThreadHandle; /// <summary> /// 每个元素执行完毕事件 /// </summary> public event RunThread EndThreadHandle; /// <summary> /// 执行异常回调函数 /// </summary> public event QueueException EndExceptionHandle; /// <summary> /// 执行异常函数 /// </summary> public event QueueException StartExceptionHandle; /// <summary> /// 线程参数 /// </summary> public object ThreaParameter { get; set; } internal void Run() { try { StartThreadHandle(ThreaParameter); } catch (Exception ex) { if (StartExceptionHandle != null) { StartExceptionHandle(ex); } else { throw ex; } } if (EndThreadHandle != null) { try { EndThreadHandle(ThreaParameter); } catch (Exception ex) { if (EndExceptionHandle != null) { EndExceptionHandle(ex); } else { throw ex; } } } } } public class WorkThreadQueue { WorkQueue<QueueThreadEntity> wk = null; /// <summary> /// /// </summary> /// <param name="poolSize">线程池的大小</param> public WorkThreadQueue(int poolSize) { wk = new WorkQueue<QueueThreadEntity>(poolSize); wk.UserWork += workQueue_UserWork; } public void AddQueue(QueueThreadEntity entity) { wk.EnqueueItem(entity); } private void workQueue_UserWork(object sender, WorkQueue<QueueThreadEntity>.EnqueueEventArgs e) { var item = e.Item; item.Run(); } /// <summary> /// 队列处理是否需要单线程顺序执行 /// ture表示单线程处理队列的T对象 /// 默认为false,表明按照顺序出队,但是多线程处理item /// *****注意不要频繁改变此项**** /// </summary> public bool IsOneThread { get { return wk.IsOneThread; } set { wk.IsOneThread = value; } } /// <summary> /// 判断队列是否为空 /// </summary> /// <returns></returns> public bool IsEmpty() { return wk.IsEmpty(); } } public delegate void UserWorkEventHandler<T>(object sender, WorkQueue<T>.EnqueueEventArgs e); public class WorkQueue<T> { private bool IsWorking; //表明处理线程是否正在工作 private object lockIsWorking = new object();//对IsWorking的同步对象 private Queue<T> queue; //实际的队列 private object lockObj = new object(); //队列同步对象 /// <summary> /// 绑定用户需要对队列中的item对象 /// 施加的操作的事件 /// </summary> public event UserWorkEventHandler<T> UserWork; public WorkQueue(int n) { queue = new Queue<T>(n); } public WorkQueue() { queue = new Queue<T>(); } /// <summary> /// 谨慎使用此函数, /// 只保证此瞬间,队列值为空 /// </summary> /// <returns></returns> public bool IsEmpty() { lock (lockObj) { return queue.Count == 0; } } private bool isOneThread; /// <summary> /// 队列处理是否需要单线程顺序执行 /// ture表示单线程处理队列的T对象 /// 默认为false,表明按照顺序出队,但是多线程处理item /// *****注意不要频繁改变此项**** /// </summary> public bool IsOneThread { get { return isOneThread; } set { isOneThread = value; } } /// <summary> /// 向工作队列添加对象, /// 对象添加以后,如果已经绑定工作的事件 /// 会触发事件处理程序,对item对象进行处理 /// </summary> /// <param name="item">添加到队列的对象</param> public void EnqueueItem(T item) { lock (lockObj) { queue.Enqueue(item); } lock (lockIsWorking) { //if (!IsWorking) { //IsWorking = true; ThreadPool.QueueUserWorkItem(doUserWork); } } } /// <summary> /// 处理队列中对象的函数 /// </summary> /// <param name="o"></param> private void doUserWork(object o) { try { T item; while (true) { lock (lockObj) { if (queue.Count > 0) { item = queue.Dequeue(); } else { return; } } if (!item.Equals(default(T))) { if (isOneThread) { UserWork?.Invoke(this, new EnqueueEventArgs(item)); } else { int i = 0; ThreadPool.QueueUserWorkItem(obj => { UserWork?.Invoke(this, new EnqueueEventArgs(obj)); }, item); } } } } finally { lock (lockIsWorking) { IsWorking = false; } } } /// <summary> /// UserWork事件的参数,包含item对象 /// </summary> public class EnqueueEventArgs : EventArgs { public T Item { get; private set; } public EnqueueEventArgs(object item) { try { Item = (T)item; } catch (Exception) { throw new InvalidCastException("object to T 转换失败"); } } } } } using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using WorkQueue; namespace ConsoleApplication47 { class Program { static void Main(string[] args) {//实例并设置线程池的大小 WorkThreadQueue queue = new WorkThreadQueue(10); for (var i = 1; i <= 100; i++) { //队列中的线程对象 QueueThreadEntity entity = new QueueThreadEntity(); entity.ThreaParameter = i; //队列中每个元素执行的函数 entity.StartThreadHandle += (a) => { //Console.Write("当前参数为:" + a.ToString() + "\r\n"); }; //队列中每个元素执行完毕的函数 entity.EndThreadHandle += (a) => { Console.Write("队列a执行完成:" + a.ToString() + "\r\n"); }; //将线程插入到队列 queue.AddQueue(entity); } while (!queue.IsEmpty()) { Console.Write("队列正在执行"); Thread.Sleep(1000); } Console.Write("队列执行完必"); Console.ReadKey(); } } }
线程队列(版本1+版本2)
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。