首页 > 代码库 > java JMS消息队列

java JMS消息队列

本文章的完整代码可从我的github中下载:https://github.com/huangbowen521/SpringJMSSample.git

上一篇文章中介绍了如何安装和运行ActiveMQ。这一章主要讲述如何使用Spring JMS向ActiveMQ的Message Queue中发消息和读消息。

首先需要在项目中引入依赖库。

  • spring-core: 用于启动Spring容器,加载bean。

  • spring-jms:使用Spring JMS提供的API。

  • activemq-all:使用ActiveMQ提供的API。

在本示例中我使用maven来导入相应的依赖库。

pom.xml
1234567891011121314151617181920212223
  <dependencies>    <dependency>      <groupId>junit</groupId>      <artifactId>junit</artifactId>      <version>4.11</version>      <scope>test</scope>    </dependency>      <dependency>          <groupId>org.apache.activemq</groupId>          <artifactId>activemq-all</artifactId>          <version>5.9.0</version>      </dependency>      <dependency>          <groupId>org.springframework</groupId>          <artifactId>spring-jms</artifactId>          <version>4.0.2.RELEASE</version>      </dependency>      <dependency>          <groupId>org.springframework</groupId>          <artifactId>spring-core</artifactId>          <version>4.0.2.RELEASE</version>      </dependency>  </dependencies>

接下来配置与ActiveMQ的连接,以及一个自定义的MessageSender。

springJMSConfiguration.xml
1234567891011121314151617181920212223242526272829303132333435363738
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">        <property name="location">            <value>application.properties</value>        </property>    </bean>    <!-- Activemq connection factory -->    <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">        <constructor-arg index="0" value=http://www.mamicode.com/"${jms.broker.url}"/>    </bean>    <!-- ConnectionFactory Definition -->    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">        <constructor-arg ref="amqConnectionFactory"/>    </bean>    <!--  Default Destination Queue Definition-->    <bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQQueue">        <constructor-arg index="0" value=http://www.mamicode.com/"${jms.queue.name}"/>    </bean>    <!-- JmsTemplate Definition -->    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">        <property name="connectionFactory" ref="connectionFactory"/>        <property name="defaultDestination" ref="defaultDestination"/>    </bean>    <!-- Message Sender Definition -->    <bean id="messageSender" class="huangbowen.net.jms.MessageSender">        <constructor-arg index="0" ref="jmsTemplate"/>    </bean></beans>

在此配置文件中,我们配置了一个ActiveMQ的connection factory,使用的是ActiveMQ提供的ActiveMQConnectionFactory类。然后又配置了一个Spring JMS提供的CachingConnectionFactory。我们定义了一个ActiveMQQueue作为消息的接收Queue。并创建了一个JmsTemplate,使用了之前创建的ConnectionFactory和Message Queue作为参数。最后自定义了一个MessageSender,使用该JmsTemplate进行消息发送。

以下MessageSender的实现。

MessageSender.java
12345678910111213141516
package huangbowen.net.jms;import org.springframework.jms.core.JmsTemplate;public class MessageSender {    private final JmsTemplate jmsTemplate;    public MessageSender(final JmsTemplate jmsTemplate) {        this.jmsTemplate = jmsTemplate;    }    public void send(final String text) {        jmsTemplate.convertAndSend(text);    }}

这个MessageSender很简单,就是通过jmsTemplate发送一个字符串信息。

我们还需要配置一个Listener来监听和处理当前的Message Queue。

springJMSReceiver.xml
12345678910111213141516
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">    <!-- Message Receiver Definition -->    <bean id="messageReceiver" class="huangbowen.net.jms.MessageReceiver">    </bean>    <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">        <property name="connectionFactory" ref="connectionFactory"/>        <property name="destinationName" value=http://www.mamicode.com/"${jms.queue.name}"/>        <property name="messageListener" ref="messageReceiver"/>    </bean></beans>

 

在上述xml文件中,我们自定义了一个MessageListener,并且使用Spring提供的SimpleMessageListenerContainer作为Container。

以下是MessageLinser的具体实现。

MessageReceiver.java
123456789101112131415161718
package huangbowen.net.jms;import javax.jms.*;public class MessageReceiver implements MessageListener {    public void onMessage(Message message) {        if(message instanceof TextMessage) {            TextMessage textMessage = (TextMessage) message;            try {                String text = textMessage.getText();                System.out.println(String.format("Received: %s",text));            } catch (JMSException e) {                e.printStackTrace();            }        }    }}

