首页 > 代码库 > 从无到有实现.net协程(一)

从无到有实现.net协程(一)

协程的概念,就我而言,来源自当初学习Go,它可以用一句话来总结,“单线程无阻塞异步处理”,也就是说,首先,它的范围是针对单个线程来说的,一个线程可以运行多个代码片段,当运行期间遇到IO等待(包括网络IO、磁盘IO等,常见的数据库操作、web服务调用都属于IO等待)时,自动切换到其他代码片段上执行,当执行完毕或再次遇到IO等待时,再回到之前已经完成IO等待的代码片段继续执行,这样,就保证了线程无阻塞,并且多个片段都能被逐步处理(前提是代码中存在IO等待,否则还是顺序处理),以上就是协程的定义,从描述可以看到,这种处理方式特别适合用于高IO、低CPU计算的程序,现实中,大部分的Web应用都是属于这种模式,这就是为什么现在Nodejs, Go这类语言越来越火的原因。

下面的图描述了协程的运行

技术分享

协程的实际处理顺序(实际取决与协程调度程序)

技术分享

在.net中,web处理(asp.net)的模式为多线程异步,其实也很好,也可以做到无阻塞,充分利用CPU资源,这里不谈这种模式,只谈协程模式。

那么.net如何实现协程呢?这里首先必须介绍一个关键字,yield,.net协程就是利用它来实现的

Yield是用来处理迭代器(IEnumerator)的,利用迭代器的特性,间接提供了一种可以中断恢复的处理方式,以代码说话

 1     class Program
 2     {
 3         static void Main(string[] args)
 4         {
 5             var result=TestYield();
 6 
 7             result.MoveNext();
 8             Console.WriteLine(result.Current);
 9 
10             result.MoveNext();
11             Console.WriteLine(result.Current);
12 
13             result.MoveNext();
14             Console.WriteLine(result.Current);
15 
16             result.MoveNext();
17 
18             Console.Read();
19 
20         }
21 
22 
23         static IEnumerator<string> TestYield()
24         {
25             yield return "A";
26             Console.WriteLine("执行完成A");
27             yield return "B";
28             Console.WriteLine("执行完成B");
29             yield return "C";
30             Console.WriteLine("执行完成C");
31         }
32     }

执行结果为

技术分享

从上面的代码可以看出,每当使用MoveNext方法,代码都从yield return语句之后继续执行,当遇到yield return或方法完成,再次返回调用方,这种迭代器模式(迭代器模式是本身就是23种设计模式之一),就提供了分段执行代码的能力,我们通过这种模式,就能用来完成协程,这里有一个特别需要注意的地方,就是,当我们调用TestYield方法时,你会发现它其实并没有被执行,直到我们第一次调用MoveNext方法,该方法才真正开始被执行,还记得Linq吗,Linq说了,只有调用了诸如ToList()、Count()之类的方法,才真正开始计算,和这个是不是很像?其实,在Linq内部,就是使用了迭代器。

好了,现在,已经具备了实现协程的基本元素了,接下来就可以开始构建了,由两个部分组成,协程容器和协程单元,协程容器用来负责调度和执行,协程单元用来处理实际的业务,协程容器提供Register方法来注册每个协程单元,继续用代码来说话

 1     /// <summary>
 2     /// 协程容器接口
 3     /// </summary>
 4     public interface ICoroutineContainer
 5     {
 6         /// <summary>
 7         /// 注册协程单元
 8         /// </summary>
 9         /// <param name="unit">协程单元</param>
10         void Register(ICoroutineUnit unit);
11         /// <summary>
12         /// 执行
13         /// </summary>
14         void Run();
15 }
16     /// <summary>
17     /// 协程单元接口
18     /// </summary>
19     public interface ICoroutineUnit
20     {
21         /// <summary>
22         /// 处理业务
23         /// </summary>
24         /// <returns></returns>
25         IEnumerator<Task> Do();
26 }

这两个接口为整个实现的核心接口,下面的协程容器的基本实现

