首页 > 代码库 > Shuttle ESB(五)——发布订阅模式实例实现(2)

Shuttle ESB(五)——发布订阅模式实例实现(2)

我们接上篇文章,继续来介绍Shuttle ESB的Pub/Sub模式。


上一篇文章中,我们已经用语言描述了一个用ESB实现的场景,下面给我给出具体的代码实现。


首先,我们需要了解一下Shuttle ESB各个dll的功能:

      Shuttle.Core.Data:轻量级框架,使用ADO.NET的工厂和接口

      Shuttle.Core.Domain:提供事件调度支持

      Shuttle.Core.Host:通用主机,它能在控制台应用或者windows服务中运行

      Shuttle.Core.Infrastructure:Shuttle ESB的基础服务

      Shuttle.ESB.Core:Shuttle的核心实现

      Shuttle.ESB.Modules:Shuttle的扩展服务

      Shuttle.ESB.MSMQ:Shuttle基于微软消息队列的实现

      Shuttle.ESB.SqlServer:基于SqlServer数据库的表队列


1、消息实体

创建项目PublishSubscribe.Message,并添加OrderCompletedEvent和WorkDoneEvent两个消息事件实体(至于为什么是两个消息实体,我也已经介绍过了),使用两个消息事件实体,是为了防止死循环的出现。


a、OrderCompletedEvent事件消息实体

using System;
namespace PublishSubscribe.Messages
{
    public class OrderCompletedEvent 
    {
        public Guid OrderId { get; set; }
        public String coment { get; set; }
    }
}


b、WorkDoneEvent事件消息实体

namespace PublishSubscribe.Messages
{
    public class WorkDoneEvent
    {
        public string Comment { get; set; }
    }
}


2、消息发布端

消息发布端,主要功能是:需要启动一个ESB实例,主要负责广播消息,同时,监听消息队列,准备接收消息。


a、添加引用

创建控制台应用程序PublishSubscribe.Publish,然后使用NuGet,添加如下引用:

     PublishSubscribe.Messages

     Shuttle.Core.Data

     Shuttle.Core.Domain

     Shuttle.Core.Host

     Shuttle.Core.Infrastructure

     Shuttle.ESB.Core

     Shuttle.ESB.Modules

     Shuttle.ESB.SqlServer


b、配置文件app.config

配置文件中,主要配置了数据库连接字符串、以及Shuttle ESB的监听队列信息

<?xml version="1.0"?>
<configuration>
	<configSections>
		<section name="serviceBus" type="Shuttle.ESB.Core.ServiceBusSection, Shuttle.ESB.Core"/>
		<section name="sqlServer" type="Shuttle.ESB.SqlServer.SqlServerSection, Shuttle.ESB.SqlServer"/>
	</configSections>
	<appSettings>
		<add key="SubscriptionManagerSecured" value=http://www.mamicode.com/"false"/>>

c、控制台程序

在控制台程序Program中,主要经过如下几步,启动ESB服务总线实例:连接数据库、获取ESB的各种服务、设置ESB实例指定消息类型的监听队列、启动ESB实例、发送消息。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Shuttle.Core.Data;
using Shuttle.ESB.SqlServer;
using PublishSubscribe.Messages;
using Shuttle.ESB.Core;
using Shuttle.Core.Infrastructure;
namespace PublishSubscribe.Publish
{
    class Program
    {
        static void Main(string[] args)
        {
            //连接数据库
            new ConnectionStringService().Approve();
            //获取ESB的各种服务
            var subscriptionManager = SubscriptionManager.Default();
            //设置ESB实例监听队列类型
            subscriptionManager.Subscribe(
                new[]{
                    typeof(WorkDoneEvent).FullName
                }    
            );
            //启动ESB实例
            var bus = ServiceBus.Create(c=>c.SubscriptionManager(subscriptionManager)).Start();
            ColoredConsole.WriteLine(ConsoleColor.Green, "Server bus started.  Press CTRL+C to stop.");
            while (true) { 
                //控制台输入一个值
                string s = Console.ReadLine();
                //发送消息
                var message = new OrderCompletedEvent
                {
                    coment = s
                };
                bus.Publish(message);
                Console.WriteLine("Published coment= {0}", message.coment);
            }
        }
    }
}


d、一般处理程序

添加类:WorkDoneEventHandler。

该类集成需要继承IMessageHandler接口。它的作用主要是接收并处理接收到的消息。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Shuttle.ESB.Core;
using Shuttle.Core.Infrastructure;
using PublishSubscribe.Messages;
namespace PublishSubscribe
{
    public class WorkDoneEventHandler:IMessageHandler<WorkDoneEvent>
	{
		public void ProcessMessage(HandlerContext<WorkDoneEvent> context)
		{
			ColoredConsole.WriteLine(ConsoleColor.Blue, context.Message.Comment);
		}
        public bool IsReusable
        {
            get { return true; }
        }
    }
}


3、消息接收端A