这个MessageListener也相当的简单,就是从Queue中读取出消息以后输出到当前控制台中。

另外有关ActiveMQ的url和所使用的Message Queue的配置在application.properties文件中。

application.properties
12
jms.broker.url=tcp://localhost:61616jms.queue.name=bar

好了,配置大功告成。如何演示那?我创建了两个Main方法,一个用于发送消息到ActiveMQ的MessageQueue中,一个用于从MessageQueue中读取消息。

SenderApp
123456789101112131415161718192021222324252627282930
package huangbowen.net;import huangbowen.net.jms.MessageSender;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;import org.springframework.util.StringUtils;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;public class SenderApp{    public static void main( String[] args ) throws IOException {        MessageSender sender = getMessageSender();        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));        String text = br.readLine();        while (!StringUtils.isEmpty(text)) {            System.out.println(String.format("send message: %s", text));            sender.send(text);            text = br.readLine();        }    }    public static MessageSender getMessageSender() {        ApplicationContext context = new ClassPathXmlApplicationContext("springJMSConfiguration.xml");       return (MessageSender) context.getBean("messageSender");    }}
ReceiverApp.java
12345678910
package huangbowen.net;import org.springframework.context.support.ClassPathXmlApplicationContext;public class ReceiverApp {    public static void main( String[] args )    {        new ClassPathXmlApplicationContext("springJMSConfiguration.xml", "springJMSReceiver.xml");    }}

OK,如果运行的话要先将ActiveMQ服务启动起来(更多启动方式参见我上篇文章)。

1
$:/usr/local/Cellar/activemq/5.8.0/libexec$ activemq start xbean:./conf/activemq-demo.xml

然后运行SenderApp中的Main方法,就可以在控制台中输入消息发送到ActiveMQ的Message Queue中了。运行ReceiverApp中的Main方法,则会从Queue中将消息读出来,打印到控制台。

这就是使用Spring JMS与ActiveMQ交互的一个简单例子了。完整代码可从https://github.com/huangbowen521/SpringJMSSample下载。

//------------------------------------------------------------------------------

 

 

 

 

 

 

 

基于Spring+JMS+ActiveMQ+Tomcat,我使用的版本情况如下所示:

  • Spring 2.5
  • ActiveMQ 5.4.0
  • Tomcat 6.0.30

下面通过学习与配置,实现消息服务的基本功能:发送与接收。Spring对JMS提供了很好的支持,可以通过JmsTemplate来方便地实现消息服务。这里,我们的消息服务不涉及事务管理。下面简单说明实现过程:

先看一下,我们最终的Spring配置文件applicationContext.xml的内容,如下所示:

 

[xhtml] view plaincopy
 
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  4.     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd  
  5.         http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">  
  6.   
  7.     <bean id="listenerContainer"  
  8.         class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
  9.         <property name="connectionFactory" ref="connectionFactory"></property>  
  10.         <property name="destination" ref="messageQueue"></property>  
  11.         <property name="messageListener" ref="receiveMessageListener"></property>  
  12.     </bean>  
  13.     <bean id="connectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">  
  14.         <property name="jndiName" value=http://www.mamicode.com/"java:comp/env/myJMS/ConnectionFactory"></property>  
  15.     </bean>  
  16.     <bean id="messageQueue" class="org.springframework.jndi.JndiObjectFactoryBean">  
  17.         <property name="jndiName" value=http://www.mamicode.com/"java:comp/env/myJMS/MessageQueue"></property>  
  18.     </bean>  
  19.     <bean id="receiveMessageListener"  
  20.         class="org.shirdrn.spring.jms.integration.ReceiveMessageListener"></bean>  
  21.   
  22.     <bean id="messageSender" class="org.shirdrn.spring.jms.integration.MessageSender">  
  23.         <property name="jmsTemplate" ref="jmsTemplate"></property>  
  24.     </bean>  
  25.     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
  26.         <property name="connectionFactory" ref="connectionFactory"></property>  
  27.         <property name="defaultDestination" ref="messageQueue"></property>  
  28.     </bean>  
  29.   
  30.     <bean id="sendMessageController"  
  31.         class="org.shirdrn.spring.jms.integration.SendMessageController">  
  32.         <property name="messageSender" ref="messageSender" />  
  33.         <property name="successView" value=http://www.mamicode.com/"/success" />  
  34.     </bean>  
  35.     <bean id="urlMapping"  
  36.         class="org.springframework.web.servlet.handler.SimpleUrlHandlerMapping">  
  37.         <property name="mappings">  
  38.             <props>  
  39.                 <prop key="/sendMessage.do">sendMessageController</prop>  
  40.             </props>  
  41.         </property>  
  42.     </bean>  
  43.     <bean id="viewResolver"  
  44.         class="org.springframework.web.servlet.view.InternalResourceViewResolver">  
  45.         <property name="requestContextAttribute" value=http://www.mamicode.com/"rc" />  
  46.         <property name="viewClass"  
  47.             value=http://www.mamicode.com/"org.springframework.web.servlet.view.JstlView" />  
  48.         <property name="prefix" value=http://www.mamicode.com/"/" />  
  49.         <property name="suffix" value=http://www.mamicode.com/".jsp" />  
  50.     </bean>  
  51.       
  52. </beans>  

 

