首页 > 代码库 > 核心梳理——消息处理的骨架流程——ESFramework 4.0 进阶(02)

核心梳理——消息处理的骨架流程——ESFramework 4.0 进阶(02)

ESFramework 4.0 概述一文中,我们提到ESFramework.dll作为通信框架的核心,定义了消息处理的骨架流程,本文我们来详细剖析这个流程以及该骨架中所涉及的各个组件。ESFramework的骨架流程如下图所示: 

技术分享

一.所有的网络引擎都使用同一消息处理骨架流程

      ESFramework支持TCP/UDP、二进制协议/文本协议、服务端/客户端组合而成的2x2x2=8种引擎,无论是哪一种引擎,都实现了INetEngine接口,也都使用上图所示的消息处理骨架流程来处理所接收到的所有消息。

    所以,只要掌握了这一消息处理的骨架流程,就掌握了ESFramework的核心机密。也只有掌握了该骨架流程,我们才能轻松自如地驾驭ESFramework。

      在该骨架流程中,涉及了多个消息组件,像消息分派器IMessageDispatcher、消息管道IMessagePipe、消息转换器IMessageTransformer、消息监控器IMessageSpy、消息内层分派器NakeDispatcher、消息处理器工厂IMessageProcesserFactory、以及消息处理器IMessageProcesser。

     注意,MessagePipe可以看做是MessageTransformer以及GatewayMessageSpy和InnerMessageSpy的封装。NakeDispatcher内部则包含了MessageProcesserFactory,并且利用MessageProcesserFactory来创建MessageProcesser。

    网络引擎从网络接收到一个消息后会交给MessageDispatcher进行分派,MessageDispatcher在分派消息的时候,首先会让接收到的消息经过MessagePipe进行必要的监控和转换,然后调用NakeDispatcher对消息进行最终分派处理并获取其返回应答消息,然后再让应答消息经过MessagePipe进行必要的监控和转换,最终返回给网络引擎,网络引擎会将最后得到的应答消息通过网络发送出去。

     消息处理的骨架流程图中使用箭头表示出了消息在框架中的流动路径与方向,可以看到,消息进入的路径与返回的路径是对称的。而消息由进入转向为返回的转折点是在消息处理器MessageProcesser,即消息处理器处理接收到的消息并返回应答消息。如果没有应答消息,则消息的路径到达MessageProcesser节点被处理后即终止了。

 

二.消息分派器MessageDispatcher

    消息分派器IMessageDispatcher的基础接口定义如下: 

    public interface IMessageDispatcher
    {
        IAgileLogger EsfLogger{set ;}
        IMessagePipe MessagePipe { set; }        
        INakeDispatcher NakeDispatcher{set ;}   
         
        IMessage DispatchMessage(IMessage reqMsg) ;
    }  

      首先,消息分派器需要使用MessagePipe组件和NakeDispatcher组件,它实际上是要保证一个流程,即在实现DispatchMessage方法的时候,让进入的消息必须先经过消息管道MessagePipe,然后才提交给NakeDispatcher去处理,如果有应答消息,则还要保证应答消息也必须经过MessagePipe之后,才能返回给网络引擎。IMessageDispatcher接口的实现--MessageDispatcher类,保证了这一点。

     其次,DispatchMessage方法的参数即是接收到的进入的消息,返回值即是应答消息,如果没有应答,则返回值为null。

     再次,框架要求DispatchMessage方法绝不能抛出异常,否则,这个异常将冲击到网络引擎组件,这可能导致对应连接或Session上的后续消息将不能被接收。

    ESFramework.Core.MessageDispatcher类确保了DispatchMessage方法不会抛出异常,因为其在实现DispatchMessage方法的内部截获了所有的异常,并将其通过IAgileLogger 记录到日志。这就是为什么IMessageDispatcher需要注入EsfLogger属性的原因。(关于ESFramework中采用的日志模式的更多信息,可以参见ESFramework 4.0 快速上手 -- 异常日志 )

     一般来说,DispatchMessage内部抛出异常的来源通常是两个:

(1)消息处理器抛出异常。由于真正处理消息的是我们应用程序提供的消息处理器,所以如果在业务处理的过程中没有捕获抛出的异常,那么这个异常最终会被框架的消息分派器捕获。

(2)消息管道在监控或变换消息时抛出的异常。比如,加密解密失败等。

 

