首页 > 代码库 > 一个C#多线程的工作队列

一个C#多线程的工作队列

多线程添加元素到队列中,队列根据绑定

的事件进行自动处理,可以设置WorkSequential属性来实现对队列处理的单线程(严格顺序处理)或者多线程处理(循序出队,但是

多线程处理,不保证对队列元素的处理顺利)的选择。

代码/***********多线程的工作队列*************** * 此工作队列保证线程安全性 *  *  *  *  * *******/namespace WorkQueue{    using System.Collections.Generic;    using System;    using System.Threading;    public delegate void UserWorkEventHandler<T>(object sender, WorkQueue<T>.EnqueueEventArgs e);    public class WorkQueue<T>    {        private bool IsWorking; //表明处理线程是否正在工作        private object lockIsWorking = new object();//对IsWorking的同步对象        private Queue<T> queue; //实际的队列        private object lockObj = new object(); //队列同步对象               /// <summary>        /// 绑定用户需要对队列中的item对象        /// 施加的操作的事件        /// </summary>        public event UserWorkEventHandler<T> UserWork;        public WorkQueue(int n)        {            queue = new Queue<T>(n);        }        public WorkQueue()        {            queue = new Queue<T>();        }        /// <summary>        /// 谨慎使用此函数,        /// 只保证此瞬间,队列值为空        /// </summary>        /// <returns></returns>        public bool IsEmpty()        {            lock (lockObj)            {                return queue.Count == 0;            }        }        private bool isOneThread;        /// <summary>        /// 队列处理是否需要单线程顺序执行        /// ture表示单线程处理队列的T对象        /// 默认为false,表明按照顺序出队,但是多线程处理item        /// *****注意不要频繁改变此项****        /// </summary>        public bool WorkSequential        {            get            {                return isOneThread;            }            set            {                isOneThread = value;            }        }        /// <summary>        /// 向工作队列添加对象,        /// 对象添加以后,如果已经绑定工作的事件        /// 会触发事件处理程序,对item对象进行处理        /// </summary>        /// <param name="item">添加到队列的对象</param>        public void EnqueueItem(T item)        {            lock (lockObj)            {                queue.Enqueue(item);            }            lock (lockIsWorking)            {                if (!IsWorking)                {                    IsWorking = true;                    ThreadPool.QueueUserWorkItem(doUserWork);                }            }        }        /// <summary>        /// 处理队列中对象的函数        /// </summary>        /// <param name="o"></param>        private void doUserWork(object o)        {            try            {                T item;                while (true)                {                    lock (lockObj)                    {                        if (queue.Count > 0)                        {                            item = queue.Dequeue();                        }                        else                        {                            return;                        }                    }                    if (!item.Equals(default(T)))                    {                        if (isOneThread)                        {                            if (UserWork != null)                            {                                UserWork(this, new EnqueueEventArgs(item));                            }                        }                        else                        {                            ThreadPool.QueueUserWorkItem(obj =>                                                             {                                                                 if (UserWork != null)                                                                 {                                                                     UserWork(this, new EnqueueEventArgs(obj));                                                                 }                                                             }, item);                        }                                            }                }            }            finally            {                lock (lockIsWorking)                {                    IsWorking = false;                }            }        }        /// <summary>        /// UserWork事件的参数,包含item对象        /// </summary>        public class EnqueueEventArgs : EventArgs        {            public T Item { get; private set; }            public EnqueueEventArgs(object item)            {                try                {                    Item = (T)item;                }                catch (Exception)                {                    throw new InvalidCastException("object to T 转换失败");                }            }        }    }}
代码using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading;using System.IO;using WorkQueue;namespace Program{    class Program    {        private static List<string> list=new List<string>(1000);        static  StreamWriter sw = new StreamWriter(new FileStream("test.dat", FileMode.Create));        static void Main(string[] args)        {            WorkQueue<int> workQueue=new WorkQueue<int>(1000);            workQueue.UserWork += new UserWorkEventHandler<int>(workQueue_UserWork);           // workQueue.WorkSequential = true;            ThreadPool.QueueUserWorkItem(o =>                                             {                                                 for (int i = 0; i < 1000; i++)                                                 {                                                     workQueue.EnqueueItem(i);                                                 }                                             });            Console.ReadLine();            list.ForEach(str=>sw.WriteLine(str));            Console.WriteLine(workQueue.IsEmpty());            sw.Close();        }        static void workQueue_UserWork(object sender, WorkQueue<int>.EnqueueEventArgs e)        {                       StringBuilder sb=new StringBuilder();            sb.Append(e.Item).Append("\t\t").Append(DateTime.Now.ToString("u")+"\t\t").Append(Thread.CurrentThread.ManagedThreadId);            list.Add(sb.ToString());            Thread.Sleep(15);        }    }}

 

一个C#多线程的工作队列