我们使用Spring的org.springframework.jms.listener.DefaultMessageListenerContainer来收集消息,通过设置一个消息监听器,具体实现类为org.shirdrn.spring.jms.integration.ReceiveMessageListener,代码如下所示:

 

[java] view plaincopy
 
  1. package org.shirdrn.spring.jms.integration;  
  2.   
  3. import javax.jms.JMSException;  
  4. import javax.jms.Message;  
  5. import javax.jms.MessageListener;  
  6. import javax.jms.TextMessage;  
  7.   
  8. import org.apache.log4j.Logger;  
  9.   
  10. public class ReceiveMessageListener implements MessageListener {  
  11.   
  12.     private static final Logger LOG = Logger.getLogger(ReceiveMessageListener.class);  
  13.       
  14.     public void onMessage(Message message) {  
  15.         if (message instanceof TextMessage) {  
  16.             TextMessage text = (TextMessage) message;  
  17.             try {  
  18.                 LOG.info("Received message:" + text.getText());  
  19.             } catch (JMSException e) {  
  20.                 e.printStackTrace();  
  21.             }  
  22.         }  
  23.     }  
  24.   
  25. }  

 

上面,对发送的消息进行监听,并接收处理,我们只是简单地打印出一条日志内容。

对于listenerContainer,还需要注入连接工厂connectionFactory和消息目的destination这两个属性:connectionFactory我们使用ActiveMQ的org.apache.activemq.ActiveMQConnectionFactory,并通过JNDI服务,绑定到名字java:comp/env/myJMS/ConnectionFactory上;而destination属性通过使用ActiveMQ的org.apache.activemq.command.ActiveMQQueue消息队列,也是通过JNDI服务绑定到名字java:comp/env/myJMS/MessageQueue上。所以,在Tomcat的conf/context.xml中的<Context>元素里面加上如下配置:

 

[xhtml] view plaincopy
 
  1. <Resource name="myJMS/ConnectionFactory"   
  2.     auth="Container"     
  3.     type="org.apache.activemq.ActiveMQConnectionFactory"   
  4.     description="JMS Connection Factory"  
  5.     factory="org.apache.activemq.jndi.JNDIReferenceFactory"   
  6.     brokerURL="vm://shirdrnUrl"   
  7.     brokerName="MyActiveMQBroker"/>  
  8.   
  9. <Resource name="myJMS/MessageQueue"   
  10.     auth="Container"   
  11.     type="org.apache.activemq.command.ActiveMQQueue"  
  12.     description="My Message Queue"  
  13.     factory="org.apache.activemq.jndi.JNDIReferenceFactory"   
  14.     physicalName="MyMessageQueue"/>  

 

我们通过使用JmsTemplate来实现消息的发送,所以实现的发送类要将JmsTemplate注入进去,实现代码如下所示:

 

[java] view plaincopy
 
  1. package org.shirdrn.spring.jms.integration;  
  2.   
  3. import javax.jms.JMSException;  
  4. import javax.jms.Message;  
  5. import javax.jms.Session;  
  6. import javax.jms.TextMessage;  
  7.   
  8. import org.apache.log4j.Logger;  
  9. import org.springframework.jms.core.JmsTemplate;  
  10. import org.springframework.jms.core.MessageCreator;  
  11.   
  12. public class MessageSender {  
  13.       
  14.     private static final Logger LOG = Logger.getLogger(MessageSender.class);  
  15.     private JmsTemplate jmsTemplate;  
  16.   
  17.     public void setJmsTemplate(JmsTemplate jmsTemplate) {  
  18.         this.jmsTemplate = jmsTemplate;  
  19.     }  
  20.       
  21.     public void sendMessage(final String message) {  
  22.         LOG.info("Send message: " + message);  
  23.         jmsTemplate.send(new MessageCreator() {  
  24.   
  25.             public Message createMessage(Session session) throws JMSException {  
  26.                 TextMessage textMessage = session.createTextMessage(message);  
  27.                 return textMessage;  
  28.             }  
  29.               
  30.         });  
  31.     }  
  32.   
  33. }  

 