三.消息管道MessagePipe

 

    MessagePipe由MessageTransformer以及GatewayMessageSpy和InnerMessageSpy构成,其主要作用是监控和变换消息。

    IMessagePipe接口的定义如下:

    public interface IMessagePipe
    {
        IMessageTransformerMessageTransformer{ set;}

        /// <summary>
        /// 工作于网关层,网络组件收到的消息需要经过的第一个组件就是GatewayMessageSpy,
        /// 发送的消息在到达网络组件前经过的最后一个组件也是GatewayMessageSpy
        /// </summary>
        IMessageSpy GatewayMessageSpy { set; }

        /// <summary>
        /// 接收的消息到达处理器之前经过的最后一个组件就是InnerMessageSpy,
        /// 处理器返回的结果消息经过的第一个组件也是InnerMessageSpy
        /// </summary>
        IMessageSpy InnerMessageSpy { set; }    

        /// <summary>
        /// 消息在发送之前经过管道处理。
        /// </summary>        
        IMessage PipeOutMessage(IMessage msg);

        /// <summary>
        /// 接收到的消息在被NakeDispatcher分派之前经过管道处理。
        /// </summary>
        IMessage PipeInMessage(IMessage msg);

        /// <summary>
        /// 当消息在Pipe中传递被Pipe丢弃时,触发此事件。
        /// </summary>
        event CbGeneric<IMessage> MessageForbidden;
    }

     当消息由MessageDispatcher分派给MessagePipe时,首先经过的组件是GatewayMessageSpy,其次经过MessageTransformer,最后经过InnerMessageSpy到达NakeDispatcher组件;而NakeDispatcher返回的应答消息所经过的路径刚好与此相反。ESFramework.Core.MessagePipe类的PipeInMessage方法和PipeOutMessage方法的实现保证了这一点。

 

1.消息监控器MessageSpy

     GatewayMessageSpy和InnerMessageSpy都是MessageSpy类型的组件,其都继承自IMessageSpy接口: 

    public interface IMessageSpy
    {
        /// <summary>
        /// 监控所有收到的消息,如请求消息,返回false表明丢弃消息。
        /// </summary>       
        bool SpyMessageReceived(IMessage msg); 

        /// <summary>
        /// 监控所有即将发送的消息,如回复消息,返回false表明丢弃消息。
        /// </summary>       
        bool SpyMessageToBeSent(IMessage msg);
    }

      对于进入的消息,将调用SpyMessageReceived方法对其监控;而对于返回的消息,将调用SpyMessageToBeSent方法对其监控。

  之所以称为“监控”,是因为MessageSpy不会修改消息,它不会改变消息的任何内容;而负责修改或变换消息的是MessageTransformer组件。这一点正是MessageSpy和MessageTransformer的本质区别所在,不同类型的消息组件,职责不一样。

     至于MessageSpy要做什么样的监控,是由我们具体的应用决定的,比如,我们的应用需要记录接收到的最后10条“原始”的消息,那么就可以实现一个MessageSpy,挂接在MessagePipe的GatewayMessageSpy属性上即可。

     除了监控之外, MessageSpy还有个特权 -- 它能决定放行哪些消息、丢弃哪些消息,这由SpyMessageReceived方法和SpyMessageToBeSent方法返回的bool值体现出来。如果返回值为false,则表示丢弃该消息。消息被丢弃后,到此终结,不会再被传递到下一个消息组件。

     一般在什么情况下,MessageSpy会丢弃消息了?比如说,被监控的消息格式不正确,消息可能是黑客模拟的"恶意"的消息;又比如说,在使用UDP通信时,接收到的消息是不完整的,等等。这些消息在MessageSpy这个环节就被过滤掉,不会污染到后续的消息组件,特别是不会给消息处理器带来额外的麻烦。

     如果我们的应用不需要任何消息监控,则可以使用一个“占位符”消息监控器挂接到MessagePipe,ESFramework提供了null object模式实现的ESFramework.Core.EmptyMessageSpy作为这个“占位符”供应用直接使用。

 

