首页 > 代码库 > 线程队列(版本1+版本2)

线程队列(版本1+版本2)

版本1.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ClassLibrary1
{
    public class BaseThread
    {
        public event Action<object> ThreadAction;
        public event Action<object> CompeletedAction;
        public object Parameter { get; set; }
        public void Run()
        {
            ThreadAction?.Invoke(Parameter);
            CompeletedAction?.Invoke(Parameter);
        }

    }
    public class ThreadQueue
    {
        public readonly object lockObj = new object();
        public Queue<BaseThread> Queue;
        public ThreadQueue()
        {
            Queue = new Queue<BaseThread>();
        }
        public ThreadQueue(int poolSize)
        {
            Queue = new Queue<BaseThread>(poolSize);
        }

        public bool IsEmpty
        {
            get
            {
                return Queue.Count == 0;
            }
        }
        public void AddQueueItem(BaseThread item)
        {
            Queue.Enqueue(item);
            ThreadPool.QueueUserWorkItem(DoWork, null);
        }

        private void DoWork(object state)
        {
            while (true)
            {
                BaseThread item = null;

                lock (lockObj)
                {


                    if (Queue.Count > 0)
                        item = Queue.Dequeue();

                    else
                    {
                        return;
                    }
                }
                ThreadPool.QueueUserWorkItem(obj =>
                {
                    (obj as BaseThread).Run();
                }, item);


            }
        }

    }
    public class Program
    {

        public static void Main()
        {
            ThreadQueue workQueue = new ThreadQueue(100);
            for (int i = 1; i<=100; i++)
            {
                BaseThread th = new BaseThread();
                th.ThreadAction += a =>
                {

                };
                th.CompeletedAction += b =>
                {
                    Console.Write("队列a执行完成:" + b.ToString() + "\r\n");
                };
                th.Parameter = i;
                workQueue.AddQueueItem(th);
            }
            while (!workQueue.IsEmpty)
            {
                Console.Write("队列正在执行");
                Thread.Sleep(1000);
            }
            Console.Write("队列执行完必");
            Console.ReadKey();
        }
    }
}
2.版本2
using System.Collections.Generic;
using System;
using System.Threading;

/***********多线程的工作队列***************
 * 此工作队列保证线程安全性
 * 
 * 
 * 
 * 
 * *******/
namespace WorkQueue
{
    public delegate void RunThread(object threaParameter);
    public delegate void QueueException(Exception ex);
    /// <summary>
    /// 队列用到的实体
    /// </summary>
    public class QueueThreadEntity
    {
        /// <summary>
        /// 队列中每个元素执行的方法
        /// </summary>
        public event RunThread StartThreadHandle;
        /// <summary>
        /// 每个元素执行完毕事件
        /// </summary>
        public event RunThread EndThreadHandle;
        /// <summary>
        /// 执行异常回调函数
        /// </summary>
        public event QueueException EndExceptionHandle;
        /// <summary>
        /// 执行异常函数
        /// </summary>
        public event QueueException StartExceptionHandle;


        /// <summary>
        /// 线程参数
        /// </summary>
        public object ThreaParameter
        {
            get;
            set;
        }
        internal void Run()
        {
            try
            {
                StartThreadHandle(ThreaParameter);
            }
            catch (Exception ex)
            {
                if (StartExceptionHandle != null)
                {
                    StartExceptionHandle(ex);
                }
                else
                {
                    throw ex;
                }
            }
            if (EndThreadHandle != null)
            {
                try
                {
                    EndThreadHandle(ThreaParameter);
                }
                catch (Exception ex)
                {
                    if (EndExceptionHandle != null)
                    {
                        EndExceptionHandle(ex);
                    }
                    else
                    {
                        throw ex;
                    }
                }
            }
        }
    }
    public class WorkThreadQueue
    {
        WorkQueue<QueueThreadEntity> wk = null;
        /// <summary>
        /// 
        /// </summary>
        /// <param name="poolSize">线程池的大小</param>
        public WorkThreadQueue(int poolSize)
        {
            wk = new WorkQueue<QueueThreadEntity>(poolSize);
            wk.UserWork += workQueue_UserWork;
        }
        public void AddQueue(QueueThreadEntity entity)
        {
            wk.EnqueueItem(entity);
        }
        private void workQueue_UserWork(object sender, WorkQueue<QueueThreadEntity>.EnqueueEventArgs e)
        {
            var item = e.Item;
            item.Run();
        }
        /// <summary>
        /// 队列处理是否需要单线程顺序执行
        /// ture表示单线程处理队列的T对象
        /// 默认为false,表明按照顺序出队,但是多线程处理item
        /// *****注意不要频繁改变此项****
        /// </summary>
        public bool IsOneThread
        {

            get
            {
                return wk.IsOneThread;
            }
            set
            {
                wk.IsOneThread = value;
            }
        }
        /// <summary>
        /// 判断队列是否为空
        /// </summary>
        /// <returns></returns>
        public bool IsEmpty()
        {
            return wk.IsEmpty();
        }

    }
    public delegate void UserWorkEventHandler<T>(object sender, WorkQueue<T>.EnqueueEventArgs e);
    public class WorkQueue<T>
    {
        private bool IsWorking; //表明处理线程是否正在工作
        private object lockIsWorking = new object();//对IsWorking的同步对象


