首页 > 代码库 > 分布式异步消息框架构建笔记4——分布是消息路由

分布式异步消息框架构建笔记4——分布是消息路由

上一篇实现了消息的自动路由,这边写了一个小测试,大家可以猜一下运行输出结果是什么?


 public class RouterTest
    {
        public static void DoRouterTest()
        {
            var contextA = Context.Creat("A");
            var contextB = Context.Creat("B");
            contextA.RegServiceClass(typeof (MyClassA));
            contextB.RegServiceClass(typeof (MyClassB));
            Context.CreatProxyAndRun(MyClassB.FunB, (r) =>
            {
              
                Console.WriteLine("FunB CallBack Run in " + Thread.CurrentThread.ManagedThreadId);
            },100);

        }

        public class MyClassA
        {
            public static int FunA(int x)
            {
                Console.WriteLine("FunA Run in " + Thread.CurrentThread.ManagedThreadId);
              
                return x + 10;
            }
        }

        public class MyClassB
        {
            public static IEnumerable<int> FunB(int x)
            {
                Console.WriteLine("FunB Run in " + Thread.CurrentThread.ManagedThreadId);

                Context.Wating(MyClassA.FunA, (r) =>
                {
                    Console.WriteLine("FunA CallBackRun in " + Thread.CurrentThread.ManagedThreadId);
                 
                    x = r;
                }, x);
                yield return -1;

                yield return x - 1;
            }
        }
    }


以上是跨线程的一个分布式的实现,那么我们如何做到跨域、跨进程和跨服务器呢。

从以上代码可以看出,传递进去的参数有回调函数,这个是不可序列化传递的,参数和调用方法之类的只要可序列化都是没问题的。

先看一下基础的消息处理状态:(晚点可能上个流程图)

1.创建一个Request请求,包含方法,参数,以及回调,这事消息状态为Wating,丢入当前队列

2.事件处理器收到请求后,执行对应的目标方法,把结果包装成一个WatingCallback状态的待处理对象

2.1如果对象是Yield对像,则包装成Yield状态的待处理对象

3.当消息处理器发现这个待处理对象是Yield状态,执行MoveNext

4.如果采用Wating方法则把当前句柄的待处理对象变成YieldWating状态

5.YieldWating会不断的把自己丢入队列,直到当前句柄被释放,重新变成Yield状态

6.当MoveNext不可用,也就是执行完毕,那么这个结果状态会变成WatingCallback

7.WatingCallback状态的对象,执行回调后,完成生命周期,状态变成Dead


然后是跨线程的流程与基本流程的区别:(红色表示)

1.创建一个Request请求,包含方法,参数,以及回调,这事消息状态为Wating,丢入事件路由,时间路由会寻找指定的Context发布

2.事件处理器收到请求后,执行对应的目标方法,把结果包装成一个WatingCallback状态的待处理对象

2.1如果对象是Yield对像,则包装成Yield状态的待处理对象

3.当消息处理器发现这个待处理对象是Yield状态,执行MoveNext

4.如果采用Wating方法则把当前句柄的待处理对象变成YieldWating状态

5.YieldWating会不断的把自己丢入队列,直到当前句柄被释放,重新变成Yield状态

6.当MoveNext不可用,也就是执行完毕,那么这个结果状态会变成WatingCallback

7.WatingCallback状态的对象,检查事件id,如果不为本Context处理,则丢入事件路由,路由会寻找指定的Context发布

7.1当本事件ID的 WatingCallback状态的对象,执行回调后,完成生命周期,状态变成Dead


调用区别并不是很大,那么用网络传递也变得比较简单了

(由于用到了YieldWating状态,远程传递只能用IEnumerable接口,这个有点不满!!)

1.创建一个Request请求,包含方法,参数,以及回调,这事消息状态为Wating,丢入事件路由,时间路由会寻找指定的Context发布

1.1本地创建一个YieldWating状态,并且把Callback保存起来.

1.2当收到事件RemoteWatingCallBack,移除YieldWating句柄,并把结果传入YieldWating回调;

1.3接着执行YieldWating.

2.远程事件处理器收到请求后,执行对应的目标方法,把结果包装成一个WatingCallback状态的待处理对象

2.1如果对象是Yield对像,则包装成Yield状态的待处理对象

3.当消息处理器发现这个待处理对象是Yield状态,执行MoveNext

4.如果采用Wating方法则把当前句柄的待处理对象变成YieldWating状态

5.YieldWating会不断的把自己丢入队列,直到当前句柄被释放,重新变成Yield状态

6.当MoveNext不可用,也就是执行完毕,那么这个结果状态会变成WatingCallback

7.WatingCallback状态的对象,检查事件id,如果不为本Context处理,则丢入事件路由,路由会寻找指定的Context发布

7.1 当WatingCallBack为远程调用时,把结果包装成RemoteWatingCallBack事件,发布.

7.2当本事件ID的WatingCallback状态的对象,执行回调后,完成生命周期,状态变成Dead


大致流程就是:

1.本地创建等待回调句柄,等待远程返回.

2.远程执行完成后,发布远程回调事件.


最后,看起来和实现起来都不怎么复杂的样子.

分布式异步消息框架构建笔记4——分布是消息路由