首页 > 代码库 > Spring 消息
Spring 消息
RMI、Hessian/Burlap的远程调用机制是同步的。当客户端调用远程方法时,客户端必须等到远程方法完成之后,才能继续执行。即使远程方法不向客户端返回任何消息,客户端也要被阻塞知道服务完成。
消息是异步发送的,客户端不需要等待服务处理消息,甚至不需要等待消息投递完成。客户端发送消息,然后继续执行,这个是因为客户端假定服务最终可以收到并处理这条信息。
在异步消息中有两个主要的概念:消息代理(message broker)和目的地(destination)。当一个应用发送消息时,会将消息交给一个消息代理。消息大理可以确保消息被投递到指定的目的地,同事解放发送者,使其能够继续进行其他的业务。目的地只关注消息应该从哪里获得,而不关心由谁取走消息。
尽管不同的消息系统会提供不同的消息路由模式,但有两种通用的目的地:队列(queue)和主题(topic)。每种类型都与特定的消息模型相关联,分别死点对点(队列)模型和发布/订阅模型(主题)。
点对点消息模式
在点对点的模型中,没一条消息都有一个发送者和一个接受者。当消息代理得到消息时,它将消息放入一个队列中。当接收者请求队列中的下一条消息时,消息会从队列中取出,并投递给接收者。因为消息投递后会从队列中删除,这样就可以保证消息只能投递给一个接受者。尽管消息队列中的没一条消息只能被投递给一个接收者,但并不意味着只能使用一个接收者从队列中获取消息。事实上,可以使用几个接收者来处理队列中的消息。
发布——订阅消息模型
在发布——订阅消息模型中,消息会发送给一个主题。与队列类似,多个接收者都可以都可以监听一个主题。但与队列不同的是,消息不再是只投递给一个接收者,而是主题的所有订阅者都会接收到此消息的副本。
采用同步通信机制访问远程服务的客户端存在几个限制,最主要的是:
同步通信意味着等待。当客户端调用远程服务的方法时,它必须等待远程方法结束后才能继续执行。如果客户端与远程服务频繁通信,或者远程服务响应很慢,就会对客户端应用的性能带来负面影响。
客户端通过服务接口与远程服务相耦合。如果服务的接口发生变化,此服务的所有客户端都需要做相应的改变。
客户端与远程服务的位置耦合。客户端必须配置服务的网络位置,这样它才能知道如何与远程服务进行交互。如果网络拓扑进行调整,客户端也需要重新配置新的网络位置。
客户端与服务的可用性相耦合。如果远程服务不可用,客户端实际上也无法正常运行。
异步通信组要解决一下问题
无需等待:使用JMS发送消息时,客户端不必等待消息被处理,甚至是被投递。客户端只需要将消息发送给消息代理,就可以确信消息会被投递给相应的目的地。
面向信息和解耦:发送异步消息是以数据为中心的。这意味着客户端并没有与特定的方法签名绑定。任何可以处理数据的队列或主题订阅者都可以处理由客户端发送来的消息,而客户端不必了解远程服务的任何规范。
位置独立:同步RPC服务通常需要网络地址来定位。这意味着客户端无法灵活地适应网络拓扑的改变。与之相反,消息客户端不必知道谁会处理它们的消息或服务的位置在哪里。客户端只需要了解需要通过哪个队列或主题来发送消息,因此,只要服务能够从队列或主题中获取消息即可,消息客户端根本不需要关注服务来自哪里。
在点对点模型中,可以利用这种位置的独立性来创建服务的集群。如果客户端不知道服务的位置,并且服务的唯一要求就是可以访问消息代理,那么我们可以配置多个服务从一个同一个队列中接受消息。如果服务过载,处理能力不足,我们只需要添加一些新的服务实例来监听相同的队列就可以了。
在发布——订阅模型中,位置独立性会产生另一种有趣的效应。多个服务可以订阅同一个主题,接收相同消息的副本。但是在每一个服务对消息的处理逻辑却可能有所不同。
确保投递:为了使客户端可以同步服务通信,服务必须监听指定的IP地址和端口。但当发送异步消息时,客户端可以完全相信消息会被投递。即使在消息发送时,服务无法使用,消息也会被存储起来,直到服务重新可以使用为止。
使用JMS发送消息
Java消息服务(Java Message Service, JMS)是一个Java标准,定义了了使用消息代理的通用API。Spring通过基于模板的抽象为JMS功能提供了支持,这个模板就是JmsTemplate。使用JmsTemplate能够肥肠容易地在消息生产方发送队列和主题消息,在消费消息的那一方,也能很容易地接受这些信息。Spring还提供了消息驱动POJO的理念:这是一个简单的Java对象,它能够以异步的方式响应队列或主题上到达的消息。
使用ActiveMQ在Spring中搭建消息代理
创建连接工厂——ActiveMQConnectionFactory
在Spring中配置
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616" />
或在Spring中声明amq命名空间
<?xml version="1.0" encoding="UTF-8" ?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:amq="http://activemq/apache/org/schema/core" xsi:schemaLocation="http://activemq.apache.org/schema/core www.springframework.org/schema/jms www.springframework.org/schema/jms/spring-jms.xsd www.springframework.org/schema/beans www.springframework.org/schema/beans/spring-beans.xsd"> <amq:connectionFactory id="connectionFactory" brokerURL="tcp://localhost:61616" /> </beans>
声明消息目的地
在Spring中配置
定义队列
<bean id="queue" class="org.apache.activemq.command.ActiveMQQueue" c:_="spitter_queue" />
定义主题
<bean id="topic" class="org.apache.activemq.command.ActiveMQTopic" c:="spitter_queue" />
或在Spring命名空间中声明
声明队列
<amq:queue id="spittleQueue" physicalName="spittle.alert.queue" />
声明主题
<amq:topic id="spittleTopic" physicalName="spittle.alert.topic" />
使用JmsTemplate
JmsTemplate可以创建连接,获得会话以及发送和接收消息。此外,JmsTemplate可以处理所有抛出的JMSException。JmsTemplate将异常捕获并重新抛出一个非检查型异常。
定义JmsTemplate
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" c:_-ref="connectionFactory" />
发送消息
public class SendMessage{ private JmsOperations jmsOperations; @Autowired public SendMessgae(JmsOperations jmsOperations){ this.jmsOperations = jmsOperations; } public void send(final Spittle spittle){ jmsOperations.send("queueName", new MessageCreator(){ public Message createMessage(Session session) throws JMSException{ return session.createObjectMessage(spittle); } }); } }
JmsOperations的send()方法第一个参数是JMS目的地名称,标识消息将发送给谁。我们使用MessageCreator来构造消息。在MessageCreator的createMessage()方法中,通过session创建一个消息对象。
设置默认目的地
在JmsTemplate中装配一个默认的目的地
<bean id="jmsTemplate" class="..." p:defaultDestinationName="queueName"/>
或将队列或主题的目的地bean装配到jmsTemplate中
<bean id="jmsTemplate" class=‘"..." p:defaultDestination-ref="topicBean" />
convertAndSend()
JmsTemplate还提供了convertAndSend()方法,convertAndSend()方法并不需要MessageCreator作参数。它会使用内置的消息转换器(message converter)为我们创建消息。
MessageConverter是Spring定义的接口,有两个方法
public interface MessageConverter{ Message toMessage(Object object, Session session) throws JMSException, MessageConversionException; Object fromMessage(Message message) throws JMSException, MessageConversionException; }
Spring提供了四个实现
MappingJacksonMessageConverter 使用Jackson JSON库实现消息于JSON格式之间的相互转换
MappingJackson2MessageConverter 使用Jackson2 JSON库实现消息于JSON格式之间的相互转换
MarshallingMessageConverter 使用JAXB库实现消息与XML格式之间的相互转换
SimpleMessageConverter 实现String与TextMessage之间的相互转换,字节组与ByteMessage之间的相互转换,Map与MapMessage之间的相互转换以及Serializable对象与ObjectMessage之间的相互转换(默认)
更改消息转换器
声明MessageConverter bean并将其注入到jmsTemplate中
<bean id="messageConverter" class=""org.springframework.jms.support.converter.MappongJacksonMessageConverter" /> <bean id="jmsTemplate" class=".." c:-_ref="connectionFactory" p:messageConverter-ref="messageConverter" />
接收消息
public Spittle receive(){ try{ ObjectMessage receiveMessage = (ObjectMesasge)jmsOperations.getObject(); return (Spittle) receiveMessage.getObject();
}catch(JMSException jmsException){ throw JmsUtils.convertJmsAccessException(jmsException); } }
public Spittle simpleReceive(){
return (Spittle) jmsOperations.receiveAndConvert();
}
使用JmsTemplate的最大缺点在于receive()和receiveAndConvert()方法都是同步的。这意味着接受者必须等待消息的到来,因此方法一直会被阻塞,直到有可用的消息。
创建消息驱动POJO
使用消息监听器
消息监听器(message listener container)可以监控JMS目的地并等待消息到达。一旦有消息到达,它取出消息并传递给任意一个对此消息感兴趣的消息监听器/
声明处理器
<bean id="spittleHandler" class="com.habuma.spittr.alerts.SpittleAlertHandler" />
声明消息监听器
<jms:listener-container connection-factory="connectionFactory"> <jms:listener destionation="queueName" ref="spittleHandler" method="handleSpittleAlert" /> </jms:listener-container>
AMQP(Advanced Message Queuing Protocol)
AMQP为消息定义了线路层(wire-level protocol)的协议,而JMS所定义的是API规范。JMS的API协议能够确保所有的实现都能通过通用的API来使用,但是并不能保证某个JMS实现所发送的消息能够被另外不同的JMS实现所使用。而AMQP的线路层协议规范了消息的格式,消息在生产者和消费者间传送到时候回遵循这个格式。
在JMS中,通道有助于解耦消息的生产者和消费者,但两者依然会与通道相耦合。生产者会将消息发发布到一个特定的队列或主题上,消费者从特定的队列或主题上接收这些消息。通道具有双重责任,即传递数据以及确定这些消息该发送到什么地方,队列的话使用点对点算法发送,主题使用发布——订阅方式。
AMQP的生产者并不会直接将消息发布到队列中。AMQP在消息的生产者以及传递消息的队列之间引入了一种间接机制:Exchange。消息的生产者奖信息发布到一个Exchange。Exchange会绑定到一个或多个队列上,它负责将信息路由到队列上。信息的消费者会从队列中提取数据并进行处理。AMQP定义了四种不同类型的Exchange,每一种都有不同的路由算法,这些算法决定了是否要将信息放到队列中。根据Exchange的算法不同,它可能会使用消息的routing key和/或参数,并将其与Exchange和队列之间binding的routing key和参数进行对比。若果对比结果满足相应算法,则消息会路由到队列上,否则将不会路由到队列上。
四种标准的AMQP Exchange如下所示:
Direct:如果消息的routing key与binding的routing key直接匹配的话,消息将会路由到该队列上
Topic:如果消息的routing key与binding的routing key符合通配符的话,消息将会路由到该队列上
Headers:如果消息参数表中的头信息和值都与binding参数表中相匹配,消息将会路由到该队列上
Fanout:不管消息的routing key和参数表的头信息/值时什么,消息将会路由到所有队列上
使用RabbitMQ
Spring AMQP为RabbitMQ提供了支持,包括RabbitMQ连接工厂,模板以及Spring配置命名空间。
配置连接工厂
配置RabbitMQ连接工厂最简单的方式时使用Spring AMQP提供的rabbit配置命名空间。
<?xml version="1.0" encoding="UTF-8" ?> <beans:beans xmlns="http://www.springframework.org/schema/rabbit" xmlns:beans="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <connection-factory id="connectionFactory" host="localhost" port="5672"> <beans>
声明队列,Exchange以及binding
AMQP的路由依赖于如何定义队列和Exchange以及如何将它们绑定在一起。声明队列,Exchange和binding的一种方式是使用RabbitMQ Channel接口的各种方法。rabbit命名空间包含了许多元素帮助我们声明队列,Exchange以及将它们结合在一起的binding
<queue> 创建一个队列
<fanout-exchange> 创建一个fanout类型的Exchange
<header-exchange> 创建一个header类型的Exchange
<topic-exchange> 创建一个topic类型的Exchange
<direct-exchange> 创建一个direct类型的Exchange
<bindings><binding /></bindings> 元素定义一个或多个元素的集合,元素创建Exchange和队列之间的binding
这些元素要与<admin>元素一起使用。<admin>元素会创建一个RabbitMQ管理组建(administrative component),它会自动创建上述这些元素苏声明的队列,Exchange以及binding。默认会有一个没有名称的direct Exchange,所有的队列都会绑定到这个Exchange上,并且routing key与队列的名称相同。
<admin connection-factory="connectionFactory" /> <queue name="queue1" /> <queue name="queue2" /> <queue name="queue3" /> <fanout-exchange name="fanoutdemo"> <binding queue="queue1" /> <binding queue="queue2" /> <binding queue="queue3" /> </fanout-exchange>
使用RabbitTemplate
在rabbit命名空间中声明rabbitTemplate
<template id="rabbitTemplate" connection-factory="connectionFactory" />
public class Send{ private RabbitTemplate rabbit; @Autowired public Send(RabbitTemplate rabbit){ this.rabbit = rabbit; } public void send(Spittle spittle){ rabbit.convertAndSend("fanoutdemo", "routing key", spittle); } }
convertAndSend()方法接受三个参数:Exchange名称,routing key以及要发送的对象。如果省略Exchange或routing key,RabbitTemplate将会使用默认值。
在template中指定默认值
<template id="rabbitTemplate" connection-factory="connectionFactory" exchange="exchangeName" routing-key="routingkeyName" />
接收消息
rabbit.receiveAndConvert("queueName");
receive()和receiveAndConvert()方法都会立即返回,如果队列中没有等待的消息,将会得到null。这需要我们来管理轮询以及必要的线程来实现队列的监控。
定义消息驱动的AMQP POJO
首先定义一个handler后在监听器容器中声明这个bean
<bean id="listenerClass" class="com.habuma.spittr.alert.SpittleAlertHandler" /> <listener-container connection-factory="connectionFactory"> <listener ref="listenerClass" method="listen" queue-name="queue1, queue2" /> </listener-container>
Spring 消息