2.消息转换器 MessageTransformer

  刚刚提到,MessageTransformer对消息可以进行变换,经过MessageTransformer组件的消息,其内容会被改变。这就为我们对消息的加密、解密、压缩、解压等等提供了挂接点。比如,网络上传递的消息都是加密的,网络引擎接收到这些加密的消息传递给分派器,分派器又传递给了MessagePipe,所以,GatewayMessageSpy作为消息进入MessagePipe的第一个组件,其监控到的就是加密的消息;而接下来当消息经过MessageTransformer被解密后,被传递给InnerMessageSpy的消息是已经解密的、是明文的,这种消息在ESFramework中称为赤裸消息“NakeMessage”。所以,InnerMessageSpy监控到的消息是明文的NakeMessage。当接收到的消息被传递到内层消息分派器NakeDispatcher被分派时,已经是NakeMessage,所以内层的消息分派器被称为NakeDispatcher。

     对于NakeMessageDispatcher返回的应答消息,也是明文消息,当其沿出去的方向经过MessageTransformer时,MessageTransformer会对其进行加密,所以,最终网络引擎发送出去的应答消息也是加密的。

     从上面的描述,我们已经看出,InnerMessageSpy和GatewayMessageSpy所处的位置不同而决定了它们监控到的消息的状态不一样。通常,GatewayMessageSpy看到的消息都是经过加密的、压缩的,是密文;而InnerMessageSpy看到的消息都是已经被解密的、被解压缩的,是明文。所以,你的应用到底是需要挂接一个GatewayMessageSpy还是一个InnerMessageSpy,取决于你需要监控到什么状态的消息。

  消息过滤器必须实现IMessageTransformer接口:

    public interface IMessageTransformer
    {
        /// <summary>
        /// 截获即将发出去的消息,可以对截获到的消息进行转换,比如加密、压缩等。
        /// </summary>
        /// <param name="msg">即将发送的消息</param>
        /// <returns>经过截获转换得到的结果</returns>
        IMessage CaptureBeforeSendMessage(IMessage msg);    

        /// <summary>
        /// 截获收到的消息,可以对截获到的消息进行转换,比如解密、解压缩等。
        /// </summary>      
        /// <param name="msg">从网络接收到的消息</param>
        /// <returns>经过截获转换得到的结果</returns>
        IMessage CaptureReceivedMessage(IMessage msg) ;        
    } 

  对于进入的消息,框架将调用MessageTransformer的CaptureReceivedMessage方法对消息进行转换;对于要出去的应答消息,将调用MessageTransformer的CaptureBeforeSendMessage方法对消息进行变换。

     从IMessagePipe接口的定义我们看到,其只能挂接一个MessageTransformer实例,如果我们要挂接多个MessageTransformer,比如,一个MessageTransformer用于加密/解密,一个MessageTransformer用于压缩/解压缩,那该怎么办了?ESFramework提供了一个容器类型MessageTransformer-- ESFramework.Core.ContainerStyleTransformer,它是一个实现了IMessageTransformer接口的容器,内部有一个列表可以容纳多个MessageTransformer实例。由于,ContainerStyleTransformer实现了IMessageTransformer接口,所以,可以将其挂接到MessagePipe组件,而我们再把需要用到的多个MessageTransformer实例放到这个容器里就解决了MessagePipe需要挂接多个MessageTransformer的问题。

     同MessageSpy的情况一样,如果我们的应用不需要加密/解密消息等任何变换,则可以使用一个“占位符”消息过滤器挂接到MessagePipe,ESFramework提供了null object模式实现的ESFramework.Core.EmptyMessageTransformer作为这个“占位符”供应用直接使用。

 

四.内层消息分派器NakeDispatcher

  刚才已经提到,当进入的消息被传递到NakeDispatcher时,已经是明文的了,这个时候,该消息就可以直接被消息处理器处理了。内层消息分派器的接口INakeDispatcher定义如下:

    public interface INakeDispatcher
    {
        IProcesserFactory ProcesserFactory { set; }     
  
        /// <summary>
        /// 在最内部分配消息给最终的消息处理器去处理,并返回处理的结果。
        /// </summary>     
        IMessage DispatchMessage(IMessage msg) ;
    }

    NakeDispatcher需要通过DispatchMessage方法最终分派并处理消息,且返回应答消息(如果有的话)。消息需要被消息处理器处理,那么从哪里获取正确的消息处理器了?是消息处理器工厂。所以NakeDispatcher需要依赖于ProcesserFactory。ESFramework已经提供了ESFramework.Core.NakeDispatcher类实现了INakeDispatcher接口,供我们使用,我们不必再实现此接口。

 

