首页 > 代码库 > activeMQ学习(2)---------点对点、发布订阅的消息代码实现

activeMQ学习(2)---------点对点、发布订阅的消息代码实现

以下是个人在学习activemq时从网上找到的资料, 总结留给自己以后复习

点对点的实现

 2       @Test    
public void sendMessage(){ 3 try { 4 // 创建一个连接工厂 5 String url = "tcp://localhost:61616"; 6 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); 7 // 设置用户名和密码,这个用户名和密码在conf目录下的credentials.properties文件中,也可以在activemq.xml中配置 8 /* connectionFactory.setUserName("system"); 9 10 11 // 创建连接 12 Connection connection = connectionFactory.createConnection(); 13 connection.start(); 14 15 // 创建Session,参数解释: 16 // 第一个参数是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认,没有回应则抛出异常,消息发送程序负责处理这个错误。 17 // 第二个参数消息的确认模式: 18 // AUTO_ACKNOWLEDGE : 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次,但传输过程中可能因为错误而丢失消息。 19 // CLIENT_ACKNOWLEDGE : 由消息接收者确认收到消息,通过调用消息的acknowledge()方法(会通知消息提供者收到了消息) 20 // DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息(这种确认模式不在乎接收者收到重复的消息)。 21 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 22 // 创建目标,就创建主题也可以创建队列 23 Destination destination = session.createQueue("test11"); 24 // 创建消息生产者 25 MessageProducer producer = session.createProducer(destination); 26 27 // 设置持久化,DeliveryMode.PERSISTENT和DeliveryMode.NON_PERSISTENT 28 //DeliveryMode.PERSISTENT 消息持久化,当activemq重启或者传送的过程中出现在问题,消息会被保存下来,当消费者连接时还是会收到消息 29 //DeliveryMode.NOT_PERSISTENT 消息不是持久化,当activemq重启或者传送的过程中出现在问题,消息会不会被保存下来,因此容易失去消息,不可靠的连接 30 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 31 // 创建消息 32 String text = "Hello ActiveMQ!"; 33 TextMessage message = session.createTextMessage(text); 34 // 发送消息到ActiveMQ 35 producer.send(message); 36 System.out.println("Message is sent!"); 37 // 关闭资源 38 session.close(); 39 connection.close(); 40 } 41 catch (Exception e) { 42 e.printStackTrace(); 43 } 44 } 45 46 @Test 47 public void getMessage(){ 48 try{ 49 String url = "tcp://localhost:61616"; 50 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); 51 54 // 创建连接 55 Connection connection = connectionFactory.createConnection(); 56 connection.start(); 57 // 创建Session 58 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 59 // 创建目标,就创建主题也可以创建队列 60 Destination destination = session.createQueue("test11"); 61 // 创建消息消费者 62 MessageConsumer consumer = session.createConsumer(destination); 63 // 接收消息,参数:接收消息的超时时间,为0的话则不超时,receive返回下一个消息,但是超时了或者消费者被关闭,返回null 64 Message message = consumer.receive(1000); 65 if (message instanceof TextMessage) { 66 TextMessage textMessage = (TextMessage) message; 67 String text = textMessage.getText(); 68 System.out.println("Received: " + text); 69 } else { 70 System.out.println("Received: " + message); 71 } 72 consumer.close(); 73 session.close(); 74 connection.close(); 75 } catch (Exception e) { 76 e.printStackTrace(); 77 } 78 }

下面是发布订阅的实现

 1 /**
 2      * topic消息的发送者
 3      *
 4      * @throws Exception
 5      */
 6     @Test
 7     public void sendMessage() throws Exception {
 8 
 9         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
10 
11         Connection connection = activeMQConnectionFactory.createConnection();
12         connection.start();
13 
14         Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
15 
16         Topic topic = session.createTopic("topicMessage");
17 
18         //创建一个topic的生产者
19         MessageProducer producer = session.createProducer(topic);
20 
21         //消息持久化
22         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
23 
24         String text = "send topic message seccess";
25 
26         TextMessage textMessage = session.createTextMessage();
27         textMessage.setText(text);
28         textMessage.setStringProperty("property", "property message");
29         // 生产者发送一条textMessage
30         producer.send(textMessage);
31         System.out.println("seccess");
32 
33         session.commit();
34         session.close();
35         connection.close();
36 
37     }
38 
39     /**
40      * 订阅topic发送的消息
41      * @throws Exception
42      */
43     @Test
44     public void receiveMessage() throws Exception {
45         String clientId = "client_id";
46 
47         //连接active
48         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
49 
50         Connection connection = activeMQConnectionFactory.createConnection();
51 
52         //客户端ID,持久订阅需要设置
53         connection.setClientID(clientId);
54         connection.start();
55 
56         Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
57 
58         Topic topic = session.createTopic("topicMessage");
59         //实例化一个消费者
60         MessageConsumer messageConsumer = session.createDurableSubscriber(topic, clientId);
61 
62         //listener 生产者发送的消息
63         messageConsumer.setMessageListener(new MessageListener() {
64             // 订阅接收方法
65             public void onMessage(Message message) {
66 
67                 TextMessage textMessage = (TextMessage) message;
68                 try {
69                     System.out.println("Received message: " + textMessage.getText() + ":" + textMessage.getStringProperty("property"));
70                 } catch (JMSException e) {
71                     e.printStackTrace();
72                 }
73             }
74         });
75     }

 

activeMQ学习(2)---------点对点、发布订阅的消息代码实现