首页 > 代码库 > 分布式消息总线,基于.NET Socket Tcp的发布-订阅框架之离线支持,附代码下载

分布式消息总线,基于.NET Socket Tcp的发布-订阅框架之离线支持,附代码下载

一、分布式消息总线以及基于Socket的实现

     在前面的分享一个分布式消息总线,基于.NET Socket Tcp的发布-订阅框架,附代码下载一文之中给大家分享和介绍了一个极其简单也非常容易上的基于.NET Socket Tcp 技术实现的分布消息总线,也是一个简单的发布订阅框架:

image

    并且以案例的形式为大家演示了如何使用这个分布式消息总线架构发布订阅架构模式的应用程序,在得到各位同仁的反馈的同时,大家也非常想了解订阅者离线的情况,即支持离线构发布订阅框架。

二、离线架构

     不同于订阅者、发布者都同时在线的情况,支持订阅者离线,架构将有所变化,如下图所示:

image

     也会比原先的结构将更加复杂,其中需要处理以下两个关键点:

     1)订阅者的持久化存储。

     2)订阅者离线之后其所订阅消息的持久存储。

三、解决方案

     为解决消息总线的离线支持机制,我们在Socket 框架之中增加了一个接口ISubscribeStorager

   1: using System;
   2: using System.Collections.Generic;
   3: using System.Linq;
   4: using System.Text;
   5:  
   6: namespace EAS.Messages
   7: {   
   8:     /// <summary>
   9:     /// 消息订阅存储接口。
  10:     /// </summary>
  11:     public interface ISubscribeStorager
  12:     {
  13:         /// <summary>
  14:         /// 持久化订阅。
  15:         /// </summary>
  16:         /// <param name="subscriber">订阅者。</param>
  17:         /// <param name="topic">消息主题。</param>
  18:         void Subscribe(string subscriber, string topic);
  19:  
  20:         /// <summary>
  21:         /// 持久化退订。
  22:         /// </summary>
  23:         /// <param name="subscriber">订阅者。</param>
  24:         /// <param name="topic">消息主题。</param>
  25:         void Unsubscribe(string subscriber, string topic);
  26:  
  27:         /// <summary>
  28:         /// 装载订阅信息。
  29:         /// </summary>
  30:         /// <returns>系统之中的订阅清单。</returns>
  31:         List<SubscribeItem> LoadSubscribes();
  32:  
  33:         /// <summary>
  34:         /// 写入消息。
  35:         /// </summary>
  36:         /// <param name="subscriber">订阅者。</param>
  37:         /// <param name="message">消息对象。</param>
  38:         void Write(string subscriber, QueueMessage message);
  39:  
  40:         /// <summary>
  41:         /// 读消息。
  42:         /// </summary>
  43:         /// <param name="subscriber">订阅者。</param>
  44:         /// <param name="message">消息对象。</param>
  45:         /// <returns>成功读取返回true,否则返回false。</returns>
  46:         bool Read(string subscriber, out QueueMessage message);
  47:     }
  48: }

     ISubscribeStorager共提供持久化订阅持久化消息存储共五个函数,其中:

     LoadSubscribes:服务端初始化时读取所有的离线订阅关系,即那个订阅都订阅那那个主题。

     Subscribe:持久化订阅者,当订阅才上线订阅消息时,持久化订阅关系,供离线检测之用。

     Unsubscribe:持久化取消订阅,当订阅者退订消息时,从持久化订阅关系之中删除。

     Write:当订阅者离线时,把订阅消息写入持久化存储。

     Read:当离线订阅者上线时,从持久存储之中读取一条消息向其发送。

     ISubscribeStorager:可以选择自己实现这个接口,以建立满足自己规则的离线存储机制,当然在AgileEAS.NET SOA 中间件之中提供了两种离线存储机制,存储于数据库和存储于MSMQ,下面向大家介绍一下这两种内置实现。

四、两种内置离线存储机制

     在AgileEAS.NET SOA 中间件平台之中提供了两个ISubscribeStorager的实现,基于数据库的离线订阅存储实现EAS.Messages.DbSubscribeStorager和基于MSMQ的离线订阅存储实现EAS.Messages.MsmqSubscribeStorager

     EAS.Messages.DbSubscribeStorager存储订阅关系在messageSubscribe.Config文件之中,消息存储在关系数据库SOA_SUBSCRIBEEVENTS表之中,使用前必须要建立相应的表结构,以下是SQL Server的DDL脚本:

   1: CREATE TABLE [SOA_SUBSCRIBEEVENTS](
   2:     [GUID] [varchar](36) NOT NULL,
   3:     [SUBSCRIBER] [nvarchar](128) NOT NULL,
   4:     [TOPIC] [nvarchar](128) NOT NULL,
   5:     [BODY] [image] NULL,
   6:     [FCTIME] [datetime] NOT NULL,
   7:  CONSTRAINT [PK_SOA_SUBSCRIBEEVENT] PRIMARY KEY CLUSTERED 
   8: (
   9:     [GUID] ASC
  10: )
  11: ) 

      目前理论上支持SQLServer 、Mysql、ORACLE、Sqlite四种数据库结构,具体建表脚本请自行参考相应资料书写,也可以使用AgileEAS.NET SOA中间件所提供的数据库初始化工具创建。

      EAS.Messages.MsmqSubscribeStorager存储订阅关系在messageSubscribe.Config文件之中,消息存储Msmq消息队列之中,使用之前请确保机器上安装了MSMQ消息对列。