上面基于Spring的MessageCreator来创建消息,通过调用JmsTemplate的send方法发送出去。

对于Web,我们使用了Spring MVC,通过实现一个控制器org.shirdrn.spring.jms.integration.SendMessageController来控制页面消息的发送及其视图的派发。我们实现的SendMessageController类继承自MultiActionController,可以在一个控制器中实现多个Action,代码实现如下所示:

 

[java] view plaincopy
 
  1. package org.shirdrn.spring.jms.integration;  
  2.   
  3. import java.util.HashMap;  
  4. import java.util.Map;  
  5.   
  6. import javax.servlet.http.HttpServletRequest;  
  7. import javax.servlet.http.HttpServletResponse;  
  8.   
  9. import org.springframework.web.servlet.ModelAndView;  
  10. import org.springframework.web.servlet.mvc.multiaction.MultiActionController;  
  11.   
  12. public class SendMessageController extends MultiActionController {  
  13.   
  14.     private String successView;  
  15.     private MessageSender messageSender;  
  16.   
  17.     public ModelAndView sendMessage(HttpServletRequest request,  
  18.             HttpServletResponse response) throws Exception {  
  19.           
  20.         Map<String, Object> retMap = new HashMap<String, Object>();  
  21.         String message = request.getParameter("message");  
  22.         messageSender.sendMessage(message);  
  23.           
  24.         return new ModelAndView(successView, retMap);  
  25.     }  
  26.   
  27.     public String getSuccessView() {  
  28.         return successView;  
  29.     }  
  30.   
  31.     public void setSuccessView(String successView) {  
  32.         this.successView = successView;  
  33.     }  
  34.   
  35.     public MessageSender getMessageSender() {  
  36.         return messageSender;  
  37.     }  
  38.   
  39.     public void setMessageSender(MessageSender messageSender) {  
  40.         this.messageSender = messageSender;  
  41.     }  
  42.   
  43. }  

 

上面调用模型层(Model)的MessageSender来实现发送消息的处理逻辑,如果发送成功,视图派发到successView指定的页面。可以看到,最前面我们给出的Spring配置内容分为三组,最后一组是对控制器的配置:

viewResolver                     视图解析器配置,可以将控制器中指定前缀(/)解析为后缀是.jsp的页面,例如/success解析为/sucess.jsp

urlMapping                         请求URL与控制器的映射,例如对于满足/sendMessage.do模式的请求,都会被指派给sendMessageController去处理

sendMessageController      控制器实现类,里面的方法名称可以自定义,但要在org.springframework.web.servlet.handler.SimpleUrlHandlerMapping中的mappings属性中配置映射

然后,我们需要一个web部署描述文件,web.xml文件配置内容,如下所示:

 

[xhtml] view plaincopy
 
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee"  
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  4.     xsi:schemaLocation="http://java.sun.com/xml/ns/javaee   
  5.     http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd">  
  6.   
  7.     <context-param>  
  8.         <param-name>contextConfigLocation</param-name>  
  9.         <param-value>  
  10.             classpath:org/shirdrn/spring/jms/integration/applicationContext.xml  
  11.         </param-value>  
  12.     </context-param>  
  13.     <listener>  
  14.         <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>  
  15.     </listener>  
  16.   
  17.     <servlet>  
  18.         <servlet-name>controller</servlet-name>  
  19.         <servlet-class>  
  20.             org.springframework.web.servlet.DispatcherServlet  
  21.         </servlet-class>  
  22.         <init-param>  
  23.             <param-name>contextConfigLocation</param-name>  
  24.             <param-value>  
  25.                 classpath:org/shirdrn/spring/jms/integration/applicationContext.xml  
  26.             </param-value>  
  27.         </init-param>  
  28.         <load-on-startup>1</load-on-startup>  
  29.     </servlet>  
  30.   
  31.     <servlet-mapping>  
  32.         <servlet-name>controller</servlet-name>  
  33.         <url-pattern>*.do</url-pattern>  
  34.     </servlet-mapping>  
  35.     
  36.       
  37.     <welcome-file-list>  
  38.         <welcome-file>index.jsp</welcome-file>  
  39.     </welcome-file-list>  
  40. </web-app>  

 