        private Queue<T> queue; //实际的队列
        private object lockObj = new object(); //队列同步对象

        /// <summary>
        /// 绑定用户需要对队列中的item对象
        /// 施加的操作的事件
        /// </summary>
        public event UserWorkEventHandler<T> UserWork;

        public WorkQueue(int n)
        {
            queue = new Queue<T>(n);
        }

        public WorkQueue()
        {
            queue = new Queue<T>();
        }

        /// <summary>
        /// 谨慎使用此函数,
        /// 只保证此瞬间,队列值为空
        /// </summary>
        /// <returns></returns>
        public bool IsEmpty()
        {
            lock (lockObj)
            {
                return queue.Count == 0;
            }
        }

        private bool isOneThread;

        /// <summary>
        /// 队列处理是否需要单线程顺序执行
        /// ture表示单线程处理队列的T对象
        /// 默认为false,表明按照顺序出队,但是多线程处理item
        /// *****注意不要频繁改变此项****
        /// </summary>
        public bool IsOneThread
        {
            get
            {
                return isOneThread;
            }
            set
            {
                isOneThread = value;
            }

        }



        /// <summary>
        /// 向工作队列添加对象,
        /// 对象添加以后,如果已经绑定工作的事件
        /// 会触发事件处理程序,对item对象进行处理
        /// </summary>
        /// <param name="item">添加到队列的对象</param>
        public void EnqueueItem(T item)
        {
            lock (lockObj)
            {
                queue.Enqueue(item);
            }

            lock (lockIsWorking)
            {
                //if (!IsWorking)
                {
                    //IsWorking = true;
                    ThreadPool.QueueUserWorkItem(doUserWork);
                }
            }
        }
        /// <summary>
        /// 处理队列中对象的函数
        /// </summary>
        /// <param name="o"></param>
        private void doUserWork(object o)
        {
            try
            {
                T item;

                while (true)
                {
                    lock (lockObj)
                    {
                        if (queue.Count > 0)
                        {
                            item = queue.Dequeue();
                        }
                        else
                        {
                            return;
                        }
                    }
                    if (!item.Equals(default(T)))
                    {

                        if (isOneThread)
                        {
                            UserWork?.Invoke(this, new EnqueueEventArgs(item));
                        }
                        else
                        {
                            int i = 0;
                            ThreadPool.QueueUserWorkItem(obj =>
                            {
                                UserWork?.Invoke(this, new EnqueueEventArgs(obj));
                            }, item);
                        }


                    }

                }
            }
            finally
            {
                lock (lockIsWorking)
                {
                    IsWorking = false;
                }

            }
        }

        /// <summary>
        /// UserWork事件的参数,包含item对象
        /// </summary>
        public class EnqueueEventArgs : EventArgs
        {
            public T Item { get; private set; }
            public EnqueueEventArgs(object item)
            {
                try
                {
                    Item = (T)item;
                }
                catch (Exception)
                {

                    throw new InvalidCastException("object to T 转换失败");
                }
            }
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using WorkQueue;

namespace ConsoleApplication47
{
    class Program
    {
        static void Main(string[] args)
        {//实例并设置线程池的大小
            WorkThreadQueue queue = new WorkThreadQueue(10);

            for (var i = 1; i <= 100; i++)
            {
                //队列中的线程对象
                QueueThreadEntity entity = new QueueThreadEntity();
                entity.ThreaParameter = i;
                //队列中每个元素执行的函数
                entity.StartThreadHandle += (a) =>
                {
                    
                    //Console.Write("当前参数为:" + a.ToString() + "\r\n");
                };
                //队列中每个元素执行完毕的函数
                entity.EndThreadHandle += (a) =>
                {
                    Console.Write("队列a执行完成:" + a.ToString() + "\r\n");
                };
                //将线程插入到队列
                queue.AddQueue(entity);
            }
            while (!queue.IsEmpty())
            {
                Console.Write("队列正在执行");
                Thread.Sleep(1000);
            }
            Console.Write("队列执行完必");


            Console.ReadKey();
        }
    }
}

 

线程队列(版本1+版本2)