首页 > 代码库 > ActiveMQ JMS 项目 基于 Maven 搭建

ActiveMQ JMS 项目 基于 Maven 搭建

 

JAVA版本:

技术分享

IntellJ IDEA 版本:

IntelliJ IDEA 2017.2
Build #IU-172.3317.76, built on July 15, 2017
Licensed to Administrator

JRE: 1.8.0_152-release-915-b5 amd64
JVM: OpenJDK 64-Bit Server VM by JetBrains s.r.o
Windows 7 6.1

 

一、新建Maven工程

1.选择File => New => Project...

技术分享

技术分享

技术分享

2.或者执行maven 命令行创建工程。

D:cd D:\JavaSourceCode\JavaSamplesmvn archetype:generate -DgroupId=com.phpdragon -DartifactId=jms-activeme-mq -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

3.或者手动创建如下目录结构:

技术分享

4.如果是手动创建目录,需设置目录属性让 IDEA 识别为源码包路径

技术分享

 

二、添加JAR依赖

spring-jms:

spring-test:

activemq-pool:

fastjson:

junit:

testng:

pom.xml配置如下

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <groupId>com.phpdragon</groupId>    <artifactId>jms-activemq-demo</artifactId>    <version>1.0-SNAPSHOT</version>    <packaging>jar</packaging>    <name>jms-activemq-demo</name>    <url>http://maven.apache.org</url>    <developers>        <developer>            <id>phpdragon</id>            <name>phpdragon</name>            <email>phpdragon@qq.com</email>        </developer>    </developers>    <properties>        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>        <fastjson.vesrion>1.2.35</fastjson.vesrion>        <activemq-pool.version>5.15.0</activemq-pool.version>        <spring.version>4.3.10.RELEASE</spring.version>        <junit.version>4.12</junit.version>        <testng.version>6.11</testng.version>    </properties>    <dependencies>        <dependency>            <groupId>com.alibaba</groupId>            <artifactId>fastjson</artifactId>            <version>${fastjson.vesrion}</version>        </dependency>        <!-- activemq -->        <dependency>            <groupId>org.apache.activemq</groupId>            <artifactId>activemq-pool</artifactId>            <version>${activemq-pool.version}</version>        </dependency>        <!-- spring-jms -->        <dependency>            <groupId>org.springframework</groupId>            <artifactId>spring-jms</artifactId>            <version>${spring.version}</version>        </dependency>        <!-- spring -->        <!--单元测试-->        <dependency>            <groupId>org.springframework</groupId>            <artifactId>spring-test</artifactId>            <version>${spring.version}</version>            <scope>test</scope>        </dependency>        <dependency>            <groupId>junit</groupId>            <artifactId>junit</artifactId>            <version>${junit.version}</version>            <scope>test</scope>        </dependency>        <!--自动化测试-->        <dependency>            <groupId>org.testng</groupId>            <artifactId>testng</artifactId>            <version>${testng.version}</version>            <scope>test</scope>        </dependency>    </dependencies></project>

 

三、编写生产者、消费者

1.添加消费者MqProducer.java

import com.sun.nio.sctp.MessageInfo;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.jms.core.JmsTemplate;import org.springframework.stereotype.Component;@Componentpublic class MqProducer {    @Autowired    private JmsTemplate jmsTemplate;    public void sendMsg(MessageInfo info) {        try {            jmsTemplate.convertAndSend(info);        } catch (Exception e) {            e.printStackTrace();        }    }}

 

2.创建jms消息转换器MqMessageConverter.java

import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session;import com.phpdragon.jms.pojo.MessagePojo;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.jms.support.converter.MessageConversionException;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import org.springframework.stereotype.Component;@Component("messageConverter")public class MqMessageConverter implements org.springframework.jms.support.converter.MessageConverter {    private static final Logger LOGGER = LoggerFactory.getLogger(MqMessageConverter.class);    public Object fromMessage(Message message) throws JMSException, MessageConversionException {        LOGGER.info("从mq获得message, message内容:" + message);        JSONObject jsonRoot = (JSONObject) JSON.parse(message.getStringProperty("obj"));        JSONObject jsonObj = JSONObject.parseObject(jsonRoot.getString("value"));        MessagePojo info = JSON.toJavaObject(jsonObj.getJSONObject("body"), MessagePojo.class);        return info;    }    public Message toMessage(Object obj, Session session) throws JMSException, MessageConversionException {        LOGGER.info("往mq插入message, message内容:" + obj);        JSONObject jsonRoot = new JSONObject();        JSONObject jsonObj = new JSONObject();        jsonObj.put("body", obj);        jsonRoot.put("value", jsonObj.toJSONString());        Message message = session.createMapMessage();        message.setObjectProperty("obj", jsonRoot.toJSONString());        return message;    }}

 