五、关于自定义实现ISubscribeStorager

     有兴趣的朋友可以自定义实现接口ISubscribeStorager,这样就可以按自己的规则进行存储,比如把离线消息存储到mongodb、Redis、或者直接存储在文件之中,或者其他更多的实现规则,在此就不一一介绍,如有相关兴趣,请联系作者,如确有必要需要给在家介绍一下如何实现,将会另开一文本介绍如何自定义实现ISubscribeStorager接口。

六、改进在线例子支持离线

     还是跟上次一样,以案例为在家展示一下怎么进行离线消息,就不重新开始例子,对原有例子做一些改进,改进后例子如下:

image

     其中在原有项目的基础上增加了:Demo.Subscriber1和Demo.Subscriber2项目,其项目配置代码、配置文件基本上同Demo.Subscriber一样,其中唯一的差别在于,Demo.Subscriber1和Demo.Subscriber2向服务器提交订阅的时候都增加一个另friendName参数,其使用IMessageBus接口的以下订阅函数:

   1: /// <summary>
   2: /// 订阅消息。
   3: /// </summary>
   4: /// <param name="subscriber">订阅者。</param>
   5: /// <param name="friendName">订阅者名称,用于处理离线订阅。</param>
   6: /// <param name="topic">主题。</param>
   7: /// <param name="notifyHandler">订阅通知。</param>
   8: void Subscribe(object subscriber,string friendName ,string topic, MessageNotifyHandler notifyHandler);

                Demo.Publisher项目为发布者代码。

                Demo.Subscriber项目为订阅者代码。

                Demo.Server项目为服务端代码。

     Demo.Subscriber1项目之中,其Program.cs代码如下:

   1: using System;
   2: using System.Collections.Generic;
   3: using System.Linq;
   4: using System.Windows.Forms;
   5: using EAS.Messages;
   6:  
   7: namespace Demo.Subscriber1
   8: {
   9:     class Program
  10:     {
  11:         static void Main(string[] args)
  12:         {
  13:             var container = EAS.Objects.ContainerBuilder.BuilderDefault();
  14:             var bus = container.GetComponentInstance("MessageBus") as IMessageBus;
  15:             System.Console.WriteLine("Subscriber1");
  16:  
  17:             bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);
  18:             System.Console.ReadLine();
  19:         }
  20:  
  21:         static void MessageNotify(object m)
  22:         {
  23:             Demo.Messages.Message message = m as Demo.Messages.Message;
  24:             System.Console.WriteLine(string.Format("Subscribe:{0}", message.ID));
  25:         }
  26:     }
  27: }

     其中bus.Subscribe(new Program(), "Subscriber1", Demo.Messages.Topics.DEMO_TOPIC, MessageNotify);在订阅消息的时候给了一个friendName为Subscriber1,Demo.Subscriber2与Demo.Subscriber1项目的唯一的差别就是此处为Subscriber2.

     我们使用内置的EAS.Messages.DbSubscribeStorager,则不需要修改服务端的代码,只需要修改服务端的配置文件如下:

   1: <?xml version="1.0" encoding="utf-8"?>
   2: <configuration>
   3:   <configSections>
   4:     <section name="eas" type="EAS.ConfigHandler,EAS.MicroKernel" />
   5:   </configSections>
   6:   <startup useLegacyV2RuntimeActivationPolicy="true">
   7:     <supportedRuntime version="v4.0"/>
   8:   </startup>
   9:   <eas>
  10:     <objects>
  11:       <!--数据库连接-->
  12:       <object name="DbProvider" assembly="EAS.Data" type="EAS.Data.Access.SqlClientDbProvider" LifestyleType="Thread">
  13:         <property name="ConnectionString" type="string" value="Data Source=.;Initial Catalog=eas_db;Integrated Security=SSPI;Connect Timeout=30" />
  14:       </object>
  15:       <!--数据访问器-->
  16:       <object name="DataAccessor" assembly="EAS.Data" type="EAS.Data.Access.DataAccessor" LifestyleType="Thread">
  17:         <property name="DbProvider" type="object" value="DbProvider"/>
  18:         <property name="Language" type="object" value="TSqlLanguage"/>
  19:       </object>
  20:       <!--ORM访问器-->
  21:       <object name="OrmAccessor" assembly="EAS.Data" type="EAS.Data.ORM.OrmAccessor" LifestyleType="Thread">
  22:         <property name="DataAccessor" type="object" value="DataAccessor"/>
  23:       </object>
  24:       <!--查询语言-->
  25:       <object name="TSqlLanguage" assembly="EAS.Data" type="EAS.Data.Linq.TSqlLanguage" LifestyleType="Thread"/>
  26:       <!--消息持久化-->
  27:       <object name="SubscribeStorager" assembly="EAS.SOA.BootStrap" type="EAS.Messages.DbSubscribeStorager" LifestyleType="Singleton"/>
  28:       <!--日志管理-->
  29:       <object name="Logger" assembly="EAS.MicroKernel" type="EAS.Loggers.TextLogger" LifestyleType="Singleton">
  30:         <property name="RootPath" type="string" value="G:\App.Work\Pub_Sub\Offline\Publish\logs" />
  31:       </object>
  32:     </objects>
  33:   </eas>
  34: </configuration>

     在配置文件的IOC配置之中我们配置了消息存储对象以及其所依赖的数据库访问对象、Linq查询语言表达式,另外需要说明的是,我们需要把配置文件之中所涉及的EAS.Data.dll、EAS.SOA.BootStrap.dll复制到编译输出Publish,这两个文件可以从AgileEAS.NET SOA 中间件平台发布包之中寻找,本案例的下载压碎包之中会包括这两个文件。

     有关于基于Msmq的配置,只需要修改配置文件如下:

   1: <?xml version="1.0" encoding="utf-8"?>
   2: <configuration>
   3:   <configSections>
   4:     <section name="eas" type="EAS.ConfigHandler,EAS.MicroKernel" />
   5:   </configSections>
   6:   <startup useLegacyV2RuntimeActivationPolicy="true">
   7:     <supportedRuntime version="v4.0"/>
   8:   </startup>
   9:   <eas>
  10:     <objects>
  11:       <!--消息持久化-->
  12:       <object name="SubscribeStorager" assembly="EAS.SOA.BootStrap" type="EAS.Messages.MsmqSubscribeStorager" LifestyleType="Singleton"/>
  13:       <!--日志管理-->
  14:       <object name="Logger" assembly="EAS.MicroKernel" type="EAS.Loggers.TextLogger" LifestyleType="Singleton">
  15:         <property name="RootPath" type="string" value="G:\App.Work\Pub_Sub\Offline\Publish\logs" />
  16:       </object>
  17:     </objects>
  18:   </eas>
  19: </configuration>

     到此为止,所有代码均已完成,是不是很简单,接下来,我们跑起来验证一下效果。

