首页 > 代码库 > 发布/订阅消息传送模型

发布/订阅消息传送模型

1、发布/订阅模型概览

    发布/订阅(publish-and-subscribe)模型通常被简写为pub/sub模型。在这个模型中,消息生产者成为发布者(publisher),而消息消费者则称为订阅者(subscribe)。在点对点模型中,是将消息发送到一个队列中,而发布/订阅模型则是将消息发布给一个主题。发布/订阅模型最重要的特性如下:

  • 消息通过一个称为主题的虚拟通道进行交换。
  • 每条消息都会传送给称为订阅者的多个消息消费者。订阅者有许多类型,包括持久性、非持久性和动态性。
  • 发布者通常不会知道、也也意识不到哪一个订阅者正在接收主题消息。
  • 消息被推送给消费者,这意味着消息会传送给消费者,而无需请求。消息通过一个称为主题的虚拟通道进行交换。主题就是生产者发布消息和订阅者消费消息的目的地。传送给一个主题的消息被自动推送给所有合格的消费者。
  • 生产者和消费者之间没有耦合。订阅者和发布者可以在运行时动态添加,这使得系统的复杂性可以随时间的推移而增长。
  • 订阅一个主题的每个客户端都会接收到发布该主题的消息副本。发布者生产的单条消息可以复制并分发给成百上千的订阅者。


    使用发布订阅模型时,JMS提供者会立即将发布到一个主题的消息传送给各个订阅者。因此,与点对点模型不同,订阅者并不是通过"扫描主题"来寻找属于它们的消息。相反,是由JMS提供者将消息的一个副本传送给各个订阅者。

    发布/订阅模型和点对点的另一个主要区别是,发布/订阅模型时在把消息复制给每个订阅者时,使用消息选择器;而点对点模型则是在已将消息添加到队列之后,再使用消息选择器。

    订阅者既可以是持久性的,也可以是非持久型的。非持久订阅者只有在当前订阅者是活动的,而且已经连接到主题的时候,才会接收到消息;而持久订阅者会接收发送到该主题的、它所需要的所有消息,而不管该订阅者活动与否。

    订阅者还可以是动态的,或者是受管的。动态持久型订阅者可以实时创建,而受管型订阅者则是静态的,并且JMS提供者也知道这个受管订阅者的存在。

    何时使用发布/订阅消息传送模型:如果要将事件或消息广播到多个消息消费者,就会用到发布/订阅模型。这里最重要的一条就是,这条消息可供多个消费者消费。发布/订阅模

                                               型的原理是将消息的副本推送给多个订阅者。

发布/订阅消息模型的使用:

 JMS初始化

    在TLender类示例中,suoyoudeJMS初始化逻辑都在构造函数中处理。TLender构造函数的代码,除了两处重要的区别外,几乎和QBorrow构造函数完全相同。

    首先,请注意TLender类的连接工厂、连接及会话对象,都和QBorrow类非常类似,除了它使用的是基于主题的接口而不是基于队列的接口:

//连接到提供者、并获得和JMS的连接Context ctx = new InitialContext();TopicConnectionFactory qFactory = (TopicConnectionFactory)ctx.lookup(topiccf);tConnection = qFactory.createTopicConnection();//创建JMS会话tSession = tConnect.createTopicSession(false,Session.AUTO_ACKNOWLEGE);//查找请求和响应队列topic = (Topic)ctx.lookup(topicName);//现在,创建已经完成,启动连接tconnection.start();

   这里需要重点说明的是,尽管现在使用的是基于主题的API,但它的流程和点对点模型所用的基于队列的API是相同的:

  1. 获得JMS提供的一个初始上下文。
  2. 查找连接工厂。
  3. 创建一个JMS连接。
  4. 创建一个JMS会话。
  5. 查找目的地。
  6. 启动连接。

发布消息

    一旦TLender类被初始化,利率就会通过命令行输入进来。这事,系统从main方法中调用了publishRate方法,利率被发布到该主题智商。与点对点的例子不同,一旦消息发布以后,TLender类将不再等待响应。这事根据发布/订阅模型的去耦本质而为之;TLender类并不知道或并不关心谁在订阅利率、它们怎样处理数据,或者由多少订阅者在接收利率信息。也可能会出现这样的情况:一些订阅者正在接收利率数据,并由这个特定的贷方进行抵押利率波动的趋势分析,而其他订阅者(比如TBorrower类)则在进行分析利率,以决定是否再提供贷款。

    在publishRate方法的一开始,我们创建了一个BytesMessage来保存利率数据。可以选择5中JMS消息类型的任何一种,不过我们选择了可移植性最强的BytesMessage:

BytesMessage msg = tSession.createBytesMessage();msg.writeDouble(newRate);

    在创建消息之后,我们接着又创建了TopicPublisher对象,它指定了希望发布消息的主题,然后,我们使用了publish方法发布了消息:

Topicpublisher publisher = tSession.createPublsher(topic);publisher.publish(msg);

    像点对点模型的send方法一样,在TopicSender对象中也有若干种重写的publish方法可用。我们正在使用的这种方法,只接受JMSMessage对象作为唯一的参数。其他重写的方法则允许您指定主题、传送模型、消息优先级,还有消息有效期。由于我们并未制定任何其他值,所有消息优先级被设置为普通(4),传送模型被设置为持久性消息(DeliveryMode.PERSISTENT),而有效期则被设置为0,表示消息将永不过期。所有这些参数都可以使用其他publish方法来重写。

    这里有一点要说明:虽然请求/应答肯定能够适用于发布/订阅模型,但是它在当今基于主题的消息传送模型中并不常见,这主要是因为发布/订阅模型的本质特性:发布/订阅模型通常用于广播事件或信息,它并不期望对该广播作出响应。

持久订阅者和非持久订阅者

    如果您要运行TBorrower类,并随后发布若干种利率,TBorrower类就会得到新利率,并判断这个利率是否合适。不过,如果您要终止TBorrower类,发布一些新利率,随后再重启TBorrower类,您就不会获得在TBorrower类未运行期间发布到该主题的利率。这事为什么?原因就在于您所创建的TBorrower类是一个非持久订阅者:

TopicSubscriber subscribe = tSession.createSubscriber(topic);

    只有非持久订阅者在主动侦听一个主题时,才会接收到消息。否则,它们将会错过这些消息。在发布/订阅模型中,并不存在保存所有消息的"主题"这样一个真实的概念;确切的说,当JMS提供者接收到一条消息时,提供者将为每个订阅者制作该消息的一个副本。如果订阅者不是活动的,它就不会接收该消息的副本。这个概念如图所示5-3所示。

  

    另一方面,持久订阅者会接收发送到该主题的所有消息(依靠该订阅者使用的消息选择器),无论该订阅者活动与否。这通常称为"保存并转发(store-and-forward)"消息传送。持久订阅者”保存并转发"的概念如下: