首页 > 代码库 > Spring JMS 官方文档学习

Spring JMS 官方文档学习

最后部分的XML懒得写了,因为个人更倾向于JavaConfig形式。

为知笔记版本见这里,带格式~

做了一个小demo,放到码云上了,有兴趣的点我

说明:需要先了解下JMS的基础知识。

1、介绍

Spring 提供了一个JMS集成框架,简化了JMS API的使用,类似于Spring提供的JDBC API集成。

JMS可以粗略地划分成两大功能区域,就是消息的生产(production)和消费(consumption)。JmsTemplate 用于消息的生产和同步的消息接收。对于异步消息接收,类似于Java EE的消息驱动,Spring提供了大量了message listener container,可以用于创建Message-Driven POJOs (MDPs)。Spring还提供了一种声明式创建消息监听器的方式。

  • package org.springframework.jms.core 提供了使用JMS的核心功能。它包含了JMS template类,通过处理资源的创建和释放 -- 这类似于JdbcTemplate为JDBC所做的。common设计准则,就是提供帮助方法 来执行common操作,同时为更复杂的使用 将处理任务代理到用户实现的回调接口。该JMS template遵循了同样的设计。该类提供了不同的便捷方法,用于发送消息、同步消费消息、以及将JMS session和消息的producer暴露给用户。
  • package org.springframework.jms.support 提供了JMSException翻译功能。(Spring的一贯行为 -- by larry)
  • package org.springframework.jms.support.converter 提供了一个MessageConverter abstraction,用于在Java objects和JMS messages之间转换。
  • package org.springframework.jms.support.destination 提供了不同的策略来管理JMS destinations,例如,为存储在JNDI中的destinations提供了一个service locator。
  • package org.springframework.jms.annotation 提供了必要的infrastructure 以支持注解驱动的监听器端点  -- @JmsListener。
  • package org.springframework.jms.config 为jms namespace提供了解析器实现;提供了java config支持以配置监听器容器和创建监听器端点。
  • package org.springframework.jms.connection 提供了在独立应用中使用的ConnectionFactory实现。它同样包含了一个Spring PlatformTransactionManager 用于JMS的实现 -- JmsTransactionManager。这允许将JMS作为一个事务性的资源无缝地集成到Spring的事务管理机制中。

2、使用Spring JMS

2.1、JmsTemplate

JmsTemplate是JMS core package的中心类。它简化了JMS的使用,因为它负责处理资源的创建和释放 -- 当发送、同步接收消息时。

使用JmsTemplate的代码,只需要实现回调接口。MessageCreator回调接口负责创建一个消息,使用的是JmsTemplate提供的Session。为了允许更复杂的JMS API使用,回调SessionCallback 提供了JMS session,而ProducerCallback则暴露了Session、MessageProducer pair。

JMS API 暴露了两种类型的send方法,一种使用投递模式、优先级、TTL作为Quality of Service (QOS)参数,一种不携带QOS参数 -- 使用默认值。由于JmsTemplate中有很多send方法,QOS参数的设置已经被暴露为bean properties,以避免让send方法的数量翻倍。类似地,用于同步接收调用的timeout值也是通过setReceiveTimeout设置的。

