首页 > 代码库 > ActiveMQ与spring集成实现Queue模式

ActiveMQ与spring集成实现Queue模式

  ActiveMQ可以和spring很好的集成,下面我们来看看,如何做个集成的demo。

  (1)pom.xml引入相关jar

<!-- spring相关 begin -->
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.1.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>4.1.5.RELEASE</version>
        </dependency>
        <!-- spring相关 end -->
        <!-- activeMQ相关 begin-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.1.4.RELEASE</version>
        </dependency>
        <!-- activeMQ相关  end-->

  (2)添加生产者配置activemq-sender.xml

<description>JMS发布者应用配置</description>
    
    <!-- CachingConnectionFactory 连接工厂 (有缓存功能)-->
    <bean id="cachingConnectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="20" />
        <property name="targetConnectionFactory">  
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">  
                <!-- MQ地址 账户名 密码-->  
                <property name="brokerURL" value="tcp://192.168.56.129:61616" />
                <property name="userName" value="parry" />
                <property name="password" value="parry123" />
                <!-- 是否异步发送 -->
                <property name="useAsyncSend" value="true"/>
            </bean>  
        </property>  
    </bean>
    
    <!-- 接收消息的目的地(一个主题)点对点队列 -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 设置消息主题的名字 -->
        <constructor-arg index="0" value="messages" />
    </bean>
    
    <!-- 接收配置JMS模版 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory" />
        <property name="defaultDestination" ref="destination" />
        <!-- value为true为发布/订阅模式; value为false为点对点模式-->
        <property name="pubSubDomain" value="false"/>
    </bean>

  (3)添加消费者配置activemq-consumer.xml

<description>JMS订阅者应用配置</description>
    <!-- CachingConnectionFactory 连接工厂 (有缓存功能)-->
    <bean id="cachingConnectionFactory"
        class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- Session缓存数量 -->
        <property name="sessionCacheSize" value="20" />
        <property name="targetConnectionFactory">  
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">  
                <!-- MQ地址 账户名 密码-->  
                <property name="brokerURL" value="tcp://192.168.56.129:61616" />
                <property name="userName" value="parry" />
                <property name="password" value="parry123" />
                <!-- 是否异步发送 -->
                <property name="useAsyncSend" value="true"/>
            </bean>  
        </property>  
    </bean>
    
    <!-- 接收消息的目的地(一个主题)点对点队列 -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 设置消息主题的名字 -->
        <constructor-arg index="0" value="messages" />
    </bean>
    
    <!-- 消费者配置 (自己定义) -->
    <bean id="consumer" class="com.parry.MQ.funcion.Listener" />
    
    <!-- 消息监听容器 -->
    <bean id="myListenerContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory" />
        <property name="destination" ref="destination" />
        <property name="messageListener" ref="consumer" />
        <!-- 如果消息的接收速率,大于消息处理的速率时,可以采取线程池方式 -->
        <property name="taskExecutor" ref="queueMessageExecutor"/>
        <!-- 设置固定的线程数 -->
        <property name="concurrentConsumers" value="30"/>
        <!-- 设置动态的线程数 -->
        <property name="concurrency" value="20-50"/>
        <!-- 设置最大的线程数 -->
        <property name="maxConcurrentConsumers" value="80"/>
    </bean>
    <bean id="queueMessageExecutor"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="30" />
        <property name="maxPoolSize" value="80" />
        <property name="daemon" value="true" />
        <property name="keepAliveSeconds" value="120" />
    </bean>

  (4)新建一个发送消息的方法

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
/**
 * 发送消息
 * @author Administrator
 *
 */
@Component
public class QueueSender {
    
    @Autowired
    private JmsTemplate myJmsTemplate;

    /**
     * 发送一条消息到指定的队列(目标)
     * 
     * @param queueName
     *            队列名称
     * @param message
     *            消息内容
     */
    public void send(String queueName, final String message) {
        myJmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}

  (5)添加监听器

package com.parry.MQ.funcion;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 接收者监听类
 * @author Administrator
 *
 */
public class Listener implements MessageListener {

    public void onMessage(Message message) {
        // 业务处理
        try {
            TextMessage message2 = (TextMessage) message;
            System.out.println("接收到信息:" + message2.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

  (6)写个一请求测试一下

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import com.parry.MQ.funcion.QueueSender;

@Controller
public class App {
    
    @Autowired
    private QueueSender sender;
    
    @RequestMapping("test")
    @ResponseBody
    public String Test() {
        
        sender.send("messages", "你好,这是我的第一条消息!");
        return "Hello world";
    }
}

  (7)测试结果

  技术分享

  在ActiveMQ的管理后台,我们也能看到我们的消息(这里我多测试了几次):

  技术分享

ActiveMQ与spring集成实现Queue模式