首页 > 代码库 > springboot整合activemq小demo

springboot整合activemq小demo

直接上干货……

1、首先配置pom.xml文件如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Build descriptor for the Spring Boot + ActiveMQ demo. -->
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mq.demo</groupId>
    <artifactId>mq-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <!-- Spring Boot core, test support and starters; every artifact is pinned to 1.4.1.RELEASE. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot</artifactId>
            <version>1.4.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-test</artifactId>
            <version>1.4.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>1.4.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>1.4.1.RELEASE</version>
        </dependency>
        <!-- ActiveMQ / JMS support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
            <version>1.4.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.javassist</groupId>
            <artifactId>javassist</artifactId>
            <version>3.18.1-GA</version>
        </dependency>
        <!--
            NOTE(review): the explicit slf4j / log4j versions below override whatever the
            Spring Boot starters would bring in transitively. Confirm they are compatible
            with the logging backend actually used at runtime.
        -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>18.0</version>
        </dependency>

    </dependencies>

    <!-- Package the application as an executable jar -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>1.4.1.RELEASE</version>
                <!--
                    This project does not inherit from spring-boot-starter-parent, so the
                    repackage goal has to be bound explicitly; without it "mvn package"
                    only produces a plain, non-executable jar.
                -->
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2、创建包config,在该包下创建ActiveMQConfig.java文件,实现如下:

/**
 * Spring configuration that exposes the ActiveMQ connection factory used by the
 * producer and consumer of this demo.
 */
@Configuration
public class ActiveMQConfig {

    /**
     * Builds the connection factory pointing at the local broker.
     *
     * @return a factory configured for {@code tcp://localhost:61616}
     */
    @Bean
    public ActiveMQConnectionFactory getActiveMQConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://localhost:61616");
        // Messages are not reused by the sender after send(), so the defensive copy is skipped.
        connectionFactory.setCopyMessageOnSend(false);
        // Synchronous sends only. The original also called setUseAsyncSend(true), which
        // contradicts this flag; ActiveMQ lets alwaysSyncSend win, so the async setting
        // was dead configuration and has been removed.
        connectionFactory.setAlwaysSyncSend(true);
        connectionFactory.setOptimizeAcknowledge(true);
        // SECURITY NOTE(review): trusting all packages allows any class on the classpath to
        // be deserialized from an ObjectMessage. Acceptable for a local demo only; restrict
        // it with setTrustedPackages(...) before connecting to a broker that others can
        // publish to.
        connectionFactory.setTrustAllPackages(true);
        return connectionFactory;
    }

}

3、创建包listener,在该包下创建MQListener.java文件,内容如下:

public class MQListener implements MessageListener{
private Session session;

public MQListener(){
}

public MQListener(Session session) {
this.session = session;
}

/*
* @see javax.jms.MessageListener#onMessage(javax.jms.Message)
*/
public void onMessage(Message message) {
ObjectMessage objMsg = (ObjectMessage) message;
try {
System.out.println("收到消息 : " + objMsg.getObject());
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
}

4、创建包consumer,在该包下创建MyConsumer.java文件,内容如下:

/**
 * Subscribes an {@link MQListener} to the {@code test} queue.
 *
 * <p>The scheduled method only (re)establishes the subscription when none is active.
 * The original opened a new connection, session and consumer on every tick and never
 * closed any of them.
 */
@Component
@EnableScheduling
public class MyConsumer implements AutoCloseable {
    @Autowired
    private ActiveMQConnectionFactory activeMQConnectionFactory;

    /** Connection that owns the active subscription; {@code null} until one is established. */
    private Connection connection;

    /** Set by {@link #close()} so a late scheduler tick does not reconnect during shutdown. */
    private boolean closed;

    /**
     * Ensures that exactly one consumer is listening on the {@code test} queue.
     * Does nothing when a subscription already exists; a failed attempt is retried on
     * the next scheduled run.
     *
     * @throws ConnectException never thrown by this implementation; kept so the
     *                          signature stays unchanged
     */
    @Scheduled(fixedDelay = 3000)
    public synchronized void receive() throws ConnectException {
        if (closed || connection != null) {
            return;
        }
        System.out.println("======================开始消费========================");
        Connection candidate = null;
        try {
            candidate = activeMQConnectionFactory.createConnection();
            // The acknowledge mode is ignored for transacted sessions, so say so explicitly.
            Session session = candidate.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue("test");
            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(new MQListener(session));
            // Start delivery only after the listener is registered.
            candidate.start();
            connection = candidate;
        } catch (JMSException e) {
            e.printStackTrace();
            // Do not leak a half-initialised connection; the next tick tries again.
            closeQuietly(candidate);
        }
    }

    /**
     * Closes the JMS connection, which also closes its session and consumer.
     *
     * <p>NOTE(review): this relies on the Spring container invoking {@code close()} on
     * {@link AutoCloseable} beans at shutdown; confirm for the Spring version in use.
     */
    @Override
    public synchronized void close() {
        closed = true;
        closeQuietly(connection);
        connection = null;
    }

    /** Closes {@code target} if it is non-null, reporting but not propagating failures. */
    private static void closeQuietly(Connection target) {
        if (target == null) {
            return;
        }
        try {
            target.close();
        } catch (JMSException closeFailure) {
            closeFailure.printStackTrace();
        }
    }
}

5、创建包producer,在该包下创建MyProducer.java文件,内容如下:

/**
 * Periodically publishes a test message to the {@code test} queue.
 */
@Component
@EnableScheduling
public class MyProducer {

    @Autowired
    private ActiveMQConnectionFactory activeMQConnectionFactory;

    /**
     * Sends one {@code "hello mq"} object message inside a transacted session.
     *
     * <p>The connection is always closed before returning. The original closed only the
     * session, and only on the success path, so each run leaked a broker connection.
     *
     * @throws JMSException if connecting, sending, committing or closing fails
     */
    @Scheduled(fixedDelay = 3000)
    public void send() throws JMSException {
        System.out.println("开始发送-------------------------------");
        Connection connection = activeMQConnectionFactory.createConnection();
        try {
            connection.start();
            // The acknowledge mode is ignored for transacted sessions, so say so explicitly.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue("test");
            MessageProducer producer = session.createProducer(destination);
            ObjectMessage objMsg = session.createObjectMessage();
            objMsg.setObject("hello mq");
            producer.send(objMsg);
            session.commit();
        } finally {
            // Closing the connection also closes its sessions and producers; a transaction
            // that was not committed is rolled back.
            connection.close();
        }
    }
}

6、最后创建包start,在该包下创建Start.java文件,内容如下:

/**
 * Application entry point: enables auto-configuration and scans {@code com.mq.demo}
 * for components.
 */
@Configuration
@EnableAutoConfiguration
@ComponentScan(basePackages = "com.mq.demo")
public class Start {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        // Equivalent to the static SpringApplication.run(Start.class, args) shortcut.
        SpringApplication application = new SpringApplication(Start.class);
        application.run(args);
    }
}

 

springboot整合activemq小demo