首页 > 代码库 > C#并行编程-线程同步原语
C#并行编程-线程同步原语
菜鸟学习并行编程,参考《C#并行编程高级教程.PDF》,如有错误,欢迎指正。
背景
有时候必须访问变量、实例、方法、属性或者结构体,而这些并没有准备好用于并发访问,或者有时候需要执行部分代码,而这些代码必须单独运行,这是不得不通过将任务分解的方式让它们独立运行。
当任务和线程要访问共享的数据和资源的时候,您必须添加显示的同步,或者使用原子操作或锁。
之前的.NET Framework提供了昂贵的锁机制以及遗留的多线程模型,新的数据结构允许细粒度的并发和并行化,并且降低一定必要的开销,这些数据结构称为轻量级同步原语。
这些数据结构在关键场合下能够提供更好的性能,因为它们能够避免昂贵的锁机制,如果在等待时间不短的情况下使用它们,这些原语会增加额外的开销。
如果您需要特定的执行顺序,可以通过添加显示同步来实现。
同步原语
.NET Framework 4在现在的System.Threading命名空间中提供了6个同步原语,通过这个命名空间可以访问遗留的线程类、类型和枚举,还提供了新的基于任务的编程模型及特定情形紧密相关的数据结构
Barrier 使多个任务能够采用并行方式依据某种算法在多个阶段中协同工作 通过屏障
CountdownEvent 表示在计数变为0时处于有信号状态的同步基元 通过信号机制
ManualResetEventSlim 允许很多任务等待直到另一个任务手工发出事件句柄,当预计等待时间很短的时候,ManualResetEventSlim 的性能比对应的重量级ManualResetEvent的性能要高。通过信号机制
SemaphoreSlim 限制对可同时访问资源或资源池的线程数,比对应的Semaphore性能要高 通过信号机制
SpinLock 提供一个相互排斥锁基元,在该基元中,尝试获得锁的线程将在重复检查的循环中等待,直至该锁变为可用为止。
SpinWait 提供对基于自旋的等待的支持。
通过屏障同步并发任务 Barrier
当在需要一组任务并行地运行一连串的阶段,但是每一个阶段都要等待其他任务完成前一阶段之后才能开始时,您可以通过使用Barrier类的实例来同步这一类协同工作,通过屏障
下面贴代码方便大家理解,如有问题,请指正,详情见注释:
class Program { private static Task[] _CookTasks { get; set; } private static Barrier _barrier { get; set; } /*获取当前计算机处理器数*/ private static int _particpants = Environment.ProcessorCount; /* coder:释迦苦僧 * 代码中 展示煮饭的步骤 1.打水 2.淘米 3.放入锅中 4.盖上锅盖 5.生火煮饭 */ static void Main(string[] args) { Console.WriteLine("定义{0}个人煮饭3次", _particpants); _CookTasks = new Task[_particpants]; _barrier = new Barrier(_particpants, (barrier) => { Console.WriteLine("当前阶段:{0}", barrier.CurrentPhaseNumber); }); Stopwatch swTask1 = new Stopwatch(); swTask1.Start(); /*定义N个人*/ for (int cook_person = 0; cook_person < _particpants; cook_person++) { _CookTasks[cook_person] = Task.Factory.StartNew((num) => { int index = Convert.ToInt32(num); /*每个人煮3次饭*/ for (int cook_count = 0; cook_count < 3; cook_count++) { CookStepTask1(index, cook_count); CookStepTask2(index, cook_count); CookStepTask3(index, cook_count); CookStepTask4(index, cook_count); CookStepTask5(index, cook_count); } }, cook_person); } /*ContinueWhenAll 提供一组任务完成后 延续方法*/ var finalTask = Task.Factory.ContinueWhenAll(_CookTasks, (tasks) => { /*等待任务完成*/ Task.WaitAll(_CookTasks); swTask1.Stop(); Console.WriteLine("采用并发 {1}个人煮3次饭耗时:{0}", swTask1.ElapsedMilliseconds, _particpants); /*释放资源*/ _barrier.Dispose(); }); Thread.Sleep(4000); Stopwatch swTask = new Stopwatch(); swTask.Start(); /*定义N个人*/ for (int cook_person = 0; cook_person < _particpants; cook_person++) { /*每个人煮3次饭*/ for (int cook_count = 0; cook_count < 3; cook_count++) { CookStep1(cook_person, cook_count); CookStep2(cook_person, cook_count); CookStep3(cook_person, cook_count); CookStep4(cook_person, cook_count); CookStep5(cook_person, cook_count); } } swTask.Stop(); Console.WriteLine("不采用并发 {1}个人煮3次饭耗时:{0}", swTask.ElapsedMilliseconds, _particpants); Thread.Sleep(2000); Console.ReadLine(); } /*1.打水*/ private static void CookStepTask1(int pesron_index, int index) { Console.WriteLine("{0} 第{1}次 打水... 耗时2分钟", pesron_index, index); Thread.Sleep(200); /*存在线程暂停 所以需要将 _barrier.SignalAndWait();放在方法中 */ _barrier.SignalAndWait(); } /*2.淘米*/ private static void CookStepTask2(int pesron_index, int index) { Console.WriteLine("{0} 第{1}次 淘米... 耗时3分钟", pesron_index, index); Thread.Sleep(300); /*存在线程暂停 所以需要将 _barrier.SignalAndWait();放在方法中 */ _barrier.SignalAndWait(); } /*3.放入锅中*/ private static void CookStepTask3(int pesron_index, int index) { Console.WriteLine("{0} 第{1}次 放入锅中... 耗时1分钟", pesron_index, index); Thread.Sleep(100); /*存在线程暂停 所以需要将 _barrier.SignalAndWait();放在方法中 */ _barrier.SignalAndWait(); } /*4.盖上锅盖*/ private static void CookStepTask4(int pesron_index, int index) { Console.WriteLine("{0} 第{1}次 盖上锅盖... 耗时1分钟", pesron_index, index); Thread.Sleep(100); /*存在线程暂停 所以需要将 _barrier.SignalAndWait();放在方法中 */ _barrier.SignalAndWait(); } /*5.生火煮饭*/ private static void CookStepTask5(int pesron_index, int index) { Console.WriteLine("{0} 第{1}次 生火煮饭... 耗时30分钟", pesron_index, index); Thread.Sleep(500); /*存在线程暂停 所以需要将 _barrier.SignalAndWait();放在方法中 */ _barrier.SignalAndWait(); } /*1.打水*/ private static void CookStep1(int pesron_index, int index) { Console.WriteLine("{0} 第{1}次 打水... 耗时2分钟", pesron_index, index); Thread.Sleep(200); } /*2.淘米*/ private static void CookStep2(int pesron_index, int index) { Console.WriteLine("{0} 第{1}次 淘米... 耗时3分钟", pesron_index, index); Thread.Sleep(300); } /*3.放入锅中*/ private static void CookStep3(int pesron_index, int index) { Console.WriteLine("{0} 第{1}次 放入锅中... 耗时1分钟", pesron_index, index); Thread.Sleep(100); } /*4.盖上锅盖*/ private static void CookStep4(int pesron_index, int index) { Console.WriteLine("{0} 第{1}次 盖上锅盖... 耗时1分钟", pesron_index, index); Thread.Sleep(100); } /*5.生火煮饭*/ private static void CookStep5(int pesron_index, int index) { Console.WriteLine("{0} 第{1}次 生火煮饭... 耗时30分钟", pesron_index, index); Thread.Sleep(500); } } class Product { public string Name { get; set; } public string Category { get; set; } public int SellPrice { get; set; } }
如代码所示,在串行代码中,虽然任务是有序进行,但是等待的时间很长,因为只是在一个处理器下进行处理,如下图所示:
而采用并发处理中,使用 Barrier,不仅保证了任务的有序进行,还在性能损耗上得到了最大程度的降低,如下图
ContinueWhenAll 提供一组任务完成后的延续方法
/*ContinueWhenAll 提供一组任务完成后 延续方法*/ var finalTask = Task.Factory.ContinueWhenAll(_CookTasks, (tasks) => { /*等待任务完成*/ Task.WaitAll(_CookTasks); swTask1.Stop(); Console.WriteLine("采用并发 {1}个人煮3次饭耗时:{0}", swTask1.ElapsedMilliseconds, _particpants); /*释放资源*/ _barrier.Dispose(); });
通过屏障同步并发任务 Barrier 下的异常和超时处理
废话不多说 直接贴代码,如有问题请指正:
class Program { private static Task[] _CookTasks { get; set; } private static Barrier _barrier { get; set; } /*获取当前计算机处理器数*/ private static int _particpants = Environment.ProcessorCount; /* coder:释迦苦僧 * 代码中 展示煮饭的步骤 1.打水 2.淘米 3.放入锅中 4.盖上锅盖 5.生火煮饭 */ static void Main(string[] args) { Console.WriteLine("定义{0}个人煮饭3次", _particpants); _CookTasks = new Task[_particpants]; _barrier = new Barrier(_particpants, (barrier) => { Console.WriteLine("当前阶段:{0}", barrier.CurrentPhaseNumber); }); Stopwatch swTask1 = new Stopwatch(); swTask1.Start(); /*定义N个人*/ for (int cook_person = 0; cook_person < _particpants; cook_person++) { _CookTasks[cook_person] = Task.Factory.StartNew((num) => { int index = Convert.ToInt32(num); /*每个人煮3次饭*/ for (int cook_count = 0; cook_count < 3; cook_count++) { CookStepTask1(index, cook_count); /*处理等待中的异常 如果等待时间超过300毫秒的话则抛出 * 参考方法体1中 模拟了超时操作, 则屏障等待时 如果发现超时 则处理异常 */ try { /*屏障 等待超过2秒钟 其执行算法有问题 超时 则抛出异常 记录信息 提醒开发人员观察*/ if (!_barrier.SignalAndWait(2000)) { /*抛出超时异常*/ throw new OperationCanceledException("等待超时,抛出异常"); } } catch (Exception ex) { /*处理异常*/ Console.WriteLine(ex.Message); continue; } CookStepTask2(index, cook_count); _barrier.SignalAndWait(); CookStepTask3(index, cook_count); _barrier.SignalAndWait(); CookStepTask4(index, cook_count); _barrier.SignalAndWait(); CookStepTask5(index, cook_count); _barrier.SignalAndWait(); } }, cook_person); } /*ContinueWhenAll 提供一组任务完成后 延续方法*/ var finalTask = Task.Factory.ContinueWhenAll(_CookTasks, (tasks) => { foreach (Task task in _CookTasks) { if (task.Exception != null) { /*任务执行完成后 输出所有异常 打印异常报表*/ foreach (Exception exception in task.Exception.InnerExceptions) { Console.WriteLine("异常信息:{0}", exception.Message); } } } /*等待任务完成*/ Task.WaitAll(_CookTasks); swTask1.Stop(); Console.WriteLine("采用并发 {1}个人煮3次饭耗时:{0}", swTask1.ElapsedMilliseconds, _particpants); /*释放资源*/ _barrier.Dispose(); }); Console.ReadLine(); } /*1.打水*/ private static void CookStepTask1(int pesron_index, int index) { Console.WriteLine("{0} 第{1}次 打水... 耗时2分钟", pesron_index, index); /*模拟一个方法体内异常抛出*/ //throw new Exception("抛出一个代码异常"); if (pesron_index == 0) { /*模拟超时操作*/ //SpinWait.SpinUntil(() => (_barrier.ParticipantsRemaining == 0), 5000); Thread.Sleep(5000); } } /*2.淘米*/ private static void CookStepTask2(int pesron_index, int index) { Console.WriteLine("{0} 第{1}次 淘米... 耗时3分钟", pesron_index, index); } /*3.放入锅中*/ private static void CookStepTask3(int pesron_index, int index) { Console.WriteLine("{0} 第{1}次 放入锅中... 耗时1分钟", pesron_index, index); } /*4.盖上锅盖*/ private static void CookStepTask4(int pesron_index, int index) { Console.WriteLine("{0} 第{1}次 盖上锅盖... 耗时1分钟", pesron_index, index); } /*5.生火煮饭*/ private static void CookStepTask5(int pesron_index, int index) { Console.WriteLine("{0} 第{1}次 生火煮饭... 耗时30分钟", pesron_index, index); } }
如代码所示,在 CookStepTask1 方法体中,我模拟了超时和异常,并在Task任务中,利用Barrier的SignalAndWait方法处理屏障中的超时信息,和Task中异常记录信息。
锁的特性
互斥和可见性。互斥指的是一次只允许一个线程持有某个特定的锁,因此可以保证共享数据内容的一致性;
可见性指的是必须确保锁被释放之前对共享数据的修改,随后获得锁的另一个线程能够知道该行为。
参考http://www.cnblogs.com/lucifer1982/archive/2008/03/23/1116981.html
互斥锁-System.Threading.Monitor
如果有一个临界区,一次只有一个任务能够访问这个临界区,但是这个临界区需要被很多任务循环访问,那么使用任务延续并不是一个好的选择,那么另一种替换方案就是采用互斥锁原语。
下面已操作字符串为示意,看下不采用锁,采用传统的LOCK和采用互斥锁的区别
不采用任何锁机制代码如下:
class Program { private static Task[] _CookTasks { get; set; } private static object o = new object(); private static StringBuilder AppendStrUnLock = new StringBuilder(); private static StringBuilder AppendStrLock = new StringBuilder(); private static StringBuilder AppendStrMonitorLock = new StringBuilder(); /*获取当前计算机处理器数*/ private static int _particpants = Environment.ProcessorCount; /* coder:释迦苦僧 */ static void Main(string[] args) { _CookTasks = new Task[_particpants]; Stopwatch swTask1 = new Stopwatch(); swTask1.Start(); for (int task_index = 0; task_index < _particpants; task_index++) { _CookTasks[task_index] = Task.Factory.StartNew((num) => { Parallel.For(1, 1000, (i) => { string str = "append message " + i; AppendStrUnLock.Append(str); }); }, task_index); } /*ContinueWhenAll 提供一组任务完成后 延续方法*/ var finalTask = Task.Factory.ContinueWhenAll(_CookTasks, (tasks) => { /*等待任务完成*/ Task.WaitAll(_CookTasks); swTask1.Stop(); Console.WriteLine("不采用Lock操作,字符串长度:{0},耗时:{1}", AppendStrUnLock.Length, swTask1.ElapsedMilliseconds); /*释放资源*/ }); Console.ReadLine(); } }
采用Lock机制代码如下:
class Program { private static Task[] _CookTasks { get; set; } private static object o = new object(); private static StringBuilder AppendStrUnLock = new StringBuilder(); private static StringBuilder AppendStrLock = new StringBuilder(); private static StringBuilder AppendStrMonitorLock = new StringBuilder(); /*获取当前计算机处理器数*/ private static int _particpants = Environment.ProcessorCount; /* coder:释迦苦僧 */ static void Main(string[] args) { _CookTasks = new Task[_particpants]; Stopwatch swTask1 = new Stopwatch(); swTask1.Start(); for (int task_index = 0; task_index < _particpants; task_index++) { _CookTasks[task_index] = Task.Factory.StartNew((num) => { Parallel.For(1, 1000, (i) => { string str = "append message " + i; lock (o) { AppendStrLock.Append(str); } }); }, task_index); } /*ContinueWhenAll 提供一组任务完成后 延续方法*/ var finalTask = Task.Factory.ContinueWhenAll(_CookTasks, (tasks) => { /*等待任务完成*/ Task.WaitAll(_CookTasks); swTask1.Stop(); Console.WriteLine("不采用Lock操作,字符串长度:{0},耗时:{1}", AppendStrLock.Length, swTask1.ElapsedMilliseconds); /*释放资源*/ }); Console.ReadLine(); } }
采用互斥锁代码下:
class Program { private static Task[] _CookTasks { get; set; } private static object o = new object(); private static StringBuilder AppendStrUnLock = new StringBuilder(); private static StringBuilder AppendStrLock = new StringBuilder(); private static StringBuilder AppendStrMonitorLock = new StringBuilder(); /*获取当前计算机处理器数*/ private static int _particpants = Environment.ProcessorCount; /* coder:释迦苦僧 */ static void Main(string[] args) { _CookTasks = new Task[_particpants]; Stopwatch swTask1 = new Stopwatch(); swTask1.Start(); for (int task_index = 0; task_index < _particpants; task_index++) { _CookTasks[task_index] = Task.Factory.StartNew((num) => { Parallel.For(1, 1000, (i) => { string str = "append message " + i; bool lockTaken = false; try { Monitor.Enter(o, ref lockTaken); AppendStrMonitorLock.Append(str); } finally { if (lockTaken) Monitor.Exit(o); } }); }, task_index); } /*ContinueWhenAll 提供一组任务完成后 延续方法*/ var finalTask = Task.Factory.ContinueWhenAll(_CookTasks, (tasks) => { /*等待任务完成*/ Task.WaitAll(_CookTasks); swTask1.Stop(); Console.WriteLine("不采用Lock操作,字符串长度:{0},耗时:{1}", AppendStrMonitorLock.Length, swTask1.ElapsedMilliseconds); /*释放资源*/ }); Console.ReadLine(); } }
System.Threading.Monitor 类通过使用互斥锁提供了对象的同步访问机制,使用Lock关键字的等价代码使用起来更加简洁,不需要额外的异常捕获和处理代码。
但是System.Threading.Monitor好处是提供了些其他的方法(Lock中却没有),通过这些方法可以对锁的过程有更多的控制。需要注意的是 Lock关键字和System.Threading.Monitor类仍然是提供互斥访问的首选方法,不过在某些情形下,其他互斥锁原语可能会提供更好的性能和更小的开销,如SpinLock,Lock和System.Threading.Monitor类智能锁定对象,即引用类型。
锁超时 Monitor.TryEnter(o, 2000, ref lockTaken);
在多任务中,很多任务试图获得锁从而进入临界区,如果其中一个参与者不能释放锁,那么其他所有的任务都要在Monitor.Enter的方法内永久的等待下去。Monitor.TryEnter方法则提供了超时机制,如代码所示:
class Program { private static Task[] _CookTasks { get; set; } private static object o = new object(); private static StringBuilder AppendStrUnLock = new StringBuilder(); private static StringBuilder AppendStrLock = new StringBuilder(); private static StringBuilder AppendStrMonitorLock = new StringBuilder(); /*获取当前计算机处理器数*/ private static int _particpants = Environment.ProcessorCount; /* coder:释迦苦僧 */ static void Main(string[] args) { _CookTasks = new Task[_particpants]; Stopwatch swTask1 = new Stopwatch(); swTask1.Start(); for (int task_index = 0; task_index < _particpants; task_index++) { _CookTasks[task_index] = Task.Factory.StartNew((num) => { try { Parallel.For(1, 200000, (i) => { string str = "append message " + i; bool lockTaken = false; try { Monitor.TryEnter(o, 2000, ref lockTaken); if (!lockTaken) { throw new OperationCanceledException("锁超时...."); } if (i == 2) { Thread.Sleep(40000); } AppendStrMonitorLock.Append(str); } catch (Exception ex) { throw ex; } finally { if (lockTaken) Monitor.Exit(o); } }); } catch (Exception ex) { throw ex; } }, task_index); } /*ContinueWhenAll 提供一组任务完成后 延续方法*/ var finalTask = Task.Factory.ContinueWhenAll(_CookTasks, (tasks) => { /*等待任务完成*/ Task.WaitAll(_CookTasks); swTask1.Stop(); foreach (Task task in _CookTasks) { if (task.Exception != null) { /*任务执行完成后 输出所有异常 打印异常报表*/ foreach (Exception exception in task.Exception.InnerExceptions) { Console.WriteLine("异常信息:{0}", exception.Message); } } } Console.WriteLine("不采用Lock操作,字符串长度:{0},耗时:{1}", AppendStrMonitorLock.Length, swTask1.ElapsedMilliseconds); /*释放资源*/ }); Console.ReadLine(); } }
需要注意,上述代码中,异常并没有被捕捉到,因此每一个不能获得锁的任务都会出错退出并停止执行。
System.Threading.Monitor类还提供了以下三个方法,大家可以参考MSND:
自旋锁 - System.Threading.SpinLock
如果持有锁的时间非常短,锁的粒度很精细,那么自旋锁可以获得比其他锁机制更好的性能,互斥锁System.Threading.Monitor的开销非常大。
下述代码展现System.Threading.Monitor和System.Threading.SpinLock的性能:
class Program { private static Task[] _CookTasks { get; set; } private static object o = new object(); private static StringBuilder AppendStrUnLock = new StringBuilder(); private static StringBuilder AppendStrLock = new StringBuilder(); private static StringBuilder AppendStrMonitorLock = new StringBuilder(); /*获取当前计算机处理器数*/ private static int _particpants = Environment.ProcessorCount; /* coder:释迦苦僧 */ static void Main(string[] args) { SpinLock sl=new SpinLock(); _CookTasks = new Task[_particpants]; Thread.Sleep(4000); Stopwatch swTask1 = new Stopwatch(); swTask1.Start(); for (int task_index = 0; task_index < _particpants; task_index++) { _CookTasks[task_index] = Task.Factory.StartNew((num) => { Parallel.For(1, 200000, (i) => { string str = "append message " + i; bool lockTaken = false; try { Monitor.Enter(o, ref lockTaken); AppendStrMonitorLock.Append(str); } finally { if (lockTaken) Monitor.Exit(o); } }); }, task_index); } /*ContinueWhenAll 提供一组任务完成后 延续方法*/ var finalTask = Task.Factory.ContinueWhenAll(_CookTasks, (tasks) => { /*等待任务完成*/ Task.WaitAll(_CookTasks); swTask1.Stop(); Console.WriteLine("采用Monitor操作,字符串长度:{0},耗时:{1}", AppendStrMonitorLock.Length, swTask1.ElapsedMilliseconds); /*释放资源*/ }); Console.ReadLine(); } }
class Program { private static Task[] _CookTasks { get; set; } private static object o = new object(); private static StringBuilder AppendStrUnLock = new StringBuilder(); private static StringBuilder AppendStrLock = new StringBuilder(); private static StringBuilder AppendStrMonitorLock = new StringBuilder(); /*获取当前计算机处理器数*/ private static int _particpants = Environment.ProcessorCount; /* coder:释迦苦僧 */ static void Main(string[] args) { SpinLock sl = new SpinLock(); _CookTasks = new Task[_particpants]; Thread.Sleep(4000); Stopwatch swTask1 = new Stopwatch(); swTask1.Start(); for (int task_index = 0; task_index < _particpants; task_index++) { _CookTasks[task_index] = Task.Factory.StartNew((num) => { Parallel.For(1, 200000, (i) => { string str = "append message " + i; bool lockTaken = false; try { sl.Enter(ref lockTaken); AppendStrMonitorLock.Append(str); } finally { if (lockTaken) sl.Exit(); } }); }, task_index); } /*ContinueWhenAll 提供一组任务完成后 延续方法*/ var finalTask = Task.Factory.ContinueWhenAll(_CookTasks, (tasks) => { /*等待任务完成*/ Task.WaitAll(_CookTasks); swTask1.Stop(); Console.WriteLine("采用SpinLock操作,字符串长度:{0},耗时:{1}", AppendStrMonitorLock.Length, swTask1.ElapsedMilliseconds); /*释放资源*/ }); Console.ReadLine(); } }
在实际的编程中需要注意的是:不要将SpinLock声明为只读字段,如果声明为只读字段,会导致每次调用都会返回一个SpinLock新副本,在多线程下,每个方法都会成功获得锁,而受到保护的临界区不会按照预期进行串行化。
基于自旋锁的等待-System.Threading.SpinWait
如果等待某个条件满足需要的时间很短,而且不希望发生昂贵的上下文切换,那么基于自旋的等待时一种很好的替换方案,SpinWait不仅提供了基本自旋功能,而且还提供了SpinWait.SpinUntil方法,使用这个方法能够自旋直到满足某个条件为止,此外SpinWait是一个Struct,从内存的角度上说,开销很小。SpinLock是对SpinWait的简单封装。
需要注意的是:长时间的自旋不是很好的做法,因为自旋会阻塞更高级的线程及其相关的任务,还会阻塞垃圾回收机制。SpinWait并没有设计为让多个任务或线程并发使用,因此多个任务或线程通过SpinWait方法进行自旋,那么每一个任务或线程都应该使用自己的SpinWait实例。
当一个线程自旋时,会将一个内核放入到一个繁忙的循环中,而不会让出当前处理器时间片剩余部分,当一个任务或者线程调用Thread.Sleep方法时,底层线程可能会让出当前处理器时间片的剩余部分,这是一个大开销的操作。
因此,在大部分情况下,不要在循环内调用Thread.Sleep方法等待特定的条件满足。
下面贴代码,方便大家理解,如有错误请指正:
class Program { private static Task[] _CookTasks { get; set; } /*定义一个变量 该变量指示是否可以进行下一步操作*/ private static bool _stepbool = false; /*获取当前计算机处理器数*/ private static int _particpants = Environment.ProcessorCount; /* coder:释迦苦僧 */ static void Main(string[] args) { _CookTasks = new Task[_particpants]; for (int task_index = 0; task_index < _particpants; task_index++) { _CookTasks[task_index] = Task.Factory.StartNew((num) => { CookStep1(); /*等待5秒钟 _stepbool变为true ,如果5秒钟内没有淘好米 则提示超时*/ if (!SpinWait.SpinUntil(() => (_stepbool), 1000)) { Console.WriteLine("淘个米都花这么长时间...."); } else { /*按时淘好米开始煮饭*/ Console.WriteLine("淘好米煮饭...."); } }, task_index); } /*主线程创造超时条件*/ Thread.Sleep(3000); _stepbool = true; Console.ReadLine(); } static void CookStep1() { Console.WriteLine("淘米...."); } }
volatile
volatile关键字能够保证;当这个共享变量被不同线程访问和更新且没有锁和原子操作的时候,最新的值总能在共享变量中表示出来。
volatile变量可以看作是“轻量级lock”。当出于简单编码和可伸缩性考虑时,我们可能会选择使用volatile变量而不是锁机制。某些情况下,如果读操作远多于写操作,也会比锁机制带来更高性能。
volatile变量具有“lock”的可见性,却不具备原子特性。也就是说线程能够自动发现volatile变量的最新值。volatile变量可以实现线程安全,但其应用有限。使用volatile变量的主要原因在于它使用非常简单,至少比使用锁机制要简单的多;其次便是性能原因了,某些情况下,它的性能要优于锁机制。此外,volatile操作不会造成阻塞。
参考:http://www.cnblogs.com/lucifer1982/archive/2008/03/23/1116981.html 大家可以看下 写的不错
ManualResetEventSlim
ManualResetEventSlim通过封装手动重置事件等待句柄提供了自旋等待和内核等待的组合。您可以使用这个类的实例在任务直接发送信息,并等待事件的发送。通过信号机制通知任务开始其工作。
其Set 方法将事件状态设置为有信号,从而允许一个或多个等待该事件的线程继续。 其 Wait()方法 阻止当前线程,直到设置了当前 ManualResetEventSlim 为止。
如果需要跨进程或者跨AppDomain的同步,那么就必须使用ManualResetEvent,而不能使用ManualResetEventSlim。
using System;using System.Threading;using System.Threading.Tasks;class MRESDemo{ static void Main() { MRES_SetWaitReset(); } static void MRES_SetWaitReset() { ManualResetEventSlim mres1 = new ManualResetEventSlim(false); var observer = Task.Factory.StartNew(() => { Console.WriteLine("阻塞当前线程,使 mres1 处于等待状态...!"); mres1.Wait(); while (true) { if (mres1.IsSet) { /*等待 mres1 Set()信号 当有信号时 在执行后面代码*/ Console.WriteLine("得到mres1信号,执行后续代码....!"); } Thread.Sleep(100); } }); Thread.Sleep(2000); Console.WriteLine("取消mres1等待状态"); mres1.Set(); Console.WriteLine("当前信号状态:{0}", mres1.IsSet); Thread.Sleep(300); mres1.Reset(); Console.WriteLine("当前信号状态:{0}", mres1.IsSet); Thread.Sleep(300); mres1.Set(); Console.WriteLine("当前信号状态:{0}", mres1.IsSet); Thread.Sleep(300); mres1.Reset(); Console.WriteLine("当前信号状态:{0}", mres1.IsSet); observer.Wait(); mres1.Dispose(); Console.ReadLine(); }}
SemaphoreSlim
有时候,需要对访问一个紫云或者一个资源池的并发任务或者线程的数量做限制时,采用System.Threading.SemaphoreSlim类非常有用。
该了表示一个Windows内核信号量对象,如果等待的时间非常短,System.Threading.SemaphoreSlim类带来的额外开销会更少,而且更适合对任务处理,System.Threading.SemaphoreSlim提供的计数信号量没有使用Windows内核的信号量。
计数信号量:通过跟踪进入和离开任务或线程来协调对资源的访问,信号量需要知道能够通过信号量协调机制所访问共享资源的最大任务数,然后,信号量使用了一个计数器,根据任务进入或离开信号量控制区对计数器进行加减。
需要注意的是:信号量会降低可扩展型,而且信号量的目的就是如此。SemaphoreSlim实例并不能保证等待进入信号量的任务或线程的顺序。
下面贴代码,方便大家理解:
using System;using System.Threading;using System.Threading.Tasks;class MRESDemo{ /*code:释迦苦僧*/ static void Main() { SemaphoreSlim ss = new SemaphoreSlim(3); // 创建SemaphoreSlim 初始化信号量最多计数为3次 Console.WriteLine("创建SemaphoreSlim 初始化信号量最多计数为{0}次", ss.CurrentCount); // Launch an asynchronous Task that releases the semaphore after 100 ms Task t1 = Task.Factory.StartNew(() => { while (true) { /*阻止当前线程,直至它可进入 SemaphoreSlim 为止。*/ /*阻塞当前任务或线程,直到信号量几首大于0时它才能进入信号量*/ ss.Wait(); Console.WriteLine("允许进入 SemaphoreSlim 的线程的数量:{0}", ss.CurrentCount); Thread.Sleep(10); } }); Thread.Sleep(3000); /*当前Task只能进入3次*/ /*退出一次信号量 并递增信号量的计数*/ Console.WriteLine("退出一次信号量 并递增信号量的计数"); ss.Release(); Thread.Sleep(3000); /*退出3次信号量 并递增信号量的计数*/ Console.WriteLine("退出三次信号量 并递增信号量的计数"); ss.Release(3); /*等待任务完成*/ Task.WaitAll(t1); /*释放*/ ss.Dispose(); Console.ReadLine(); }}
CountdownEvent
有时候,需要对数目随时间变化的任务进行跟踪,CountdownEvent是一个非轻量级的同步原语,与Task.WaitAll或者TaskFactory.ContinueWhenAll 等待其他任务完成执行而运行代码相比,CountdownEvent的开销要小得多。
CountdownEvent实例带有一个初始的信号计数,在典型的fork/join场景下,每当一个任务完成工作的时候,这个任务都会发出一个CountdownEvent实例的信号,并将其信号计数递减1,调用CountdownEvent的wait方法的任务将会阻塞,直到信号计数达到0.
下面贴代码,方便大家理解:
class MRESDemo{ /*code:释迦苦僧*/ static void Main() { CountdownEvent cde = new CountdownEvent(3); // 创建SemaphoreSlim 初始化信号量最多计数为3次 Console.WriteLine(" InitialCount={0}, CurrentCount={1}, IsSet={2}", cde.InitialCount, cde.CurrentCount, cde.IsSet); // Launch an asynchronous Task that releases the semaphore after 100 ms Task t1 = Task.Factory.StartNew(() => { while (true) { Thread.Sleep(1000); if (!cde.IsSet) { cde.Signal(); Console.WriteLine(" InitialCount={0}, CurrentCount={1}, IsSet={2}", cde.InitialCount, cde.CurrentCount, cde.IsSet); } } }); cde.Wait(); /*将 CurrentCount 重置为 InitialCount 的值。*/ Console.WriteLine("将 CurrentCount 重置为 InitialCount 的值。"); cde.Reset(); cde.Wait(); /*将 CurrentCount 重置为 5*/ Console.WriteLine("将 CurrentCount 重置为 5"); cde.Reset(5); cde.AddCount(2); cde.Wait(); /*等待任务完成*/ Task.WaitAll(t1); Console.WriteLine("任务执行完成"); /*释放*/ cde.Dispose(); Console.ReadLine(); }}
class MRESDemo{ /*code:释迦苦僧*/ static void Main() { CountdownEvent cde = new CountdownEvent(3); // 创建SemaphoreSlim 初始化信号量最多计数为3次 Console.WriteLine(" InitialCount={0}, CurrentCount={1}, IsSet={2}", cde.InitialCount, cde.CurrentCount, cde.IsSet); /*创建任务执行计数*/ Task t1 = Task.Factory.StartNew(() => { for (int index = 0; index <= 5; index++) { /*重置计数器*/ cde.Reset(); /*创建任务执行计数*/ while (true) { Thread.Sleep(1000); if (!cde.IsSet) { cde.Signal(); Console.WriteLine("第{0}轮计数 CurrentCount={1}", index, cde.CurrentCount); } else { Console.WriteLine("第{0}轮计数完成", index); break; } } /*等待计数完成*/ cde.Wait(); } }); t1.Wait(); /*释放*/ cde.Dispose(); Console.ReadLine(); }}
关于并行编程中的线程同步原语就写到这,如有问题,欢迎指正
作者:释迦苦僧 出处:http://www.cnblogs.com/woxpp/p/3941550.html 本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接。
C#并行编程-线程同步原语