一些JMS providers允许通过ConnectionFactory的配置来设置QOS的默认值。这会有影响,当调用 MessageProducer’s send method `send(Destination destination, Message message) 时,会使用不同的QOS默认值,而非在JMS spec中指定的值。为了提供一致的QOS管理,JmsTemplate 必须特别地启用它自己的QOS值 -- 通过将 isExplicitQosEnabled设为 true。

为了方便,JmsTemplate还暴露了一个基础的 请求-应答 操作,允许发送一个消息,然后等待一个响应 -- 在该操作创建的一个临时的queue上。

注意:一旦配置完成,JmsTemplate的实例都是线程安全的。这非常重要,因为这意味着你可以配置一个唯一的JmsTemplate实例,然后将其安全地注入到多个bean中。要清楚,JmsTemplate是带状态的,它维持了一个对ConnectionFactory的引用,但该状态不是一个对话状态(双向状态)。

自Spring Framework 4.1起,JmsMessagingTemplate 构建于JmsTemplate之上,提供了消息抽象的集成,例如 org.springframework.messaging.Message。 这允许你以泛泛的方式创建消息来发送。

2.2、连接

JmsTemplate 要求引用一个ConnectionFactory。ConnectionFactory 是JMS spec的一部分,作为JMS的入口。客户端应用会使用它来创建连接 -- 其实现由JMS provider提供,并封装不同的配置参数,许多参数都是供应商特定的,例如SSL配置选项。

当在EJB中使用JMS时,供应商提供了JMS接口的实现,因此,它们可以参与声明式事务管理,可以执行连接和session的池化。为了使用这种实现,Java EE containers 通常要求用户声明一个JMS连接工厂作为EJB或servlet部署描述符中的 resource-ref。为了确保EJB中JmsTemplate对这些功能的使用,客户端应用应该确保它引用了被管理的ConnectionFactory实现。

缓存的消息资源

标准API 涉及到了很多中间件对象的创建。想要发送一条消息,会执行下面的API步骤:

ConnectionFactory->Connection->Session->MessageProducer->send

这ConnectionFactory和Send操作之间,创建和销毁了三个中间件对象。我们提供了两个ConnectionFactory的实现,以优化资源的使用和释放。

SingleConnectionFactory

该实现,会在所有的createConnection()调用时返回相同的Connection,并忽略close()的调用。这在测试环境和独立运行环境中很有用,因为多次JmsTemplate的调用都会使用同一个连接,方便事务管理。

该实现,需要引用标准的ConnectionFactory(通常来自JNDI)。

CachingConnectionFactory

该实现继承了SingleConnectionFactory,并添加了Session、MessageProducer、MessageConsumer的缓存。默认的缓存数量是1,可以使用property sessionCacheSize修改。

注意:实际缓存的sessions数量会多于那个值,这是因为session的缓存是基于它们的应答模式(acknowledgment mode),所以,最多可能四倍的数量。

MessageProducer和MessageConsumer会缓存在各自的session中。

MessageProducer是基于它们的destination而缓存的。MessageConsumer是基于一个key而缓存的,该key由destination、selector、noLocal delivery flag、以及durable subscription name(如果创建了durable consumers的话)组成。

2.3、Destination管理

类似于ConnectionFactories,Destinations也是JMS管理的对象,也可以被存储,可以通过JNDI获取。

在配置一个Spring的application context的时候,你可以使用JNDI工厂类 JndiObjectFactoryBean / <jee:jndi-lookup> 来执行依赖注入,注入到对JMS destinations引用的对象中。然而,经常,这种策略是冗长的 -- 如果有大量的destinations 或者有不同于JMS provider的高级destination管理特性。这种高级destination管理的例子,可能是动态destinations的创建 或者 对于destinations层级命名空间的支持。

JmsTemplate使用DestinationResolver接口的实现来代理了destination名字到JMS destination对象的解析。

JmsTemplate默认使用的实现是DynamicDestinationResolver,负责解析动态的destinations。

还提供了JndiDestinationResolver,作为包含在JNDI中的destinations的一个service locator,并能可选地会滚到DynamicDestinationResolver的行为。

很常见的一种情况是,JMS应用中的destinations仅在运行时才知道,因此,无法在应用部署时创建。这很常见是因为 在交互式系统组件之间有一种共享的应用逻辑,就是在运行时按照一种众所周知的命名惯例来创建destinations。即使动态destinations的创建不是JMS spec的一部分,大多数提供商仍然提供了这种功能。动态destinations是使用用户定义的名字创建的,这使得它们与临时的destinations不同,且经常不在JNDI中注册。创建动态destinations的API随着提供商的不同而不同,因为关键到destination的properties通常都是提供商特有的。However, a simple implementation choice that is sometimes made by vendors is to disregard the warnings in the JMS specification and to use the TopicSession method createTopic(String topicName) or the QueueSession method createQueue(String queueName) to create a new destination with default destination properties. Depending on the vendor implementation, DynamicDestinationResolver may then also create a physical destination instead of only resolving one.

JmsTemplate的boolean property pubSubDomain 被用于配置使用什么样的JMS domain。其默认值是false,表明是一个p2p domain,会使用Queues。JmsTemplate使用该property来决定动态destination解析的行为 -- 通过DestinationResolver接口的实现。

你还可以为JmsTemplate配置一个默认的destination -- 通过property defaultDestination。当不指定destination时,就会使用这个默认的destination来发送和接收。

2.4、Message Listener Containers  消息监听器容器

在EJB世界中 JMS消息的最常见用法的一种是,驱动 消息驱动的beans(drive message-driven beans MDBs)。Spring提供了一种解决方案来创建message-driven POJOs (MDPs),不需要将用户绑定到EJB容器。(见4.2、异步接收 - MDPs)。自Spring Framework 4.1起,可以使用@JmsListener来注解端点方法,详见6、注解驱动的监听器端点。

消息监听器容器是被用于接收来自JMS消息队列的消息,并驱动注入其中的MessageListener。该监听器容器负责所有的消息接收线程,并分发到监听器处理。消息监听器容器,是MDP和消息提供者之间的中间件,负责注册接收消息、参与事务、资源获取和释放、异常转换等等。这可以让你作为一个应用开发者专注业务逻辑。

Spring提供了两种标准JMS消息监听器容器,每种都有其独特的特色集。

SimpleMessageListenerContainer

该消息监听器容器是两种标准flavors的简化版。它会在启动时创建固定数量的JMS sessions和consumers,使用标准的JMS MessageConsumer.setMessageListener()方法注册监听器,并由JMS提供者来执行监听器回调。该变体不允许动态适配运行时需求,不允许参与外部被管理的事务。

在兼容性方面,非常类似于独立的JMS spec,但一般不兼容Java EE的JMS限制。

注意:虽然不允许参与外部被管理的事务,但支持native JMS事务:将sessionTransacted设为true即可,或者在namespace中将acknowledge设为transacted,此时,你的监听器抛出的异常都会导致回滚,消息会重新投递。

或者,考虑使用‘CLIENT_ACKNOWLEDGE‘ mode,也会在异常时重新投递,但不使用transacted Sessions,因此,没有包含任何其他Session操作(例如发出响应消息) -- 以事务协议。

DefaultMessageListenerContainer

该消息监听器容器可以用于大多数场景。不同于SimpleMessageListenerContainer,该容器变体允许动态适配运行时需求,且参与外部被管理的事务。

接收到的每个消息,都会被注册一个XA事务 -- 当配置了JtaTransactionManager时;因此,其处理可能利用XA事务语义。

该消息监听器容器试图在降低对JMS provider的要求、高级功能(如参与外部的被管理的事务)和Java EE环境的兼容性之间做出一个平衡。

该容器的缓存级别可以自定义。注意,当不启用缓存时,每次消息接收都会创建新的connection和session。当与高负载下的非持久化订阅结合时,可能会导致消息的丢失。务必使用一个恰当的缓存级别。

当broker挂掉时,该容器也具备恢复能力。默认,一个简单的BackOff实现会每隔5秒重试一次。也可以指定一个自定义的BackOff实现,例子可以参考ExponentialBackOff。

注意:和SimpleMessageListenerContainer类似,DefaultMessageListenerContainer也支持native JMS事务,还允许自定义应答模式。(略)

2.5、事务管理

Spring提供了JmsTransactionManager,负责管理单一JMS ConnectionFactory的事务。它允许JMS应用利用Spring的事务管理特性。

JmsTransactionManager执行本地资源事务,从指定的ConnectionFactory到线程 绑定一个JMS Connection/Session pair。

JmsTemplate自动侦测该事务性的资源,并操作它们。

在一个JavaEE环境中,ConnectionFactory会池化Connections和Sessions,因此,资源会在事务之间被复用。在独立环境中,使用Spring的SingleConnectionFactory 会导致一个共享的JMS Connection,每个事务都有其独立的Session。或者,考虑使用provider-specific pooling adapter,如ActiveMQ的PooledConnectionFactory。

JmsTemplate也可以使用JtaTransactionManager 和 XA-capable JMS ConnectionFactory 来执行分布式事务。注意,这要求使用JTA事务管理器 以及 合适的XA-configured ConnectionFactory!(详见Java EE server / JMS provider 的文档。)

当使用JMS API来从一个Connection创建一个Session时,在被管理和未被管理的事务性环境中 复用代码 可能混淆。 这是因为JMS API只有一个工厂方法来创建Session,它要求设置事务和应答模式。在一个被管理的环境中,这些值的设置是环境的事务性设施来负责的,因此 这些值会被供应商的JMS Connection封装所忽略。当在未被管理的环境中使用JmsTemplate时,你可以通过使用properties sessionTransacted和sessionAcknowledgeMod来指定这些值。当配合PlatformTransactionManager时,JmsTemplate会总是使用一个事务性的JMS Session。

3、发送消息

JmsTemplate 包含了很多便利方法 来发送消息。有需要使用Destination对象来指定destination的方法,也有使用String来指定destination的方法。那些没有指定destination的方法,会使用默认的destination。

  1. import javax.jms.ConnectionFactory;
  2. import javax.jms.JMSException;
  3. import javax.jms.Message;
  4. import javax.jms.Queue;
  5. import javax.jms.Session;
  6. import org.springframework.jms.core.MessageCreator;
  7. import org.springframework.jms.core.JmsTemplate;
  8. public class JmsQueueSender {
  9. private JmsTemplate jmsTemplate;
  10. private Queue queue;
  11. public void setConnectionFactory(ConnectionFactory cf) {
  12. this.jmsTemplate = new JmsTemplate(cf);
  13. }
  14. public void setQueue(Queue queue) {
  15. this.queue = queue;
  16. }
  17. public void simpleSend() {
  18. this.jmsTemplate.send(this.queue, new MessageCreator() {
  19. public Message createMessage(Session session) throws JMSException {
  20. return session.createTextMessage("hello queue world");
  21. }
  22. });
  23. }
  24. }

该例子使用 MessageCreator 回调来创建一个text message -- 使用提供的Session对象。JmsTemplate的构造需要传入一个ConnectionFactory的引用。或者,无参构造,然后设置connectionFactory。或者,考虑从Spring的JmsGatewaySupport 便捷基类中中获取,会提供一个预先构建好JMS配置的bean properties。

方法 send(String destinationName, MessageCreator creator) 可以让你发送一条消息。其destination是字符串形式的,如果是在JNDI中注册的, 那你应该将template的destinationResolver property设为JndiDestinationResolver。

如果你创建了JmsTemplate,并指定了默认的destination,方法send(MessageCreator c) 会直接发送到那个destination。

3.1、使用消息转换器  Message Converters

为了能够发送domain model objects,JmsTemplate有不同的发送方法 接受一个Java对象作为参数 -- 作为消息的内容。

重载方法convertAndSend()和receiveAndConvert() 将这种转换处理代理到MessageConverter接口的实例。该接口定义了一个简单的约定来转换Java对象和JMS消息。

默认的实现是SimpleMessageConverter,支持String和TextMessage、byte[]和BytesMessage、java.util.Map和MapMessage之间的转换。

使用该转换器,你和你的代码可以专注于业务对象。

sandbox 目前包含一个MapMessageConverter,它使用反射来进行JavaBean和MapMessage之间的转换。其他流行的实现包括JAXB、Castor、XMLBeans、XStream,来创建一个TextMessage。

为了容纳消息的properties、headers以及body的设置 (不能通用地封装进一个转换器类中),MessagePostProcessor接口允许在转换之后、发送之前访问该消息。下面的例子演示了如何修改一个消息的header和一个property -- 在java.util.Map被转成消息之后。

  1. public void sendWithConversion() {
  2. Map map = new HashMap();
  3. map.put("Name", "Mark");
  4. map.put("Age", new Integer(47));
  5. jmsTemplate.convertAndSend("testQueue", map, new MessagePostProcessor() {
  6. public Message postProcessMessage(Message message) throws JMSException {
  7. message.setIntProperty("AccountID", 1234);
  8. message.setJMSCorrelationID("123-00001");
  9. return message;
  10. }
  11. });
  12. }

其结果是这样的:

  1. MapMessage={
  2. Header={
  3. ... standard headers ...
  4. CorrelationID={123-00001}
  5. }
  6. Properties={
  7. AccountID={Integer:1234}
  8. }
  9. Fields={
  10. Name={String:Mark}
  11. Age={Integer:47}
  12. }
  13. }

3.2、SessionCallback 和 ProducerCallback

有一些情况下,你可能想要在一个JMS Session或MessageProducer上执行多个操作。SessionCallback 和 ProducerCallback 暴露了JMS Session 和 Session / MessageProducer pair。JmsTemplate的execute() 方法会执行这些回调。

4、接收消息

4.1、同步接收

JMS通常关联到异步处理,但也可以进行同步处理。

重载方法receive(..) 提供这种功能。在一个同步接收中,调用的线程会阻塞住,直到获取了消息。这是一种很危险的操作,因为调用的线程可能永久卡住。property receiveTimeout 可以指定接收器等待多久,而不必一直卡住。

4.2、异步接收 - Message-Driven POJOs 

Spring 还提供了注解式监听器端点 -- 使用@JmsListener。目前为止,这是最便捷的方式来设置一个异步接收器,详见6.2、启用监听器端点注解。

类似于EJB世界里的Message-Driven Bean,Message-Driven POJO扮演了JMS消息的接收器。MDP的一个限制是,它必须实现javax.jms.MessageListener接口   (看一下下面MessageListenerAdapter的讨论)。请注意,POJO在多线程上接收消息时,必须确保你的实现是线程安全的。

下面是MDP的一个简单实现:

  1. import javax.jms.JMSException;
  2. import javax.jms.Message;
  3. import javax.jms.MessageListener;
  4. import javax.jms.TextMessage;
  5. public class ExampleListener implements MessageListener {
  6. public void onMessage(Message message) {
  7. if (message instanceof TextMessage) {
  8. try {
  9. System.out.println(((TextMessage) message).getText());
  10. }
  11. catch (JMSException ex) {
  12. throw new RuntimeException(ex);
  13. }
  14. }
  15. else {
  16. throw new IllegalArgumentException("Message must be of type TextMessage");
  17. }
  18. }
  19. }

当你实现了你的MessageListener,就可以创建一个消息监听器容器了。

下面的例子演示了如何定义和配置一个消息监听器容器(Spring提供的,该例子用的是DefaultMessageListenerContainer)。

  1. <!-- this is the Message Driven POJO (MDP) -->
  2. <bean id="messageListener" class="jmsexample.ExampleListener" />
  3. <!-- and this is the message listener container -->
  4. <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  5. <property name="connectionFactory" ref="connectionFactory"/>
  6. <property name="destination" ref="destination"/>
  7. <property name="messageListener" ref="messageListener" />
  8. </bean>

详情请查阅不同的消息监听器容器的javadocs。

4.3、SessionAwareMessageListener接口

该接口是Spring专有的接口,提供了类似于JMS MessageListener接口的约定,还提供了消息处理方法 -- 通过访问接收Message的JMS Session。

  1. package org.springframework.jms.listener;
  2. public interface SessionAwareMessageListener {
  3. void onMessage(Message message, Session session) throws JMSException;
  4. }

你可以选择让你的MDPs实现该接口(也可以偏好JMS MessageListener) -- 如果你想你的MDPs能够响应任何接收到的消息(使用这里提供的Session)。

请注意:这里的onMessage(..)方法抛出了JMSException。而标准的JMS MessageListener则需要用户自己处理异常。

4.4、MessageListenerAdapter

该类是Spring的异步消息支持的最终组件:简言之,它允许你将几乎任意类暴露为MDP(当然,有一些限制)。

看看下面的接口定义。注意,虽然该接口既没有继承MessageListener,也没有继承SessionAwareMessageListener,它仍然可以被用作一个MDP -- 通过使用MessageListenerAdapter类。

  1. public interface MessageDelegate {
  2. void handleMessage(String message);
  3. void handleMessage(Map message);
  4. void handleMessage(byte[] message);
  5. void handleMessage(Serializable message);
  6. }
  7. public class DefaultMessageDelegate implements MessageDelegate {
  8. // implementation elided for clarity...
  9. }

特别的,注意上面的实现,没有任何JMS依赖!通过下面的配置,我们可以将其制作成一个MDP!!!

  1. <!-- this is the Message Driven POJO (MDP) -->
  2. <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  3. <constructor-arg>
  4. <bean class="jmsexample.DefaultMessageDelegate"/>
  5. </constructor-arg>
  6. </bean>
  7. <!-- and this is the message listener container... -->
  8. <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  9. <property name="connectionFactory" ref="connectionFactory"/>
  10. <property name="destination" ref="destination"/>
  11. <property name="messageListener" ref="messageListener" />
  12. </bean>

下面是另一个MDP例子,只能处理JMS TextMessage。注意,实际调用的消息处理方法是 receive (而MessageListenerAdapter的默认方法名字是handleMessage),但可以配置(下面有说)。注意,receive(..)方法仅能响应JMS TextMessage消息。

  1. public interface TextMessageDelegate {
  2. void receive(TextMessage message);
  3. }
  4. public class DefaultTextMessageDelegate implements TextMessageDelegate {
  5. // implementation elided for clarity...
  6. }

相应的MessageListenerAdapter的配置如下:

  1. <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  2. <constructor-arg>
  3. <bean class="jmsexample.DefaultTextMessageDelegate"/>
  4. </constructor-arg>
  5. <property name="defaultListenerMethod" value="http://www.mamicode.com/receive"/>
  6. <!-- we don‘t want automatic message context extraction -->
  7. <property name="messageConverter">
  8. <null/>
  9. </property>
  10. </bean>

注意,上面的messageListener 如果接收了一个JMS类型,而非TextMessage类型,会抛出IllegalStateException。该异常会被消化!!

MessageListenerAdapter的另一个能力就是能够自动发回一个响应Message  -- 如果一个处理器方法返回了non-void值。看看下面的接口和类:

  1. public interface ResponsiveTextMessageDelegate {
  2. // notice the return type...
  3. String receive(TextMessage message);
  4. }
  5. public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
  6. // implementation elided for clarity...
  7. }

如果上面的 DefaultResponsiveTextMessageDelegate 配合 MessageListenerAdapter 使用,然后receive(..)返回的所有的non-null值 都会(默认配置)被转成TextMessage。该TextMessage会被发送至 在源Message中定义的JMS Reply-To property Destination(如果存在),或者发送至 MessageListenerAdapter中设置的默认Destination;如果没有找到Destination,那么会抛出InvalidDestinationException (请注意,该异常不会被消化,而是会传播到上一层调用栈)。

4.5、在事务中处理消息

在一个事务中调用消息监听器,只需要重新配置下监听器容器。

通过设置监听器容器的sessionTransacted,可以很轻松地激活本地资源事务。每个消息监听器调用都会在一个活动的JMS事务中操作,监听器执行失败时 会回滚消息异常。

通过SessionAwareMessageListener发送一个响应消息,也是同一个本地事务的一部分,但任何其他资源操作(如数据库访问)则会独立地操作。这一般要求在监听器实现中进行重复的消息侦测,需要涵盖 数据库处理已提交 但消息处理提交失败的情况。

  1. <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  2. <property name="connectionFactory" ref="connectionFactory"/>
  3. <property name="destination" ref="destination"/>
  4. <property name="messageListener" ref="messageListener"/>
  5. <property name="sessionTransacted" value="http://www.mamicode.com/true"/>
  6. </bean>

为了参与进外部被管理的事务中,你需要配置一个事务管理器,并使用支持外部被管理的事务的监听器容器,通常是 DefaultMessageListenerContainer。

为了配置一个消息监听器容器 使用XA 事务参与,需要配置一个JtaTransactionManager (默认,其会代理到Java EE server的事务子系统中)。注意,底层的JMS ConnectionFactory需要是XA-capable的,并正确地注册到你的JTA事务坐标系中。(详见你的Java EE server的JNDI资源配置)。这允许消息接收 以及数据库访问 成为同一个事务的一部分(统一的提交语义,耗费的是XA事务日志overhead)。

  1. <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

然后,你只需要将其添加到我们之前的容器配置中即可。该容器会负责剩下的部分:

  1. <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  2. <property name="connectionFactory" ref="connectionFactory"/>
  3. <property name="destination" ref="destination"/>
  4. <property name="messageListener" ref="messageListener"/>
  5. <property name="transactionManager" ref="transactionManager"/>
  6. </bean>

5、对JCA 消息端点的支持  (囧,又一个未知领域)

自版本2.5开始,Spring还提供了对于基于JCA的MessageListener容器的支持。

>>>>>>>>>>>>>>>>>>略<<<<<<<<<<<<<<<<<<<

6、注解驱动的监听器端点

异步接收消息 的最简单的方式 是使用注解式监听器端点。简言之,它允许你将一个被管理的bean的方法暴露成一个JMS监听器端点。

  1. @Component
  2. public class MyService {
  3. @JmsListener(destination = "myDestination")
  4. public void processOrder(String data) { ... }
  5. }

上面的例子就是,只要在 javax.jms.Destination "destination"上有可用的消息时,就会调用该processOrder 方法(这种情况下,JMS消息的内容类似于MessageListenerAdapter提供的)。

该注解式端点设施,在幕后 为每一个被注解的方法 创建了一个消息监听器容器  -- 通过使用JmsListenerContainerFactory。该容器没有注册在application context中,但是,通过使用JmsListenerEndpointRegistry bean,可以容易地定位该容器 以便管理。

注意:在Java 8中,@JmsListener 是一个可重复的注解,就是说,可以关联多个JMS destinations到同一个方法中。而在Java 6和7中,你可以使用@JmsListeners 注解。

6.1、启用 监听器端点注解

为了启用注解@JmsListener,你需要在你的一个@Configuration类上添加注解@EnableJms。

  1. @Configuration
  2. @EnableJms
  3. public class AppConfig {
  4. @Bean
  5. public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
  6. DefaultJmsListenerContainerFactory factory =
  7. new DefaultJmsListenerContainerFactory();
  8. factory.setConnectionFactory(connectionFactory());
  9. factory.setDestinationResolver(destinationResolver());
  10. factory.setConcurrency("3-10");
  11. return factory;
  12. }
  13. }

默认,该infrastructure 会查找名字为jmsListenerContainerFactory 的bean,将其作为factory的源 来创建消息监听器容器。 在上面的例子中,忽略掉JMS infrastructure 的设置,processOrder方法会调用3个初始线程、最多10个池线程。

也可以自定义监听器容器工厂,按照每个注解来使用,或者配置一个显式的默认--通过实现JmsListenerConfigurer 接口。如果至少一个端点没有被配置特定的容器工厂,那就会需要该默认。

详见javadoc和下面的例子。

如果你倾向于使用XML配置,请使用<jms:annotation-driven> 元素。

  1. <jms:annotation-driven/>
  2. <bean id="jmsListenerContainerFactory"
  3. class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
  4. <property name="connectionFactory" ref="connectionFactory"/>
  5. <property name="destinationResolver" ref="destinationResolver"/>
  6. <property name="concurrency" value="http://www.mamicode.com/3-10"/>
  7. </bean>

6.2、编码式端点注册

JmsListenerEndpoint 提供一个JMS端点的model,并负责为该model配置容器。除了通过注解JmsListener配置之外,infrastructure 还允许你编码式配置端点。

  1. @Configuration
  2. @EnableJms
  3. public class AppConfig implements JmsListenerConfigurer {
  4. @Override
  5. public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
  6. SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
  7. endpoint.setId("myJmsEndpoint");
  8. endpoint.setDestination("anotherQueue");
  9. endpoint.setMessageListener(message -> {
  10. // processing
  11. });
  12. registrar.registerEndpoint(endpoint);
  13. }
  14. }

在上面的例子中,我们使用了SimpleJmsListenerEndpoint,它负责提供实际的MessageListener,你也可以构建你自己的端点变体。

注意,你还可以不使用@JmsListener,完全使用JmsListenerConfigurer 编码式注册你的端点。

6.3、注解式端点方法签名

到目前为止,我们已经在我们的端点中注入了一个简单的String,但它实际上还可以有更灵活的方法签名。让我们来重写一下,注入Order和自定义header:

  1. @Component
  2. public class MyService {
  3. @JmsListener(destination = "myDestination")
  4. public void processOrder(Order order, @Header("order_type") String orderType) {
  5. ...
  6. }
  7. }

你可以注入到JMS监听器端点的主要元素包括:

原生的javax.jms.Message 或其任意子类(当然需要匹配来信类型)。

javax.jms.Session 以可选的访问native JMS API,如发生一个自定义的应答。

org.springframework.messaging.Message 代表了incoming JMS消息。注意,该消息持有自定义的和标准的headers(如JmsHeaders所定义的)。

@Header 注解的参数,必须能够赋给 java.util.Map以访问所有headers。

一个无注解的元素,且不是被支持的类型(如MessageSession),被认为是payload(内容?)。你可以使用注解@Payload来显式地声明。你还可以打开校验,通过再添加一个@Valid。

注入Spring的Message abstraction的能力特别重要 -- 可以利用所有存储在特定传输中的消息的所有信息,而不需要使用特定传输的API来应答。

  1. @JmsListener(destination = "myDestination")
  2. public void processOrder(Message<Order> order) { ... }

方法签名的处理,是由DefaultMessageHandlerMethodFactory 提供的,DefaultMessageHandlerMethodFactory还可以进一步定制以支持更多的方法参数。还可以同时定制转换和校验。

例如,如果我们想确认我们的Order 是有效的(在处理它之前),我们可以使用@Valid 注解,并配置必要的校验器,如下:

  1. @Configuration
  2. @EnableJms
  3. public class AppConfig implements JmsListenerConfigurer {
  4. @Override
  5. public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
  6. registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
  7. }
  8. @Bean
  9. public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
  10. DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
  11. factory.setValidator(myValidator());
  12. return factory;
  13. }
  14. }

6.4、响应管理

在MessageListenerAdapter中已有的支持 允许你的方法拥有一个non-void返回类型。这种情况下,该返回值会被封装进一个javax.jms.Message 中,发送到源message的JMSReplyTo header中指定的destination,或者发送到监听器配置的默认destination。该默认destination 现在也可以在消息抽象上使用@SendTo 注解来设置。

假定我们的方法processOrder 应该返回一个OrderStatus,我们可以用下面的方式让其自动发送一个响应:

  1. @JmsListener(destination = "myDestination")
  2. @SendTo("status")
  3. public OrderStatus processOrder(Order order) {
  4. // order processing
  5. return status;
  6. }

注意:如果你有几个@JmsListener 注解的方法,你还可以将@SendTo 放在类级别上,以共用一个默认的应答destination。

如果你需要独立于传输之外设置额外的headers,那你应该返回Message,如下:

  1. @JmsListener(destination = "myDestination")
  2. @SendTo("status")
  3. public Message<OrderStatus> processOrder(Order order) {
  4. // order processing
  5. return MessageBuilder
  6. .withPayload(status)
  7. .setHeader("code", 1234)
  8. .build();
  9. }

如果你需要在运行时计算响应destination,你可以把你的响应封装进JmsResponse,它提供了运行时可用的destination。前面的例子可以重写如下:

  1. @JmsListener(destination = "myDestination")
  2. public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
  3. // order processing
  4. Message<OrderStatus> response = MessageBuilder
  5. .withPayload(status)
  6. .setHeader("code", 1234)
  7. .build();
  8. return JmsResponse.forQueue(response, "status");
  9. }

7、JMS namespace支持

Spring提供了XML namespace以简化JMS的配置。想要使用该JMS namespace elements,你需要引用JMS schema:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:jms="http://www.springframework.org/schema/jms"
  5. xsi:schemaLocation="
  6. http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  7. http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">
  8. <!-- bean definitions here -->
  9. </beans>

该namespace由三个顶级的elements构成:<annotation-driven/><listener-container/><jca-listener-container/>。第一个,启用了注解驱动的监听器端点。后两个定义了共享的监听器容器配置,可能包含<listener/> 子元素。下面是一个简单的例子:

  1. <jms:listener-container>
  2. <jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>
  3. <jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/>
  4. </jms:listener-container>

上面的例子,等效于创建两个不同的监听器容器bean定义 和 两个不同的MessageListenerAdapter bean定义 -- 如4.4章节所述。除了以上attributes,listener元素还可以包含几个可选项:

Table 30.1. Attributes of the JMS <listener> element

>>>>>>>>>>>>>>>>后面的部分略,因为个人更喜欢JavaConfig形式<<<<<<<<<<<<<<<<<

官方文档地址:

http://docs.spring.io/spring/docs/current/spring-framework-reference/html/jms.html

Spring JMS 官方文档学习