首页 > 代码库 > 简单的activemq的封装和使用

简单的activemq的封装和使用

天空中飘着小雨,实在是适合写代码的时节。

1

  1 package ch02.chat;  2   3 import java.io.Serializable;  4   5 import javax.jms.Connection;  6 import javax.jms.ConnectionFactory;  7 import javax.jms.Destination;  8 import javax.jms.JMSException;  9 import javax.jms.Message; 10 import javax.jms.MessageConsumer; 11 import javax.jms.MessageListener; 12 import javax.jms.MessageProducer; 13 import javax.jms.ObjectMessage; 14 import javax.jms.Session; 15 /*本工具封装了*/ 16  17  18 import javax.jms.TextMessage; 19 import javax.jms.Topic; 20 import javax.jms.TopicConnection; 21 import javax.jms.TopicConnectionFactory; 22 import javax.jms.TopicPublisher; 23 import javax.jms.TopicSession; 24 import javax.jms.TopicSubscriber; 25  26 import org.apache.activemq.ActiveMQConnection; 27 import org.apache.activemq.ActiveMQConnectionFactory; 28  29 public class JMSTopic { 30     TopicConnectionFactory connectionFactory; 31     // Connection :JMS 客户端到JMS Provider 的连接 32     TopicConnection connection = null; 33     //用来发布的会话 34      35     //TopicSession proSession = null; 36     //2一个订阅会话 37    //TopicSession conSession = null; 38     TopicSession  session=null; 39      40     //主题发布者 41     MessageProducer producer=null; 42     //主题 43     MessageConsumer consumer=null; 44      45      46     // Destination :消息的目的地;消息发送给谁. 
47     Destination destination; 48     // MessageProducer:消息发送者 49      50     //默认构造函数,默认的连接activemq,可以写多个构造函数 51     public JMSTopic()  52     {  53         connectionFactory =  new ActiveMQConnectionFactory( 54                 ActiveMQConnection.DEFAULT_USER, 55                 ActiveMQConnection.DEFAULT_PASSWORD, 56                 "tcp://localhost:61616"); 57         try { 58             connection= connectionFactory.createTopicConnection(); 59         } catch (JMSException e) { 60             // TODO Auto-generated catch block 61             e.printStackTrace(); 62         } 63         try { 64             connection.start(); 65         } catch (JMSException e) { 66             // TODO Auto-generated catch block 67             e.printStackTrace(); 68         } 69          70     } 71     public JMSTopic(String user,String name)  72     {  73         connectionFactory =  new ActiveMQConnectionFactory( 74                 user, 75                 name, 76                 "tcp://localhost:61616"); 77         try { 78             connection= connectionFactory.createTopicConnection(); 79         } catch (JMSException e) { 80             // TODO Auto-generated catch block 81             e.printStackTrace(); 82         } 83         try { 84             connection.start(); 85         } catch (JMSException e) { 86             // TODO Auto-generated catch block 87             e.printStackTrace(); 88         } 89          90     } 91      92      93      94      95     //设计session类型 96        public void setSession() throws JMSException 97        { 98            session= connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); 99            100        }101          //设置为原子类型102        public void setAtomicSession() throws JMSException103        {104            session= connection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);105            106        }107     108     //此处先固定消息为String类型109     public void writeMessage(String t,String message,int priority )110  
   {111         try {112             113             producer=session.createProducer(session.createTopic(t));114             115             //使用message构造TextMessage 116             TextMessage text=session.createTextMessage();117             text.setJMSPriority(priority);118             text.setText(message);119             producer.send(text);120             121             122             123             124         } catch (JMSException e) {125             // TODO Auto-generated catch block126             e.printStackTrace();127         }128         //创建发布会话应该是可以配置的,此处先固定129         130         131     }132     133     public void writeMessage(String t,Object o)134     {135         try {136 137             138             producer=session.createProducer(session.createTopic(t));139             140             //使用message构造TextMessage 141             ObjectMessage text=session.createObjectMessage();142             text.setObject((Serializable) o);143             producer.send(text);144             145             146             147             148         } catch (JMSException e) {149             // TODO Auto-generated catch block150             e.printStackTrace();151         }152         //创建发布会话应该是可以配置的,此处先固定153         154         155     }156         157     //使用某个Message监听器来监听某个Topic158     public void receiveMsg(String c,MessageListener ml)159     {160         try {161         162             Topic t=session.createTopic(c);163             consumer=session.createConsumer(t);164             //设置过来的监视器165             consumer.setMessageListener(ml);166             167         } catch (JMSException e) {168             // TODO Auto-generated catch block169             e.printStackTrace();170         }171         172         173         174         175     }176     public Message receiveMsg(String c)177     {178         try {179             180             Topic t=session.createTopic(c);181             consumer=session.createConsumer(t);182             
//设置过来的监视器183             Message message=consumer.receive();184             return message;185             186         } catch (JMSException e) {187             // TODO Auto-generated catch block188             e.printStackTrace();189         }190         return null;191     }192     //同步接收信息193     194     public void commit() throws JMSException195     {196         session.commit();197     }198     public void rollback() throws JMSException199     {200         session.rollback();201     }202     public void close() throws JMSException 203     {204         if(connection!=null) connection.close();205         if(session!=null) session.close();206         if(producer!=null) session.close();207         if(consumer!=null) consumer.close();208         209         210         211         212     }213     214 215 }
View Code

 

2.如何使用呢?

a.做个有关事务的实验

 1 package ch02.chat; 2  3 import java.util.Scanner; 4  5 import javax.jms.JMSException; 6 import javax.jms.Session; 7  8 public class ClientTest { 9     public static void main(String args[]) throws JMSException10     {11         //第一个例子,建立一个原子的session做个实验看看,这个一个不会发送任何信息到服务器,12         JMSTopic jt=new JMSTopic();13         //jt.setSession();14         try{15         16         jt.setAtomicSession();17         18         jt.writeMessage("que1", "hansongjiang",4);19         int x=10/0; //会抛出异常,实现回滚,所以que1中不会添加任何信息20         jt.writeMessage("que1","hansong",4);21         jt.commit();22        }23         catch(Exception e)24         {25             jt.rollback();26             27         }28         finally29         {30     31         jt.close();32         }33     34     //如果35     /*    jt=new  JMSTopic();36         try37         {38         jt.setSession();39         jt.writeMessage("que1", "hansongjiang",4);40         //int x=10/0;41         jt.writeMessage("que1","zhangsan",4);42         43         }44         catch(Exception e)45         {46             System.out.println("异常我抓住了");47             48         }49         50         */51         52         53     }54 55 }
View Code

 

运行后,进入topic的消息个数为0(事务被回滚,异常之前发送的消息也没有提交)。

b.非事务的执行后呢?我们使用setSession获得的非事务session,每条消息发送后立即生效,不受之后异常的影响(例如在两次发送之间抛出异常时,入topic个数为1)

 

 1 package ch02.chat; 2  3 import java.util.Scanner; 4  5 import javax.jms.JMSException; 6 import javax.jms.Session; 7  8 public class ClientTest { 9     public static void main(String args[]) throws JMSException10     {11     12     13         JMSTopic jt=new  JMSTopic();14         try15         {16         jt.setSession();17         jt.writeMessage("que1", "hansongjiang",4);18         //int x=10/0;19         jt.writeMessage("que1","zhangsan",4);20         21         }22         catch(Exception e)23         {24             System.out.println("异常我抓住了");25             26         }27         28         */29         30         31     }32 33 }
View Code

 

简单的activemq的封装和使用