首页 > 代码库 > activimq消息队列基本配置

activimq消息队列基本配置

背景:

在了解一个分布式框架的时候,偶然接触到activimq消息队列,于是就决定写一个小demo

 

首先是linux配置activimq

去官网下载一个activimq的linux安装包,直接解压,到bin目录下执行:

activemq start

这样就启动了,很简单

然后可以查看是否有新的端口61616被监听

技术分享

接着查看是否有相关进程

技术分享

同时activimq还有一个后台监控页面, 但是这个默认端口是8161,访问地址为:

http://10.10.10.30:8161/admin/

默认用户名、密码均为admin

技术分享

接下来是测试代码,maven库地址:

1 <dependency>
2             <groupId>org.apache.activemq</groupId>
3             <artifactId>activemq-core</artifactId>
4             <version>5.7.0</version>
5         </dependency>
 1 package com.asen.activimq;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.DeliveryMode;
 6 import javax.jms.Destination;
 7 import javax.jms.MessageProducer;
 8 import javax.jms.Session;
 9 import javax.jms.TextMessage;
10 
11 import org.apache.activemq.ActiveMQConnection;
12 import org.apache.activemq.ActiveMQConnectionFactory;
13 
14 public class Sender {
15     private static final int SEND_NUMBER = 5;
16 
17     public static void main(String[] args) {
18         // ConnectionFactory :连接工厂,JMS 用它创建连接
19         ConnectionFactory connectionFactory;
20         // Connection :JMS 客户端到JMS
21         // Provider 的连接
22         Connection connection = null;
23         // Session: 一个发送或接收消息的线程
24         Session session;
25         // Destination :消息的目的地;消息发送给谁.
26         Destination destination;
27         // MessageProducer:消息发送者
28         MessageProducer producer;
29         // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
30         connectionFactory = new ActiveMQConnectionFactory(
31                 ActiveMQConnection.DEFAULT_USER,
32                 ActiveMQConnection.DEFAULT_PASSWORD, "tcp://10.10.10.30:61616");
33         try {
34             // 从工厂获取连接对象
35             connection = connectionFactory.createConnection();
36             // 启动
37             connection.start();
38             // 获取操作连接
39             session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
40             // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
41             // 获取消息目的地
42             destination = session.createQueue("FirstQueue");
43             // 获取发送者
44             producer = session.createProducer(destination);
45             // 设置不持久化,此处学习,实际根据项目决定
46             producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
47             // 构造消息,此处写死,项目就是参数,或者方法获取
48             sendMessage(session, producer);
49             session.commit();
50         } catch (Exception e) {
51             e.printStackTrace();
52         } finally {
53             try {
54                 if (null != connection)
55                     connection.close();
56             } catch (Exception e) {
57                 e.printStackTrace();
58             }
59         }
60     }
61 
62     public static void sendMessage(Session session, MessageProducer producer)
63             throws Exception {
64 
65         for (int i = 1; i <= SEND_NUMBER; i++) {
66             TextMessage message = session.createTextMessage("ActiveMq 发送消息" + i);
67             // 发送消息到目的地方
68             System.out.println("发送消息:" + "ActiveMq 发送消息" + i);
69             producer.send(message);
70         }
71     }
72 }
 1 package com.asen.activimq;
 2 
 3 import javax.jms.Connection;
 4 import javax.jms.ConnectionFactory;
 5 import javax.jms.Destination;
 6 import javax.jms.MessageConsumer;
 7 import javax.jms.Session;
 8 import javax.jms.TextMessage;
 9 
10 import org.apache.activemq.ActiveMQConnection;
11 import org.apache.activemq.ActiveMQConnectionFactory;
12 
13 public class Receiver {
14     public static void main(String[] args) {
15         // ConnectionFactory :连接工厂,JMS 用它创建连接
16         ConnectionFactory connectionFactory;
17         // Connection :JMS 客户端到JMS Provider 的连接
18         Connection connection = null;
19         // Session: 一个发送或接收消息的线程
20         Session session ;
21         // Destination :消息的目的地;消息发送给谁.
22         Destination destination;
23         // 消费者,消息接收者
24         MessageConsumer consumer;
25         // 初始化工厂
26         connectionFactory = new ActiveMQConnectionFactory(
27                 ActiveMQConnection.DEFAULT_USER,
28                 ActiveMQConnection.DEFAULT_PASSWORD, "tcp://10.10.10.30:61616");
29         try {
30             // 从工厂获取连接对象
31             connection = connectionFactory.createConnection();
32             // 启动
33             connection.start();
34             // 获取操作连接
35             session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
36             // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在ActiveMq的console配置  
37             // 获取消息目的地:队列
38             destination = session.createQueue("FirstQueue");
39             // 接收消息
40             consumer = session.createConsumer(destination);
41             while(true){
42                 // 设置接收者接收消息的时间,为了便于测试,这里设定为100s
43                 TextMessage message = (TextMessage)consumer.receive(100000);
44                 System.out.println("message:" + message);
45                 if(null != message){
46                     System.out.println("接收到消息:" + message.getText());
47                 }else{
48                     break;
49                 }
50             }
51         } catch (Exception e) {
52             e.printStackTrace();
53         } finally {
54             try {
55                 if (null != connection) {
56                     connection.close();
57                 }
58             } catch (Exception e) {
59                 e.printStackTrace();
60             }
61         }
62 
63     }
64 }

运行之后,后台监控页面可以看见:

技术分享

技术分享

 

activimq持久化常用的有三种方式:1、文件持久化 2、mysql持久化 3、oracle持久化

在activimq的配置文件中默认开启了文件持久化

技术分享

同时我们需要修改一行代码:

技术分享

这样在activimq重启之后就不会有消息丢失了

activimq消息队列基本配置