另外,我们还要实现一个页面,提供输入发送消息的表单,提交后交给后台处理,成功发送后跳转到一个成功页面。表单输入页面为index.jsp,如下所示:

 

[java] view plaincopy
 
  1. <%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>  
  2. <%  
  3.     String path = request.getContextPath();  
  4.     String basePath = request.getScheme() + "://"  
  5.             + request.getServerName() + ":" + request.getServerPort()  
  6.             + path + "/";  
  7. %>  
  8.   
  9. <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">  
  10. <html>  
  11.     <head>  
  12.         <base href=http://www.mamicode.com/"<%=basePath%>">  
  13.   
  14.         <title>My JSP ‘index.jsp‘ starting page</title>  
  15.         <meta http-equiv="pragma" content="no-cache">  
  16.         <meta http-equiv="cache-control" content="no-cache">  
  17.         <meta http-equiv="expires" content="0">  
  18.         <meta http-equiv="keywords" content="keyword1,keyword2,keyword3">  
  19.         <meta http-equiv="description" content="This is my page">  
  20.         <!--  
  21.     <link rel="stylesheet" type="text/css" href=http://www.mamicode.com/"styles.css" mce_href=http://www.mamicode.com/"styles.css">  
  22.     -->  
  23.     </head>  
  24.   
  25.     <body>  
  26.         <div align="center" style="width: 500px; height: 300px; border:2px; borderColor:black">  
  27.             <form action="sendMessage.do" method="post">  
  28.                 <table align="center">  
  29.                     <tr>  
  30.                         <th colspan="2">  
  31.                             消息发送控制台  
  32.                         </th>  
  33.                     </tr>  
  34.                     <tr>  
  35.                         <td>  
  36.                             消息内容:  
  37.                         </td>  
  38.                         <td>  
  39.                             <input type="text" name="message">  
  40.                         </td>  
  41.                     </tr>  
  42.                     <tr>  
  43.                         <td align="center" colspan="2">  
  44.                             <input type="reset" value=http://www.mamicode.com/"清除">  
  45.                                   
  46.                             <input type="submit" value=http://www.mamicode.com/"发送">  
  47.                         </td>  
  48.                     </tr>  
  49.                 </table>  
  50.             </form>  
  51.         </div>  
  52.     </body>  
  53. </html>  

 

成功页面为success.jsp,就是给一个成功的提示信息,如下所示:

 

[java] view plaincopy
 
  1. <%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>  
  2. <%  
  3.     String path = request.getContextPath();  
  4.     String basePath = request.getScheme() + "://"  
  5.             + request.getServerName() + ":" + request.getServerPort()  
  6.             + path + "/";  
  7. %>  
  8.   
  9. <!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">  
  10. <html>  
  11.     <head>  
  12.         <base href=http://www.mamicode.com/"<%=basePath%>">  
  13.   
  14.         <title>My JSP ‘index.jsp‘ starting page</title>  
  15.         <meta http-equiv="pragma" content="no-cache">  
  16.         <meta http-equiv="cache-control" content="no-cache">  
  17.         <meta http-equiv="expires" content="0">  
  18.         <meta http-equiv="keywords" content="keyword1,keyword2,keyword3">  
  19.         <meta http-equiv="description" content="This is my page">  
  20.         <!--  
  21.     <link rel="stylesheet" type="text/css" href=http://www.mamicode.com/"styles.css" mce_href=http://www.mamicode.com/"styles.css">  
  22.     -->  
  23.     </head>  
  24.   
  25.     <body>  
  26.         <div align="center" style="width: 500px; height: 300px; border:2px; borderColor:black">  
  27.             <form action="sendMessage.do" method="post">  
  28.                 <table align="center">  
  29.                     <tr>  
  30.                         <th colspan="2">  
  31.                             消息发送报告  
  32.                         </th>  
  33.                     </tr>  
  34.                     <tr>  
  35.                         <td colspan="2">  
  36.                             状态:发送成功  
  37.                         </td>  
  38.                     </tr>  
  39.                     <tr>  
  40.                         <td align="center" colspan="2">  
  41.                             <a href=http://www.mamicode.com/"index.jsp" mce_href=http://www.mamicode.com/"index.jsp">返回</a>  
  42.                         </td>  
  43.                     </tr>  
  44.                 </table>  
  45.             </form>  
  46.         </div>  
  47.     </body>  
  48. </html>  

 

至此,我们可以将实现的简单web工程发布到Tomcat容器,然后启动Tomcat服务器,通过页面可以发送消息,并通过日志查看,实际消息发送和接收的情况。 

java JMS消息队列