七、验证效果

     我们在编译输入目录Publish下先启动Demo.Server.exe,再各启动Demo.Subscriber.exe、Demo.Subscriber1.exe、Demo.Subscriber2.exe,再启动一个Demo.Publisher.exe,在Demo.Publisher.exe控制台按回车键:

N_[]_C%GTD_$0[KL}}B~E$A

目前程序三个订阅者都是在线的,Demo.Publisher发布了三条消息,三个订阅者都收到了三条消息,那么我们关闭Demo.Subscriber2之后再由Demo.Publisher发布两条消息:

R(OM90FW6B0WYO6R)0MQ7D4

然后我们再启动Demo.Subscriber2,看是否还能收到其离线之后由Demo.Publisher发布的两条消息:

ZE8XA`V4PE)5%F~QF}NO]0N

OK,到此为止。

八、源代码下载

     本程序的源代码已上传到服务器,请通过http://42.121.30.77/downloads/eas/Demo.Pub_Sub_Offline.rar进行下载,如果在开发过程之中想要了解更多有关Socket通信框架以及更多AgileEAS.NET SOA中间件平台的技术资源,请通过AgileEAS.NET SOA 网站:http://www.smarteas.net的最新下载栏目进行下载。   

九、问题反馈

     麻烦大家在通过视频进行学习的时候能及时把问题反馈给楼主,或者有什么需要改进的一些建议都请向楼主直接反馈,以下是联系方式:

AgileEAS.NET SOA 网站:http://www.smarteas.net

官方博客:http://eastjade.cnblogs.com

楼主QQ:47920381,AgileEAS.NET

QQ群:113723486(AgileEAS SOA 平台)/上限1000人

199463175(AgileEAS SOA 交流)/上限1000人

120661978(AgileEAS.NET 平台交流)/上限1000人

邮件:james@agilelab.cn,mail.james@qq.com,

电话:18629261335。