3.添加spring配置文件, spring-context.xml、app.properties

1)app.properties:

application.main=com.phpdragon.jms.Appapplication.name=jms_activemq_demo_serverapplication.owner=phpdragonmq.queue.name=COM.PHPDRAGON.JMS.DEMO.QUEUEmq.brokerURL=tcp://127.0.0.1:61616

2)spring-context.xml:

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       xmlns:context="http://www.springframework.org/schema/context"       xsi:schemaLocation="http://www.springframework.org/schema/beans       http://www.springframework.org/schema/beans/spring-beans-2.5.xsd       http://www.springframework.org/schema/context       http://www.springframework.org/schema/context/spring-context-4.3.xsd">    <!--扫描classpath路径下的属性配置文件-->    <context:property-placeholder location="classpath*:/*.properties" ignore-resource-not-found="true" "/>    <!--配置spring扫描路径-->    <context:component-scan base-package="com.phpdragon.jms"/>    <!--创建连接工厂 -->    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">        <property name="brokerURL" value="${mq.brokerURL}"></property>        <property name="useAsyncSend" value="true"></property>    </bean>    <!-- 声明ActiveMQ消息目标,目标可以是一个队列,也可以是一个主题ActiveMQTopic -->    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">        <constructor-arg index="0" value="${mq.queue.name}"></constructor-arg>    </bean>    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">        <property name="connectionFactory" ref="connectionFactory"></property>        <property name="defaultDestination" ref="destination"></property>        <property name="receiveTimeout" value="6000"></property>        <property name="messageConverter" ref="messageConverter"></property>    </bean></beans>

4.编写启动程序App.java

import com.phpdragon.jms.activemq.MqProducer;import com.phpdragon.jms.pojo.MessagePojo;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.ApplicationContext;import org.springframework.context.support.ClassPathXmlApplicationContext;import org.springframework.stereotype.Component;import java.io.IOException;@Componentpublic class App {    public static final String DEFAULT_CONFIG_LOCATION = "/spring-context.xml";    @Autowired    private MqProducer mqProducer;    /**     * 程序入口     *     * @param args     * @throws IOException     */    public static void main(String[] args) throws IOException {        ApplicationContext context = new ClassPathXmlApplicationContext(DEFAULT_CONFIG_LOCATION);        App app = (App) context.getBean("app");        app.run(args);    }    public void run(String[] args) {        MessagePojo msg = new MessagePojo();        msg.setTitle("Test");        msg.setContent("TestContent");        mqProducer.sendMsg(msg);        System.exit(0);    }}

5.到此,一个activeMQ发送程序就写好了,选中App.java的main函数体,鼠标右键点击 debug 运行,执行效果如下:

技术分享

 

6.登录activeMQ管理后台,http://127.0.0.1:8161/admin/queues.jsp, 默认帐号: admin 密码: admin 

技术分享

 

 

7.添加消费者

1)创建MqConsumer.java

import com.phpdragon.jms.pojo.MessagePojo;import org.springframework.stereotype.Component;import javax.jms.JMSException;@Componentpublic class MqConsumer{    public void handleMessage(MessagePojo msg) throws JMSException {        System.out.println("handleMessage:" + msg.toString());    }}

 

