首页 > 代码库 > .NET 同步与异步 之 线程安全的集合 (十一)

.NET 同步与异步 之 线程安全的集合 (十一)

本随笔续接:.NET 同步与异步 之 警惕闭包(十)

 

无论之前说的锁、原子操作 还是 警惕闭包,都是为安全保驾护航,本篇随笔继续安全方面的主题:线程安全的集合。

先看一下命名空间:System.Collections.Concurrent,常用的类型有(均为泛型):BlockingCollection<T>、ConcurrentBag<T>、ConcurrentDictionary<TKey, TValue>、ConcurrentQueue<T>、ConcurrentStack<T> 。

其中:ConcurrentBag<T> 为无序的集合、ConcurrentDictionary<TKey, TValue> 为词典类型。

ConcurrentQueue<T>、ConcurrentStack<T>  分别为队列 和 栈,而 BlockingCollection<T> 可以看做是 队列 和 栈的进一步封装调用,并提供了阻塞(超时)功能。

 

本随笔着重说两个类型:BlockingCollection<T>  和 ConcurrentDictionary<TKey, TValue>。

一、BlockingCollection<T>

1、先看一下 MSDN 上的Demo

技术分享
        /// <summary>        /// MSDN Demo        /// BlockingCollection<T>.Add()        /// BlockingCollection<T>.CompleteAdding()        /// BlockingCollection<T>.TryTake()        /// BlockingCollection<T>.IsCompleted        /// </summary>        public void Demo1()        {            // Construct and fill our BlockingCollection            using (BlockingCollection<int> bc = new BlockingCollection<int>())            {                int NUMITEMS = 10000;                for (int i = 0; i < NUMITEMS; i++)                {                    bc.Add(i);                }                bc.CompleteAdding();                int outerSum = 0;                // Delegate for consuming the BlockingCollection and adding up all items                Action action = () =>                {                    int localItem;                    int localSum = 0;                    while (bc.TryTake(out localItem))                    {                        localSum += localItem;                    }                    Interlocked.Add(ref outerSum, localSum);                };                // Launch three parallel actions to consume the BlockingCollection                Parallel.Invoke(action, action, action);                base.PrintInfo(string.Format("Sum[0..{0}) = {1}, should be {2}", NUMITEMS, outerSum, ((NUMITEMS * (NUMITEMS - 1)) / 2)));                base.PrintInfo(string.Format("bc.IsCompleted = {0} (should be true)", bc.IsCompleted));            }        }
MSDN Demo

从demo中看一下 BlockingCollection<T> 的用法

1)Add 方法, 将项添加到集合中。

2)CompleteAdding 方法,标记当前实例不可以再添加任何项。

3)TryTake 方法,如果可以从当前集合移除一个项,则返回true,否则返回False. 如果该集合为空,则此方法立即返回 false。

  删除了某个项的顺序取决于用于创建集合的类型 BlockingCollection<T> 实例。 当您创建 BlockingCollection<T> 对象,您可以指定要使用的集合类型(通过构造函数指定)。 例如,可以指定 ConcurrentQueue<T> 先进先出 (FIFO) 行为的对象或 ConcurrentStack<T> 后进先出 (LIFO) 行为的对象。 可以使用任何集合类来实现 IProducerConsumerCollection<T> 接口。 默认集合类型 BlockingCollection<T> 是 ConcurrentQueue<T>

4)IsCompleted 属性,获取此 BlockingCollection<T> 是否已标记为完成添加(即 调用了 CompleteAdding 方法)并且为空。

 

2、限制容量

技术分享
        /// <summary>        /// 限制容量        /// </summary>        public void Demo2()        {            BlockingCollection<int> blocking = new BlockingCollection<int>(5);            Task.Run(() =>            {                for (int i = 0; i < 20; i++)                {                    blocking.Add(i);                    PrintInfo($"add:({i})");                }                blocking.CompleteAdding();                PrintInfo("CompleteAdding");            });            // 等待先生产数据            Task.Delay(500).ContinueWith((t) =>            {                while (!blocking.IsCompleted)                {                    var n = 0;                    if (blocking.TryTake(out n))                    {                        PrintInfo($"TryTake:({n})");                    }                }                PrintInfo("IsCompleted = true");            });        }
限制容量

调研Add方法的时候,如果集合中的项的数量已经达到上限,则Add方法将会被阻塞。

 

3、在 BlockingCollection  中使用Stack