1.消息处理器工厂 ProcesserFactory

  处理器工厂用于创建或返回正确的消息处理器,IProcesserFactory的接口定义如下:

    public interface IProcesserFactory
    {
        /// <summary>
        /// 根据消息的类型创建对应的处理器 。
        /// </summary>
        /// <param name="messageType">消息的类型</param>       
        IMessageProcesser CreateProcesser(int messageType);
    }  

     CreateProcesser方法创建或返回什么样的处理器取决于消息的类型。即NakeDispatcher会从消息的头部IMessagHeader(详情请参见ESFramework 4.0 进阶(01) -- 消息)中取出MessageType属性的值,然后调用IProcesserFactory的CreateProcesser方法获取正确类型的处理器。如果工厂中没有对应的处理器存在,CreateProcesser方法将返回null,这种情况一般只会在调试阶段出现,通常是因为为对应的消息还没有定义相应的消息处理器。如果在正式运行阶段出现CreateProcesser返回null,那将是非常严重的,要么是服务端和客户端所引用的程序集的版本不一致,要么是黑客在向服务器发送试探消息。当CreateProcesser返回null时,NakeDispatcher会将详细的信息记录到日志文件,我们可以从日志中查看到消息的具体内容。

     ESFramework已经提供了ESFramework.Core.ProcesserFactory类实现了IProcesserFactory接口,其内部可以挂接多个消息处理器实例,大部分情况下,我们直接使用这个实现就行了,不需要再实现自己的处理器工厂。并且,ESFramework.Core.ProcesserFactory类的实现采用的缓存,当根据消息类型寻找处理器时,不需要每次都遍历所有的处理器。

  当NakeDispatcher从IProcesserFactory获取到正确的处理器之后,即将消息提交给处理器进行处理。

    

2.消息处理器MessageProcesser

  消息处理器接口IMessageProcesser定义如下:

    public interface IMessageProcesser
    {      
        /// <summary>
        /// 该消息处理器是否能够处理类型为messageType的消息。
        /// </summary>        
        bool CanProcess(int messageType);

        /// <summary>
        /// 处理消息并返回回复消息,如果返回null,表示没有回复消息。
        /// </summary>       
        IMessage ProcessMessage(IMessage message);        
    }    

      一个消息处理器能够处理哪些类型的消息,它自己是最清楚的,所以IMessageProcesser提供了CanProcess方法来让我们可以查询它是否能处理目标类型的消息。

      同需要定义哪些类型的消息一样,继承了IMessageProcesser接口的消息处理器的实现是由具体的应用来做的,当它们定义好之后,我们便可以将它们挂接到处理器工厂来处理消息了,框架会根据消息的类型来自动调用它们(IOC模式)。     

      附带说一下,之所以采用ESPlus可以加快ESFramework的开发,就是因为ESPlus已经为我们预定义好了一批消息类型、消息协议以及消息处理器,我们只要将它们挂接到ESFramework的消息骨架上,就可以运转起来。而如果我们直接基于ESFramework.dll开发,则这些事情需要我们自己手动去打造,虽然麻烦一些,但是足够灵活。ESPlus虽然让我们省了不少事,但是也限制了ESFramework的某些方面,正所谓“有得必有失”。

 

五.实例化一个最简单消息处理流程    

     假设我们的某个项目不需要任何类型的消息监控,也不需要任何类型的消息变换,仅仅需要挂接两个消息处理器即可。那么,我们可以通过类似下面的代码来实例化这个处理流程:

    //构造MessagePipe
    IMessagePipe messagePipe = new MessagePipe();
    messagePipe.InnerMessageSpy = new EmptyMessageSpy();
    messagePipe.GatewayMessageSpy = new EmptyMessageSpy();
    messagePipe.MessageTransformer= new EmptyMessageTransformer();

    //构造NakeDispatcher
    IMessageProcesser messageProcesser1 = ......;
    IMessageProcesser messageProcesser2 = ......;
    IMessageProcesser[] messageProcessers = new IMessageProcesser[] { messageProcesser1, messageProcesser2 };
    IProcesserFactory processerFactory = new ProcesserFactory(messageProcessers);            
    INakeDispatcher nakeDispatcher = new NakeDispatcher(processerFactory);

    //构造MessageDispatcher
    IMessageDispatcher messageDispatcher = new MessageDispatcher(nakeDispatcher, messagePipe);

     上面的组装代码看视简单,却具有非常强的扩展性,如果项目需要,我们可以构造出非常复杂的流程。甚至,由于采用了接口分离原则,我们还可以实现自己的消息分派器或处理器工厂,比如,我们可以实现一个插件处理器工厂,专门从插件中动态地加载所需要的消息处理器(ESPlus提供的ESPlus.Core.Addins.ServerAddinProcesserFactory正是做这件事情的)。

     如果有通信引擎实例,那么将构造好的messageDispatcher对象挂接到通信引擎,就可以使整个流程运转起来了。

 

  关于ESFramework消息处理的骨架流程,就介绍这么多,下一篇我们将深入到ESFramework提供的各种网络引擎内部去看看。that‘s all,thanks.

核心梳理——消息处理的骨架流程——ESFramework 4.0 进阶(02)