2)添加activeMQ 监听配置

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"       xmlns:context="http://www.springframework.org/schema/context"       xsi:schemaLocation="http://www.springframework.org/schema/beans       http://www.springframework.org/schema/beans/spring-beans-2.5.xsd       http://www.springframework.org/schema/context       http://www.springframework.org/schema/context/spring-context-4.3.xsd">    <!--扫描classpath路径下的属性配置文件-->    <context:property-placeholder location="classpath*:/*.properties" ignore-resource-not-found="true"/>    <!--配置spring扫描路径-->    <context:component-scan base-package="com.phpdragon.jms"/>    <!--创建连接工厂 -->    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">        <property name="brokerURL" value="${mq.brokerURL}"></property>        <property name="useAsyncSend" value="true"></property>    </bean>    <!-- 声明ActiveMQ消息目标,目标可以是一个队列,也可以是一个主题ActiveMQTopic -->    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">        <constructor-arg index="0" value="${mq.queue.name}"></constructor-arg>    </bean>        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">        <property name="connectionFactory" ref="connectionFactory"></property>        <property name="defaultDestination" ref="destination"></property>        <property name="receiveTimeout" value="6000"></property>        <property name="messageConverter" ref="messageConverter"></property>    </bean>    <!-- 消息监听适配器 -->    <bean id="myMessageListenerAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">        <property name="delegate" ref="mqConsumer"/>        <property name="messageConverter" ref="messageConverter"/>    </bean>    <bean id="mqContainer" class="org.springframework.jms.listener.SimpleMessageListenerContainer">        <property name="connectionFactory" ref="connectionFactory"/>        <property name="destinationName" value="${mq.queue.name}"/>        <!-- 使用MessageListenerAdapter来作为消息监听器 -->        <property name="messageListener" ref="myMessageListenerAdapter"/>        <!--最小并发数是4,最大并发数为8-->        <property name="concurrency" value="4-8"/>        <property name="sessionTransacted" value="true"/>    </bean></beans>

3) 右键debug运行,效果如下:

技术分享

4) 查看消费情况

技术分享

 

 

 

四、集成logback

是否觉得debug日志太简单?那我们引入logback支持。实现丰富日志输出、日志back等

 

1.添加logback-classic Maven依赖

<!-- logback --><dependency>    <groupId>ch.qos.logback</groupId>    <artifactId>logback-classic</artifactId>    <version>1.2.3</version></dependency>

 

2.在resources资源目录中添加logback.xml文件

<?xml version="1.0" encoding="UTF-8" ?><configuration>    <!--日志配置 -->    <property name="LOG_BACK_DIR" value="logs"/>    <!-- logback 工程的日志配置 -->    <appender name="DEBUG_ROLLING"              class="ch.qos.logback.core.rolling.RollingFileAppender">        <file>${LOG_BACK_DIR}/debug.log</file>        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!-- rollover daily -->            <fileNamePattern>${LOG_BACK_DIR}/debug_%d{yyyyMMddHH}.%i.log            </fileNamePattern>            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">                <!-- or whenever the file size reaches 100MB -->                <maxFileSize>256MB</maxFileSize>            </timeBasedFileNamingAndTriggeringPolicy>            <maxHistory>48</maxHistory>            <!-- 保存最大文件数 -->        </rollingPolicy>        <encoder>            <pattern>%d{yyyy-MM-dd                HH:mm:ss.SSS}|%X{threadId}|%X{traceId}-%X{rpcId}|%level|%C|%M|%L|%.-512msg%n            </pattern>            <charset>UTF-8</charset>        </encoder>        <filter class="ch.qos.logback.classic.filter.ThresholdFilter"><!-- 只打印错误日志 -->            <level>TRACE</level>        </filter>    </appender>    <!-- logback 工程的日志配置 END -->         <!-- 开发环境使用 打印在控制台 -->    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">        <layout class="ch.qos.logback.classic.PatternLayout">            <param name="Pattern"                   value="%d{yyyy-MM-dd HH:mm:ss.SSS}|%X{threadId}|%X{traceId}-%X{rpcId}|%level|%C|%M|%L|%.-512msg%n"/>        </layout>    </appender>    <logger name="org.springframework" level="WARN"/>    <!--开发环境为DEBUG等级 -->    <root level="DEBUG">        <appender-ref ref="STDOUT"/>        <appender-ref ref="INFO_ROLLING"/>    </root></configuration>

 

3.右键debug运行,效果如下:

技术分享

 

五、集成assembly

 

六、单元测试与自动测试

 

七、编译并上传远程仓库

 

 

 

源码地址:

有XML配置版本: https://github.com/phpdragon/JavaSamples/tree/master/jms-activemq-demo

 spring注解版本:https://github.com/phpdragon/JavaSamples/tree/master/jms-activemq-demo-no-xml

spring-boot版本:https://github.com/phpdragon/JavaSamples/tree/master/spring-boot-starter-activemq-demo

 

ActiveMQ JMS 项目 基于 Maven 搭建