首页 > 代码库 > 【狼窝乀野狼】Parallel浅尝辄止

【狼窝乀野狼】Parallel浅尝辄止

     前段时间看到园子里面有同学在用Parallel进行批量插入数据库。后面也有很多同学针对这一事件给出了自己的看法和见解。我在这里不评论内容的好坏,至少能将自己东西总结分享这个是要靠勇气和毅力。

     闲话少说,我在最近看崔鹏飞的github的时候,发现他对这块也做了一定的总结,那么我就他这块进行板书与展示。案例是怎么回事呢?话说我有一个公司,里面需要统计一下总收入,另外有一个公司被我收购了,我一起计算总收入。当一天我收购了N个公司,计算总收入的时候,我们采用并行计算。

技术分享
 1    internal class Company 2     { 3         public decimal TotalIncome; 4  5         public Company Merge(Company that) 6         { 7             Calc(); 8             TotalIncome += that.TotalIncome; 9             return this;10         }11 12         /// <summary>13         /// 复杂运算14         /// </summary>15         private void Calc()16         {17             //TODO:省略500字18         }19     }
View Code

首先我们想到的是采用直接累加就行了吧,这是所谓的线性预算。

        /// <summary>        /// 线性运行        /// </summary>        /// <param name="bigCompany"></param>        /// <param name="smallCompanies"></param>         /// <returns></returns>        private static Company LinearMerge(Company bigCompany, IEnumerable<Company> smallCompanies)        {            foreach (Company smallCompany in smallCompanies)            {                bigCompany.Merge(smallCompany);            }            return bigCompany;        }

采用线性运算,毫无疑问结果是正确的。但是,如果的N大一点,例如30000000个,可能就要花一点时间了。

那么是否我们可以采用并行处理呢?OK,直接上代码。

技术分享
 1         /// <summary> 2         /// 并行处理 3         /// </summary> 4         /// <param name="bigCompany"></param> 5         /// <param name="smallCompanies"></param> 6         /// <returns></returns> 7         private static Company ParallelMerge(Company bigCompany, IEnumerable<Company> smallCompanies) 8         { 9             Parallel.ForEach(smallCompanies, smallCompany => bigCompany.Merge(smallCompany));10             return bigCompany;11         }    
View Code

时间很快,但是结果呢?结果和上面线性的一致么?

那么我如果在并行的基础上面加一把锁呢,保证每次独占资源。

 1         /// <summary> 2         /// 并行加锁 3         /// </summary> 4         /// <param name="bigCompany"></param> 5         /// <param name="smallCompanies"></param> 6         /// <returns></returns> 7         private static Company ParallelMergeLock(Company bigCompany, IEnumerable<Company> smallCompanies) 8         { 9             var obj = new object();10             Parallel.ForEach(smallCompanies, smallCompany =>11             {12                 lock (obj)13                 {14                     bigCompany.Merge(smallCompany);15                 }16             });17             return bigCompany;18         }    

毫无疑问,结果也是正确的,那么耗时可能我们就要关心了。那么耗时究竟怎么样呢?

我们可以采用函数式处理嘛。

 1         /// <summary> 2         /// 函数式合并 3         /// </summary> 4         /// <param name="bigCompany"></param> 5         /// <param name="smallCompanies"></param> 6         /// <returns></returns> 7         private static Company FunctionalMerger(Company bigCompany, IEnumerable<Company> smallCompanies) 8         { 9             return smallCompanies.Aggregate(bigCompany, (buyer, seller) => buyer.Merge(seller));10         }

那么我们在在函数式的基础上面进行并行化处理呢?

 1         /// <summary> 2         /// 函数式的并行化 3         /// </summary> 4         /// <param name="bigCompany"></param> 5         /// <param name="smallCompanies"></param> 6         /// <returns></returns> 7         private static Company FunctionParallelMerge(Company bigCompany, IEnumerable<Company> smallCompanies) 8         { 9             return smallCompanies.AsParallel().Aggregate(() => new Company(), (shell, smallCompany) => shell.Merge(smallCompany), (shell1, shell2) => shell1.Merge(shell2), bigCompany.Merge);10         }

上面提出了一些问题,这里我们用实际的测试数据查看。

测试代码

 1         private static IEnumerable<Company> GenerateSmallCompanies() 2         { 3             return Enumerable.Range(0, 30000000).Select(number => new Company { TotalIncome = number }).ToArray(); 4         } 5  6         private static void PrintMergeResult(Func<Company, IEnumerable<Company>, Company> mergeMethod, string funcApproach) 7         { 8             var stopWatch = new Stopwatch(); 9             stopWatch.Start();10             var mergeResult = mergeMethod(new Company { TotalIncome = 1000000 }, m_SmallCompanies);11             stopWatch.Stop();12             Console.WriteLine("{0}:{1}  Time:{2}", funcApproach, mergeResult.TotalIncome, stopWatch.ElapsedMilliseconds);13         }14 15         private static void TryAll()16         {17             Console.WriteLine("============================");18             PrintMergeResult(LinearMerge, "简单直接     ");19             PrintMergeResult(ParallelMerge, "错误并行    ");20             PrintMergeResult(ParallelMergeLock, "加锁并行    ");21             Console.WriteLine("***********");22             PrintMergeResult(FunctionalMerge,"函数式合并 ");23             PrintMergeResult(FunctionParallelMerge, "函数式并行合并 ");24         }25 26 27         private static readonly IEnumerable<Company> m_SmallCompanies = GenerateSmallCompanies();28         static void Main()29         {30             Console.WriteLine("测试数据30000000个");31             for (int i = 0; i < 5; i++)32             {33                 TryAll();34             }35             Console.ReadKey();36         }

测试结果如下:

技术分享

 

按照理论情况,错误并行应该比直接更快,但是不知道我机器(CPU AMD)上面出现这样的情况,其他情况还算正常。在另一台计算机(CPU Intel)上面运行测试,数据如下:

技术分享

 

【狼窝乀野狼】Parallel浅尝辄止