消息接收端的配置与消息发送端的配置大同小异。因为它们都可以发送消息,也可以接收消息。只不过,由于消息接收端不一定是控制台程序,如果是类库呢?执行后就释放了,怎样接收消息呢(因为ESB实例启动后,程序运行着,才会监听到新收到的消息)?Shuttle ESB使用了通用主机的方式。


a、添加引用

创建类库项目PublishSubscribe.Subscriber1,并添加项目引用:

     PublishSubscribe.Messages

     Shuttle.Core.Data

     Shuttle.Core.Domain

     Shuttle.Core.Host

     Shuttle.Core.Infrastructure

     Shuttle.ESB.Core

     Shuttle.ESB.Msmq

     Shuttle.SqlServer


b、配置文件app.config

配置文件与消息发送端的配置基本类似,主要配置了数据库连接字符串、以及Shuttle ESB的监听队列信息。

<?xml version="1.0"?>
<configuration>
  <configSections>
    <section name="serviceBus" type="Shuttle.ESB.Core.ServiceBusSection, Shuttle.ESB.Core"/>
  </configSections>
  <connectionStrings>
    <clear/>
    <add name="Subscription" connectionString="Uid=sa;Pwd=123456;Initial Catalog=shuttle;Data Source=172.22.51.180;Connect Timeout=900" providerName="System.Data.SqlClient"/>
  </connectionStrings>
  <serviceBus>
    <inbox 
      workQueueUri="msmq://./pubsub-subscriber1-inbox-work" 
      errorQueueUri="msmq://./shuttle-pubsub-error"/>
  </serviceBus>
<startup><supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.0"/></startup></configuration>


c、类库项目

这里的ServiceBus1Host类,需要继承Shuttle ESB的IHost接口,并实现它的Start方法。

这里,它需要实现的功能也完全类似于消息发送端:

连接数据库、获取ESB的各种服务、设置ESB实例指定消息类型的监听队列、启动ESB实例、加工处理消息。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Shuttle.ESB.Core;
using Shuttle.Core.Data;
using Shuttle.ESB.SqlServer;
using PublishSubscribe.Messages;
using Shuttle.Core.Infrastructure;
using Shuttle.Core.Host;
namespace PublishSubscribe.Subscriber1
{
    public class ServiceBus1Host:IHost,IDisposable
    {
        private IServiceBus bus;
        public void Dispose()
        {
            bus.Dispose();
        }
        public void Start()
        {
            new ConnectionStringService().Approve();
            var subscriptionManager = SubscriptionManager.Default();
            subscriptionManager.Subscribe(
                new[]{
                    typeof(OrderCompletedEvent).FullName
                }
            );
            bus = ServiceBus.Create(c => c.SubscriptionManager(subscriptionManager)).Start();
            ColoredConsole.WriteLine(ConsoleColor.Green, "Sub 1 started.  Press CTRL+C to stop.");
        }
    }
}


d、一般处理程序

添加Subscriber1Handler类:

该类集成需要继承IMessageHandler接口。它的作用主要是接收消息发送端Pub的消息,经过处理后,并将消息重新发型消息。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Shuttle.ESB.Core;
using PublishSubscribe.Messages;
using Shuttle.Core.Infrastructure;
namespace PublishSubscribe.Subscriber1
{
    public class Subscriber1Handler : IMessageHandler<OrderCompletedEvent>
    {
        public void ProcessMessage(HandlerContext<OrderCompletedEvent> context)
        {
            string comment = string.Format("Subscriber1-----lzq: {0}", context.Message.coment);
            ColoredConsole.WriteLine(ConsoleColor.Blue, comment);
            //发送消息
            context.Publish(
                new WorkDoneEvent { 
                    Comment = comment
                }
            );
        }
        public bool IsReusable
        {
            get { return true; }
        }
    }
}


e、修改启动方式

点击本项目的属性,在调试栏中选择“启动操作”,选择“启动外部应用程序”。选择本项目目录debug下的“Shuttle.Core.Host.exe”作为启动程序。例如我选择的目录如下:“D:\PublishSubscribe\PublishSubscribe.Subscriber1\bin\Debug\Shuttle.Core.Host.exe”


4、消息接收端B

完全仿制消息接收端A,编码消息接收端B。

然后修改解决方案的启动方式为多项目启动:将消息发送端和多个消息接收端均设置为启动项目。


5、结果演示


图1为服务端窗口,图2为两个客户端窗口。程序启动后,会出现下面三个控制台。在服务端输入“Hello World!”后,两个客户端(当然可以为更多个,不过太多客户端会影响接收效率)马上接收到消息。接收到后,它会给服务端返回一个消息,然后服务端又会收到两个客户端的消息。具体结果如图所示。

       

                     (图1)

 

     

                    (图2)


PS:这里的两个客户端就是两台终端显示机器,每个终端显示机器都可以有多个终端显示界面。所以,客户端1和客户端2往往是在不同的运行环境下的。


源代码下载

Shuttle ESB(五)——发布订阅模式实例实现(2)