/// <summary>
    /// 协程容器的基本实现
    /// </summary>
    public class CoroutineContainerBase : ICoroutineContainer
    {
        /// <summary>
        /// 存储协程单元的列表
        /// </summary>
        private List<UnitItem> _units = new List<UnitItem>();
        /// <summary>
        /// 存储新注册的协程单元,与协程单元列表分开,实现注册与执行互不影响
        /// </summary>
        private List<UnitItem> _addUnits = new List<UnitItem>();
        /// <summary>
        /// 错误处理
        /// </summary>
        private Action<ICoroutineUnit, Exception> _errorHandle;

        /// <summary>
        /// 构造函数
        /// </summary>
        /// <param name="errorHandle">错误处理</param>
        public CoroutineContainerBase(Action<ICoroutineUnit, Exception> errorHandle)
        {
            _errorHandle = errorHandle;
        }

        public void Register(ICoroutineUnit unit)
        {
            lock (_addUnits)
            {
                _addUnits.Add(new UnitItem() { Unit = unit, UnitResult = null });
            }

        }

        public void Run()
        {
  	    //开启一个单独任务执行
            Task.Run(() =>
            {
                //循环处理协程单元
                while (true)
                {
                    //将新注册的协程单元加入到列表中
                    lock (_addUnits)
                    {
                        foreach (var addItem in _addUnits)
                        {
                            _units.Add(addItem);
                        }
                        _addUnits.Clear();
                    }

                    //依次处理协程单元
                    foreach (var item in _units)
                    {
                        if (item.UnitResult == null)
                        {
                            var result = item.Unit.Do();

                            //运行到下一个断点
                            try
                            {
                                result.MoveNext();
                            }
                            catch (Exception ex)
                            {
                                _errorHandle(item.Unit, ex);

                                _units.Remove(item);

                                break;
                            }

                            item.UnitResult = result;
                        }
                        else
                        {
                            //检查等待是否已经完成,如果已经完成则继续运行
                            if (item.UnitResult.Current.IsCanceled || item.UnitResult.Current.IsCompleted || item.UnitResult.Current.IsFaulted)
                            {
                                var nextResult = true;
                                try
                                {
                                    nextResult = item.UnitResult.MoveNext();
                                }
                                catch (Exception ex)
                                {
                                    _errorHandle(item.Unit, ex);
                                    _units.Remove(item);

                                    break;
                                }
                                if (!nextResult)
                                {

                                    _units.Remove(item);

                                    break;
                                }
                            }
                        }
                    }
                    
                }
            });

        }


        /// <summary>
        /// 协程单元存储格式
        /// </summary>
        private class UnitItem
        {
            /// <summary>
            /// 协程单元
            /// </summary>
            public ICoroutineUnit Unit { get; set; }
            /// <summary>
            /// 协程单元使用的迭代器
            /// </summary>
            public IEnumerator<Task> UnitResult { get; set; }
        }
    }

 

实现两个协程单元

/// <summary>
    /// 协程单元1
    /// 执行一个网络IO,访问163站点
    /// </summary>
    public class Action1 : ICoroutineUnit
    {
        public IEnumerator<Task> Do()
        {
            Console.WriteLine("开始执行Action1");
            HttpClient client = new HttpClient();

            yield return innerDo();

            Console.WriteLine("结束执行Action1");
        }

        private Task innerDo()
        {
            HttpClient client = new HttpClient();
            return client.GetAsync("http://www.163.com");
        }
    }

    /// <summary>
    /// 协程单元2
    /// 执行一个网络IO,访问163站点
    /// </summary>
    public class Action2 : ICoroutineUnit
    {
        public IEnumerator<Task> Do()
        {
            Console.WriteLine("开始执行Action2");
            yield return innerDo();
            Console.WriteLine("结束执行Action2");
        }

        private Task innerDo()
        {
            HttpClient client = new HttpClient();
            return client.GetAsync("http://www.163.com");
        }
}

 

主程序调用执行

        static void Main(string[] args)
        {
            //错误处理仅仅是将错误显示在控制台里
            Action<ICoroutineUnit,Exception> errorHandle = (unit, ex) =>
              {
                  Console.WriteLine(ex.ToString());
              };
            //初始化协程容器
            ICoroutineContainer coroutineContainerBase = new CoroutineContainerBase(errorHandle);
            //注册Action1
            coroutineContainerBase.Register(new Action1());
            //注册Action2
            coroutineContainerBase.Register(new Action2());
            //运行容器
            coroutineContainerBase.Run();

            Console.Read();

        }

执行结果

技术分享

注意,每次执行的顺序可能不一样,取决于网络速度,但可以明显看到代码的分段执行。

至此,一个最基本的协程框架已经完成

 

从无到有实现.net协程(一)