首页 > 代码库 > .net中的线程同步基础(搬运自CLR via C#)
.net中的线程同步基础(搬运自CLR via C#)
线程安全
此类型的所有公共静态(Visual Basic 中为 Shared)成员对多线程操作而言都是安全的。但不保证任何实例成员是线程安全的。
在MSDN上经常会看到这样一句话。表示如果程序中有n个线程调用这个方法,那么这n个线程都是安全的, 但是实例成员就不能保证了。
比如Math.Max方法,不管有多少个线程调用,都不会出现线程不安全的情况。
列举一个由于多线程引起的数据不安全。
static void Main(string[] args) { Stopwatch watch = new Stopwatch(); watch.Start(); int sum = 0; var t1 = Task.Run(() => { for (int i = 0; i < 3000000; i++) { sum += i; } }); var t2 = Task.Run(() => { for (int i = 3000000; i < 5000000; i++) { sum += i; } }); Task.WaitAll(t1, t2); Console.WriteLine("和:"+ sum); Console.WriteLine("用时:"+watch.ElapsedMilliseconds); }
和:-1271379077
用时:48
请按任意键继续. . .
可能也许有人不理解,为什么变成这个负的了,我以线程1,线程2分别表示t1,t2运行时候的线程。
线程1 | 线程2 | sum |
刚进入循环i=0 | 0 | |
刚进入循环i=0 | 0 | |
sum=sum+i=0+0=0; i=i+1=1; | 0 | |
sum=sum+i=0+0=0; i=i+1=1; | 0 | |
sum=sum+i=0+1=1; i=i+1=2; | 1 | |
sum=sum+i=1+2=3; i=i+1=3 | 3 | |
sum=sum+i=3+1=4; i=i+1=2; | 1 |
通过表格最后一行,我们发现,这个时候线程1,希望的sum=0,但是被线程2修改了。
线程不安全的本质:一个线程想要的数据被其他线程更改了
怎么解决?我想很多人可能都会(当然,我也是)lock,lock,lock
private static readonly object objAsync = new object(); static void Main(string[] args) { Stopwatch watch = new Stopwatch(); watch.Start(); int sum = 0; var t1 = Task.Run(() => { for (int i = 0; i < 3000000; i++) { lock (objAsync) { sum += i; } } }); var t2 = Task.Run(() => { for (int i = 3000000; i < 5000000; i++) { lock (objAsync) { sum += i; } } }); Task.WaitAll(t1, t2); Console.WriteLine("和:" + sum); Console.WriteLine("用时:" + watch.ElapsedMilliseconds); }
和:1642668640
用时:461
请按任意键继续. . .
思考
其实这里的例子完全可以不通过加锁来解决,但是为了表现加锁带来的负面影响,就很明显了,线程同步是需要代价的;
原子操作
为什么要给sum += i加锁,因为它是不原子操作,多个线程同时操作就会产生竞态条件。
百度百科:原子操作是不需要同步的,这是.Net多线程编程的老生常谈了。所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (换到另一个线程)
在.net中,CLR保证一下类型的读写是原子性的。Boolean、Char、(S)Byte、(U)Int16、(U)Int32、(U)IntPtr、Single、引用类型。不包括Double、Int64、(U)Int64.
规律:CLR保证4字节以内的基础类型的读写是原子性的。因为对于Int64类型变量的读写,只要执行两次相应的机器指令。
线程同步
对于线程同步,有用户模式和内核模式两种,由于用户模式速度显著快于内核模式。
用户模式:提供特殊的CPU指令来协调线程,意味着协调是在硬件中发生的,缺点是操作系统和CLR永远检测不到一个线程在用户模式上阻塞了,线程池也不会创建新的线程来替换临时阻塞的线程,这些CPU指令阻塞相当短的时间。
内核模式:由操作系统提供,优点通过内核模式获取其他线程拥有的资源时,Windows会阻塞线程以避免它浪费CPU时间,资源可用时,恢复线程,允许它访问资源。
Volatile
private int flag = 0; private int number = 0; public void Thread1() { number = 5; flag = 1; } public void Thread2() { if (flag == 1) { Console.WriteLine(number); } }
Thread1中先将value赋值为0,再把flag赋值为1,按道理说在Thread2中,如果flag为1了,那么value必定为5,实际上不是这样子的,C#编译器,JIT编译器,CPU会优化代码,导致Thread1中执行顺序不确定性,我也是服了,这有什么用,为什么要做这样的优化!!!
可以通过Volatile.Read或Volatile.Write阻止这种优化。Volatile.Read保证提供的变量中的值在调用时读取,并且之前的加载和存储操作是按照我们的编码顺序,Volatile.Write保证提供的变量中的值在调用时写入,并且之后的加载和存储操作是按照我们的编码顺序。volatile不能用来线程同步。这里有点坑啊感觉。
private int flag = 0; private int number = 0; public void Thread1() { number = 5; Volatile.Write(ref flag, 1); } public void Thread2() { if (Volatile.Read(ref flag) == 1) { Console.WriteLine(number); } }
也可以通过volatile关键字来定义变量,效果是一样的。
private volatile int flag = 0; private int number = 0; public void Thread1() { number = 5; flag = 1; } public void Thread2() { if (flag == 1) { Console.WriteLine(number); } }
Interlocked
Interlocked提供了一组用来原子操作的静态方法。
int Add(ref int location1, int value) | 原子加法操作,支持long |
int Decrement(ref int location) | 原子自减 |
int Decrement(ref int location) | 原子自增 |
int Exchange(ref int location1, int value) | 原子交换 |
int CompareExchange(ref int location1, int value, int comparand) | 原子比较交换 |
这些方法基本都同时支持int、long、double、float、object的。解释一下最后一个方法。
CompareExchange:如果location1等于comparand,就把value赋值给它。语法和下面的代码相同。
int location1 = 1; int comparand = 1; int value = http://www.mamicode.com/2; if(location1 == comparand) { location1 = value; }
假如上面的类型不能满足呢?
使用Interlocked实现自旋锁:让一个线程在原地打转,避免它跑去和另一个线程竞争资源,但是会浪费宝贵的CPU时间,自旋锁应该用于保护那些会执行的非常快的代码区域。
struct SimpleSpinLock { private int m_ResourceInUse; public void Enter() { while (true) { if (Interlocked.Exchange(ref m_ResourceInUse, 1) == 0 { return; } } } public void Leave() { Volatile.Write(ref m_ResourceInUse, 0); } } static int sum = 0; static void Main(string[] args) { Stopwatch watch = new Stopwatch(); watch.Start(); SimpleSpinLock g= new SimpleSpinLock(); var t1 = Task.Run(() => { for (int i = 0; i < 3000000; i++) { g.Enter(); sum += i; g.Leave(); } }); var t2 = Task.Run(() => { for (int i = 3000000; i < 5000000; i++) { g.Enter(); sum += i; g.Leave(); } }); Task.WaitAll(t1, t2); Console.WriteLine("和:" + sum); Console.WriteLine("用时:" + watch.ElapsedMilliseconds); }
和:1642668640
用时:610
请按任意键继续. . .
WaitHandle
抽象类,封装一个Windows内核对象句柄。
//封装等待对共享资源的独占访问的操作系统特定的对象。 public abstract class WaitHandle : MarshalByRefObject, IDisposable { public const int WaitTimeout = 258; protected static readonly IntPtr InvalidHandle; protected WaitHandle(); [Obsolete("Use the SafeWaitHandle property instead.")] public virtual IntPtr Handle { get; set; } //获取或设置本机操作系统句柄。 public SafeWaitHandle SafeWaitHandle { get; set; } //向一个 System.Threading.WaitHandle 发出信号并等待另一个,指定超时间隔为 32 位有符号整数,并指定在进入等待前是否退出上下文的同步域。 public static bool SignalAndWait(WaitHandle toSignal, WaitHandle toWaitOn); //等待指定数组中的所有元素收到信号,使用 System.Int32 值指定时间间隔,并指定是否在等待之前退出同步域。都收到信号返回true public static bool WaitAll(WaitHandle[] waitHandles, int millisecondsTimeout); //等待指定数组中的任一元素收到信号,使用 32 位带符号整数指定时间间隔并指定是否在等待之前退出同步域。返回收到信号的对象的数组索引 public static int WaitAny(WaitHandle[] waitHandles, int millisecondsTimeout, bool exitContext); public virtual void Close(); //阻止当前线程,直到当前的 System.Threading.WaitHandle 收到信号为止,同时使用 32 位带符号整数指定时间间隔,并指定是否在等待之前退出同步域。收到信号返回true public virtual bool WaitOne(TimeSpan timeout, bool exitContext); protected virtual void Dispose(bool explicitDisposing); }
EventWaitHandle
//表示一个线程同步事件。 public class EventWaitHandle : WaitHandle { // 初始化 System.Threading.EventWaitHandle 类的新实例,并指定在此调用后创建的等待句柄最初是否处于终止状态,它是自动重置还是手动重置,系统同步事件的名称,一个 // Boolean 变量(其值在调用后表示是否创建了已命名的系统事件),以及应用于已命名的事件(如果创建了该事件)的访问控制安全性。 public EventWaitHandle(bool initialState, EventResetMode mode, string name, out bool createdNew, EventWaitHandleSecurity eventSecurity); // 打开指定名称为同步事件(如果已经存在)。 public static EventWaitHandle OpenExisting(string name); // 用安全访问权限打开指定名称为同步事件(如果已经存在)。 public static EventWaitHandle OpenExisting(string name, EventWaitHandleRights rights); // 打开指定名称为同步事件(如果已经存在),并返回指示操作是否成功的值。 public static bool TryOpenExisting(string name, out EventWaitHandle result); // 用安全访问权限打开指定名称为同步事件(如果已经存在),并返回指示操作是否成功的值。 public static bool TryOpenExisting(string name, EventWaitHandleRights rights, out EventWaitHandle result); // 获取 System.Security.AccessControl.EventWaitHandleSecurity 对象,该对象表示由当前 System.Threading.EventWaitHandle // 对象表示的已命名系统事件的访问控制安全性。 public EventWaitHandleSecurity GetAccessControl(); // 将事件状态设置为非终止状态,导致线程阻止。 public bool Reset(); // 将事件状态设置为终止状态,允许一个或多个等待线程继续。 public bool Set(); // 设置已命名的系统事件的访问控制安全性。 public void SetAccessControl(EventWaitHandleSecurity eventSecurity); }
AutoResetEvent
// // 摘要: // 通知正在等待的线程已发生事件。此类不能被继承。 public sealed class AutoResetEvent : EventWaitHandle { // // 摘要: // 使用 Boolean 值(指示是否将初始状态设置为终止的)初始化 System.Threading.AutoResetEvent 类的新实例。 // // 参数: // initialState: // 若要将初始状态设置为终止,则为 true;若要将初始状态设置为非终止,则为 false。 public AutoResetEvent(bool initialState); }
使用EventWaitHandle子类AutoResetEvent创建线程同步锁,效果和Interlocked的自旋锁一样的,但是效率要慢很多,但是这里没有抢占成功的线程会阻塞,不会自旋。
class SimpleWaitLock { private AutoResetEvent m_ResourceInUse; public SimpleWaitLock() { m_ResourceInUse = new AutoResetEvent(true); } public void Enter() { m_ResourceInUse.WaitOne(); } public void Leave() { m_ResourceInUse.Set(); } }
Semaphore
//限制可同时访问某一资源或资源池的线程数。 public sealed class Semaphore : WaitHandle { // 初始化 System.Threading.Semaphore 类的新实例,并指定初始入口数和最大并发入口数,可以选择指定系统信号量对象的名称, // 指定一个变量来接收指示是否创建了新系统信号量的值,以及指定系统信号量的安全访问控制。 public Semaphore(int initialCount, int maximumCount, string name, out bool createdNew, SemaphoreSecurity semaphoreSecurity); // 用安全访问权限打开指定名称为信号量(如果已经存在)。 public static Semaphore OpenExisting(string name, SemaphoreRights rights); // 用安全访问权限打开指定名称为信号量(如果已经存在),并返回指示操作是否成功的值。 public static bool TryOpenExisting(string name, SemaphoreRights rights, out Semaphore result); // 获取已命名的系统信号量的访问控制安全性。 public SemaphoreSecurity GetAccessControl(); // 退出信号量并返回前一个计数。 public int Release(); // 以指定的次数退出信号量并返回前一个计数。 public int Release(int releaseCount); // 设置已命名的系统信号量的访问控制安全性。 public void SetAccessControl(SemaphoreSecurity semaphoreSecurity); }
Mutex
Mutex代表互斥体,和AutoResetEvent和计数为一的Semaphore相似,三者都是一次只释放一个正在等待的线程。
// // 摘要: // 还可用于进程间同步的同步基元。 public sealed class Mutex : WaitHandle { // 使用可指示调用线程是否应具有互斥体的初始所有权以及字符串是否为互斥体的名称的 Boolean // 值和当线程返回时可指示调用线程是否已赋予互斥体的初始所有权以及访问控制安全是否已应用到命名互斥体的 // Boolean 变量初始化 System.Threading.Mutex 类的新实例。 public Mutex(bool initiallyOwned, string name, out bool createdNew, MutexSecurity mutexSecurity); // 利用所需的安全访问权限,打开指定的已命名的互斥体(如果已经存在)。 public static Mutex OpenExisting(string name, MutexRights rights); // 利用所需的安全访问权限,打开指定的已命名的互斥体(如果已经存在),并返回指示操作是否成功的值。 public static bool TryOpenExisting(string name, MutexRights rights, out Mutex result); // 获取表示已命名的互斥体的访问控制安全性的 System.Security.AccessControl.MutexSecurity 对象。 public MutexSecurity GetAccessControl(); // 释放 System.Threading.Mutex 一次。 public void ReleaseMutex(); // 设置已命名的系统互斥体的访问控制安全性。 public void SetAccessControl(MutexSecurity mutexSecurity); }
Mutex支持递归,当调用M1时获取Mutext执行了线程安全的操作,调用M2时,仍然可以执行线程安全的操作,此时如果是AutoResetEvent对象就会在M2方法中阻塞。
class SomeClass : IDisposable { private readonly Mutex m_lock = new Mutex(); public void Dispose() { m_lock.Dispose(); } public void M1() { m_lock.WaitOne(); //do somethine safe M2(); m_lock.ReleaseMutex(); } public void M2() { m_lock.WaitOne(); //do something safe m_lock.ReleaseMutex(); } }
AutoResetEvent也可以实现递归锁,效率比Mutex高
class RecurisveAutoResetEvent : IDisposable { private AutoResetEvent m_lock = new AutoResetEvent(true); private int m_owningThread = 0; private int m_recursionCount = 0; public void Dispose() { m_lock.Dispose(); } public void Enter() { int currentThreadId = Thread.CurrentThread.ManagedThreadId; //如果调用线程拥有锁,就再递归一次 if(m_owningThread == currentThreadId) { m_recursionCount++; return; } //没有锁,就等待它 m_lock.WaitOne(); //调用线程拥有了锁,初始化线程id和计数 m_owningThread = currentThreadId; m_recursionCount = 1; } public void Leave() { //如果调用线程没有锁,就出错了 if(m_owningThread != Thread.CurrentThread.ManagedThreadId) { throw new InvalidOperationException(); } //递归数减一 if(--m_recursionCount == 0) { //如果为0,表名该线程没有锁了 m_owningThread = 0; //唤醒等待的另一个线程 m_lock.Set(); } } }
线程同步之混合模式
没有线程竞争时,混合模式提供了用户模式所有具有的性能优势,有竞争时,混合模式使用了内核模式提供不自旋的优势(避免浪费CPU时间)
使用Interlocked和AutoResetEvent自定义混合锁
class SimpleHybridLock : IDisposable { private int m_waiters = 0; private AutoResetEvent m_waiterLock = new AutoResetEvent(false); public void Dispose() { m_waiterLock.Dispose(); } public void Enter() { //这个线程想要获得锁 if(Interlocked.Increment(ref m_waiters) == 1) {//无竞争,获得成功 return; } //获得失败,没有自旋 //而是使线程等待 m_waiterLock.WaitOne(); //到这里线程拿到了锁 } public void Leave() { //这个线程想要释放锁 if (Interlocked.Decrement(ref m_waiters) == 0) { //没有其他线程等待,直接返回 return; } //有其他线程正在阻塞,唤醒其中一个 m_waiterLock.Set(); } }
同样用之前的程序进行测试。明显要比单纯的内核模式要快很多,但是由于转换内核模式造成性能的损失,结果还是比单纯的用户模式慢很多。
和:1642668640
用时:11855
请按任意键继续. . .
自旋片刻的混合锁
通过上面的例子我们知道,转换内核模式会造成巨大的性能损失。而通常线程占有锁的时间都非常短,所以可以让线程自旋一小段时间,再转为内核模式。这样可以避免,一遇到静态条件就转为内核模式避免的性能消耗,由于占有时间非常短,自旋的过程中就可能获得锁了。
class AnotherHybridLock : IDisposable { //用户模式 private int m_waiters = 0; //内核模式 private AutoResetEvent m_waiterLock = new AutoResetEvent(false); //控制自旋 private int m_spincount = 4000; //当前线程,当前线程占用数 private int m_owingThreadId = 0, m_recursion = 0; public void Dispose() { m_waiterLock.Dispose(); } public void Enter() { int currentThreadId = Thread.CurrentThread.ManagedThreadId; //如果调用线程拥有锁,就再递归一次 if (m_owingThreadId == currentThreadId) { m_recursion++; return; } //尝试获取 SpinWait spinwait = new SpinWait(); for(int i = 0; i < m_spincount; i++) { //如果锁可以使用了,这个线程就获得它,设置状态并返回 if (Interlocked.CompareExchange(ref m_waiters, 1, 0) == 0) goto GotLock; //给其他线程运行的机会,希望锁会被释放 spinwait.SpinOnce(); } //自旋结束,仍未获得锁,再试一次 if(Interlocked.Increment(ref m_waiters) > 1) { //仍是竞态条件,阻塞 m_waiterLock.WaitOne(); //新来时拥有锁了。。。 } GotLock: //获得锁了。。 m_owingThreadId = currentThreadId; m_recursion = 1; } public void Leave() { int currentThreadId = Thread.CurrentThread.ManagedThreadId; if (m_owingThreadId != currentThreadId) { throw new InvalidOperationException(); } //如果这个线程仍然有锁,直接返回 if(--m_recursion > 0) { return; } //现在没有线程拥有锁 m_owingThreadId = 0; //如果没有其他线程等待,直接返回 if(Interlocked.Decrement(ref m_waiters) == 0) { return; } //有线程等待,唤醒一个 m_waiterLock.Set(); } }
FCL中的混合模式
ManualResetEventSlim、SemaphoreSlim使用上和内核模式一致,只是都在用户模式中自旋,并且都是在放生第一次竞争时,创建内核模式对象。并且可以通过CancellationToken进行中断退出。
static void Main(string[] args) { Stopwatch watch = new Stopwatch(); watch.Start(); int sum = 0; SemaphoreSlim g = new SemaphoreSlim(1); var t1 = Task.Run(() => { for (int i = 0; i < 3000000; i++) { g.Wait(); sum += i; g.Release(); } }); var t2 = Task.Run(() => { for (int i = 3000000; i < 5000000; i++) { g.Wait(); sum += i; g.Release(); } }); Task.WaitAll(t1, t2); Console.WriteLine("和:" + sum); Console.WriteLine("用时:" + watch.ElapsedMilliseconds); }
和:1642668640
用时:1267
请按任意键继续. . .
Monitor
保存在堆上的对象都有一个同步索引块,默认情况下同步索引块值为1,并且CLR初始化时在堆中分配一个同步数组,调用Monitor.Enter方法时,如果对象同步索引块为1(否则,等待),那么CLR在数组中找到一个空白块,并设置对象的同步块索引,让它引用该同步块,调用Exit方法时,会检查是否有其他线程正在等待使用这个同步块,没有的话,就把同步索引块重新设置为1,有的话就让等待的线程进恢复。Enter的第二个参数表示状态,使用的时候,设置一个默认的false,如果竞态成功,那么就改为true.
public static class Monitor { public static void Enter(object obj); public static void Enter(object obj, ref bool lockTaken); public static void Exit(object obj); public static bool IsEntered(object obj); public static void Pulse(object obj); public static void PulseAll(object obj); public static void TryEnter(object obj, int millisecondsTimeout, ref bool lockTaken); public static bool Wait(object obj, int millisecondsTimeout, bool exitContext); } private static readonly object ObjAsync = new object(); static void Main(string[] args) { Stopwatch watch = new Stopwatch(); watch.Start(); int sum = 0; var t1 = Task.Run(() => { for (int i = 0; i < 3000000; i++) { Monitor.Enter(ObjAsync); sum += i; Monitor.Exit(ObjAsync); } }); var t2 = Task.Run(() => { for (int i = 3000000; i < 5000000; i++) { Monitor.Enter(ObjAsync); sum += i; Monitor.Exit(ObjAsync); } }); Task.WaitAll(t1, t2); Console.WriteLine("和:" + sum); Console.WriteLine("用时:" + watch.ElapsedMilliseconds); }
和:1642668640
用时:337
请按任意键继续. . .
ReaderWriterLockSlim读写锁分离
ReaderWriterLockSlim用来提供读写锁分离,前面介绍的锁,多个线程同时访问共享数据时,线程竞争失败是都会阻塞,造成应用程序的伸缩性和吞吐能力下降。如果所有线程只以读的方式访问数据,根本没有必要阻塞他们,应该允许并发的访问数据。
public class ReaderWriterLockSlim : IDisposable { public ReaderWriterLockSlim(); public ReaderWriterLockSlim(LockRecursionPolicy recursionPolicy); public int WaitingReadCount { get; } public int RecursiveWriteCount { get; } public int RecursiveUpgradeCount { get; } public int RecursiveReadCount { get; } public int CurrentReadCount { get; } public LockRecursionPolicy RecursionPolicy { get; } public bool IsWriteLockHeld { get; } public bool IsUpgradeableReadLockHeld { get; } public bool IsReadLockHeld { get; } public int WaitingUpgradeCount { get; } public int WaitingWriteCount { get; } public void Dispose(); public void EnterReadLock(); public void EnterUpgradeableReadLock(); public void EnterWriteLock(); public void ExitReadLock(); public void ExitUpgradeableReadLock(); public void ExitWriteLock(); public bool TryEnterReadLock(int millisecondsTimeout); public bool TryEnterUpgradeableReadLock(int millisecondsTimeout); public bool TryEnterWriteLock(int millisecondsTimeout); }
使用方法。构造函数中如果枚举为SupportsRecursion表示支持递归
static void Main(string[] args) { Stopwatch watch = new Stopwatch(); watch.Start(); ReaderWriterLockSlim lo = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion); int sum = 0; var t1 = Task.Run(() => { for (int i = 0; i < 3000000; i++) { lo.EnterWriteLock(); sum += i; lo.ExitWriteLock(); } }); var t2 = Task.Run(() => { for (int i = 3000000; i < 5000000; i++) { lo.EnterWriteLock(); sum += i; lo.ExitWriteLock(); } }); Task.WaitAll(t1, t2); Console.WriteLine("和:" + sum); Console.WriteLine("用时:" + watch.ElapsedMilliseconds); }
和:1642668640
用时:622
请按任意键继续. . .
双锁技术(单例模式)
也许会有人觉得为什么lock外已经判断了_instance不为null了,为什么还要判断?
可能会有两个线程同时通过第一个if判断,然后只有一个线程进入了lock内部,创建完成没有问题,但是原先等待的线程就进来了,假如没有第二个if,他就又创建了一个对象!!!
class Singleton { private static object s_lock = new object(); private static Singleton _instance = null; private Singleton() { } public static Singleton GetSingleTon() { if (_instance != null) return _instance; lock (s_lock) { if(_instance == null) { Singleton temp = new Singleton(); Volatile.Write(ref _instance, temp); } } return _instance; } }
小结
这一节学习了.net中的各种锁,Monitor肯定时使用的最多的,但是有些时候,可能需要具体分析吧,比如在MVC中的路由的读取,由于路由是程序启动的时候初始化的(只有一次),所以使用了ReaderWriterLockSlim来提用读写锁分离的模式,又比如MVC中封装的AsyncResultWrapper是采取Monitor和Interlocked结合的方式来进行线程同步的。这一篇真的好长啊。鼓励一下自己,加油,嘿嘿!!
.net中的线程同步基础(搬运自CLR via C#)