技术分享
         /// <summary>        /// 在 BlockingCollection  中使用Stack        /// </summary>        public void Demo3()        {            BlockingCollection<int> blocking = new BlockingCollection<int>(new ConcurrentStack<int>(), 5);            Task.Run(() =>            {                for (int i = 0; i < 20; i++)                {                    blocking.Add(i);                    PrintInfo($"add:({i})");                }                blocking.CompleteAdding();                PrintInfo("CompleteAdding");            });            // 等待先生产数据            Task.Delay(500).ContinueWith((t) =>            {                while (!blocking.IsCompleted)                {                    var n = 0;                    if (blocking.TryTake(out n))                    {                        PrintInfo($"TryTake:({n})");                    }                }                PrintInfo("IsCompleted = true");            });        }
在 BlockingCollection 中使用Stack

该Demo和之前的Demo的唯一区别就是:构造函数、指定了 ConcurrentStack<T>  类型为存储容器。

 

除此之外、BlockingCollection<T> 还是提供了对超时的控制,例如:TryAdd(T, TimeSpan) 、 TryTake(T, TimeSpan) 等数个重载版本。

 

二、ConcurrentDictionary<TKey, TValue>。

  ConcurrentDictionary<TKey, TValue> 类, 已经实现 IDictionary<TKey, TValue> 接口。也就是说 它也实现了Dictionary 类型的基础功能。

 此外, ConcurrentDictionary<TKey, TValue> 提供了几种方法中添加或更新键/值对在字典中下, 如表中所述。

任务

使用此方法

用法说明

如果它尚不在字典中存在向字典中,添加新的密钥

TryAdd

如果当前不在字典中存在该键,此方法将添加指定的键/值对。 该方法返回 true 或 false具体取决于是否已添加新对。

如果该注册表项具有特定值,更新为现有键在字典中值

TryUpdate

此方法检查是否密钥具有指定的值,如果它存在,则用新值更新该键。 它相当于CompareExchange 方法,但它的用于字典的元素。

无条件地将键/值对存储在字典中,覆盖已存在的键的值

索引器的资源库︰dictionary[key] = newValue

 

将键/值对添加到字典中,或如果键已存在,更新基于键的现有值的键的值

AddOrUpdate(TKey, Func<TKey, TValue>, Func<TKey, TValue, TValue>)

- 或 -

AddOrUpdate(TKey, TValue, Func<TKey, TValue, TValue>)

AddOrUpdate(TKey, Func<TKey, TValue>, Func<TKey, TValue, TValue>) 接受的键和两个委托。 如果键在字典; 中不存在,则使用第一个委托它接受键并返回应添加的键的值。 如果该键不存在; 它使用第二个委托它接受的键和其当前值,并返回应为项设置的新值。

AddOrUpdate(TKey, TValue, Func<TKey, TValue, TValue>) 接受键、 值要添加,以及更新委托。 这是与以前的重载中,相同之处在于它不使用委托来添加的键。

获取此键在字典中,向字典中添加值并将其返回如果该键不存在的值

GetOrAdd(TKey, TValue)

- 或 -

GetOrAdd(TKey, Func<TKey, TValue>)

这些重载提供延迟初始化为键/值对在字典中,添加的值,仅当不存在。

GetOrAdd(TKey, TValue) 采用键不存在要添加的值。

GetOrAdd(TKey, Func<TKey, TValue>) 将一个委托,可将生成的值,如果键不存在。

这些操作是原子性操作,而且都是线程安全的。在 ConcurrentDictionary<TKey, TValue> 类中 唯一的例外是 AddOrUpdate 和 GetOrAdd 方法,它们是使用细粒度锁定,以确保线程安全。 

 

  ConcurrentDictionary<TKey, TValue> 类 在上述的MSDN文档中 已经介绍的差不多了,不再举例。 当然还要提一句, 该类型不支持阻塞操作。

 

三、线程安全警告

在命名空间 System.Collections.Concurrent  中的类型,都遵循如下的线程安全规则:线程安全集合本身提供的方法 是线程安全的,但是通过其类型实现的接口的方法 和 扩展方法 却不是线程安全的。

 

 

随笔暂告一段落、下一篇随笔:  因地制宜——CPU密集型操作和IO密集型操作

附,Demo : http://files.cnblogs.com/files/08shiyan/ParallelDemo.zip

参见更多:随笔导读:同步与异步

 

.NET 同步与异步 之 线程安全的集合 (十一)