首页 > 代码库 > c#:ThreadPool实现并行分析,并实现线程同步结束

c#:ThreadPool实现并行分析,并实现线程同步结束

  • 背景:

一般情况下,经常会遇到一个单线程程序时执行对CPU,MEMORY,IO利用率上不来,且速度慢下问题;那么,怎么解决这些问题呢?

据我个人经验来说有以下两种方式:

1、并行、多线程(Parallel、Task、ThreadPool)

2、多进程MutilpleProcess

恰好工作中又一次遇到单线程程序性能低的问题,本次我主要想尝试使用ThreadPool来实现多线程,并且在实现多线程任务同步结束。

测试代码:

 1  static void Main(string[] args) 2         { 3             using (ManualResetEvent finish = new ManualResetEvent(false)) 4             { 5                 int maxThreadCount = 100; 6                 for (var i = 0; i < 100; i++) { 7                     ThreadPool.QueueUserWorkItem((Object state)=> { 8                         Console.WriteLine("task:{0}",state); 9 10                         // 以原子操作的形式递减指定变量的值并存储结果。11                         if (Interlocked.Decrement(ref maxThreadCount) == 0) {12                             // 将事件状态设置为有信号,从而允许一个或多个等待线程继续执行。13                             finish.Set();14                         }                        15                     }, i);16                 }17 18                 // 阻止当前线程,直到当前 System.Threading.WaitHandle 收到信号。19                 finish.WaitOne();20             }21 22             Console.WriteLine("Complete!");23             Console.ReadKey();
  • 实现多线程时,需要注意事项:

可是一般情况下遇到这种业务的情况下,只要修改多线程,必然会遇到某个对象不允许被多个线程操作的问题。

比如:

1、多个线程同时向一个文件中写入内容,这种情况一般使用锁来包成被访问对象的安全性。比如:互斥锁(lock、Mutex)、读写锁(ReadWriteLock)、Monitor、Semaphore(信号灯)、InterLocked(内存共享)等。

2、多个线程同时修改一个非线程安全集合对象(List,Collection,Dictionary,Bag,Queue,Stack,ArrayList,Array,HashTable等)时,往往会抛出异常。针对这种情况,需要使用命名空间System.Collections.Concurrent.*下支持线程安全的集合、字典、队列、栈等对象来替代。

  • 业务场景:

我们需要对一个多行文本文件进行解析,根据具体地址解析其中的经纬度信息。如果解析过程中解析失败的行,需要记录到一个_error.txt;解析成功的记录行,记录到_result.txt。使用单线程分析过程中已经遇到了性能低问题,需求解决方案是使用ThreadPool技术。

  • 业务实现:
  1         private static int maxThreadCount = 0;  2         private static int fakeMaxThreadCount = int.MaxValue;  3         private static ManualResetEvent finish = new ManualResetEvent(false);  4         private static object errorLocker = new object();  5         private static object resultLocker = new object();  6         private static object maxThreadCountLcker = new object();  7   8         public void ParserFile(string filePath)  9         { 10             using (StreamWriter writerError = new StreamWriter(filePath + "_error")) 11             { 12                 using (StreamWriter writerResult = new StreamWriter(filePath + "_result")) 13                 { 14                     finish = new ManualResetEvent(false); 15                     using (StreamReader reader = new StreamReader(filePath)) 16                     { 17                         string line = reader.ReadLine(); 18                         while (line != null) 19                         { 20                             maxThreadCount++; 21                             ThreadPool.QueueUserWorkItem(DoWork, new object[] { line, writerError, writerResult 22 }); 23  24                             line = reader.ReadLine(); 25                         } 26                     } 27  28                     maxThreadCount++; 29                     lock (maxThreadCountLcker) 30                     { 31                         fakeMaxThreadCount = maxThreadCount; 32                     } 33  34                     ThreadPool.QueueUserWorkItem(DoWork, new object[] { }); 35  36                     finish.WaitOne(); 37                     finish.Close(); 38                     finish.Dispose(); 39                 } 40             } 41         } 42  43  44  45         private void DoWork(object state) 46         { 47             object[] objectItem = state as object[]; 48             if (objectItem.Length != 3) 49             { 50                 if (Interlocked.Decrement(ref fakeMaxThreadCount) == 0) 51                 { 52                     finish.Set(); 53                 } 54                 return; 55             } 56             string line = objectItem[0].ToString(); 57             StreamWriter writerError = objectItem[1] as StreamWriter; 58             StreamWriter writerResult = objectItem[2] as StreamWriter; 59  60             try 61             { 62                 string[] fields = line.Split(new char[] { | }); 63  64                 string imsi = fields[0]; 65                 string city = fields[1]; 66                 string county = fields[2]; 67                 string address = fields[3]; 68  69                 // http://restapi.amap.com/v3/geocode/geo?key=7de8697669288fc848e12a08f58d995e&s=rsv3&city=**市&address=**省**市**区**路23号 70                 string uri = " http://restapi.amap.com/v3/geocode/geo"; 71                 string parameter = string.Format("key={0}&s={1}&city={2}&address={3}", "7de8697669288fc848e12a08f58d995e", "rsv3", "**(市名称)", address); 72  73                 // {"status":"1","info":"OK","infocode":"10000","count":"1","geocodes":[{"formatted_address":"***省**市**区***路|23号","province":"***","citycode":"***","city":"***市","district":"***区","township":[],"neighborhood":{"name":[],"type":[]},"building":{"name":[],"type":[]},"adcode":"330105","street":[],"number":[],"location":"120.151367,30.362293","level":"门牌号"}]} 74                 string result = GetRequesetContext(uri, parameter); 75                 if (string.IsNullOrEmpty(result) || result.IndexOf("location") == -1) 76                 { 77                     lock (errorLocker) 78                     { 79                         writerError.WriteLine(result); 80                     } 81                 } 82                 else 83                 { 84                     int indexCount = 0; 85                     List<string> lnglatItems = new List<string>(); 86                     foreach (string resultItem in result.Split(new string[] { "\",\"", ",\"" }, StringSplitOptions.RemoveEmptyEntries)) 87                     { 88                         if (resultItem.IndexOf("location") != -1) 89                         { 90                             indexCount++; 91                             lnglatItems.Add(resultItem.Split(new char[] { : })[1].Replace("\"", string.Empty)); 92                         } 93                     } 94                     if (indexCount == 1) 95                     { 96                         lock (resultLocker) 97                         { 98                             writerResult.WriteLine(address + "|" + lnglatItems[0] + "|" + imsi); 99                         }100                     }101                     else102                     {103                         lock (resultLocker)104                         {                            105                             writerError.WriteLine(address + "|" + string.Join(",", lnglatItems) + "|" + imsi);106                         }107                     }108                 }109             }110             catch (Exception ex)111             {112                 logger.Error("{0}\r\n{1}", ex.Message, ex.StackTrace);113                 lock (errorLocker)114                 {115                     writerError.WriteLine(line);116                 }117             }118             finally119             {120                 lock (maxThreadCountLcker)121                 {122                     if (Interlocked.Decrement(ref fakeMaxThreadCount) == 0)123                     {124                         finish.Set();125                     }126                 }127             }128         }

 

c#:ThreadPool实现并行分析,并实现线程同步结束