首页 > 代码库 > activemq api的封装

activemq api的封装

今天闲来无事写了段代码,学习一下 ActiveMQ:简单封装了 ActiveMQ 的 Topic API,用法和 JDBC 很类似。

1.主要代码:

import java.io.Serializable;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.ObjectMessage;import javax.jms.Session;/*本工具封装了*/import javax.jms.TextMessage;import javax.jms.Topic;import javax.jms.TopicConnection;import javax.jms.TopicConnectionFactory;import javax.jms.TopicPublisher;import javax.jms.TopicSession;import javax.jms.TopicSubscriber;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class JMSTopic {    TopicConnectionFactory connectionFactory;    // Connection :JMS 客户端到JMS Provider 的连接    TopicConnection connection = null;    //用来发布的会话        TopicSession proSession = null;    //2一个订阅会话    TopicSession conSession = null;        //主题发布者    MessageProducer producer=null;    //主题    MessageConsumer consumer=null;            // Destination :消息的目的地;消息发送给谁.    
Destination destination;    // MessageProducer:消息发送者        //默认构造函数,默认的连接activemq,可以写多个构造函数    public JMSTopic()     {         connectionFactory =  new ActiveMQConnectionFactory(                ActiveMQConnection.DEFAULT_USER,                ActiveMQConnection.DEFAULT_PASSWORD,                "tcp://localhost:61616");        try {            connection= connectionFactory.createTopicConnection();        } catch (JMSException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        try {            connection.start();        } catch (JMSException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }            }        //此处先固定消息为String类型    public void writeMessage(String t,String message )    {        try {            proSession=connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);                        producer=proSession.createProducer(proSession.createTopic(t));                        //使用message构造TextMessage             TextMessage text=proSession.createTextMessage();            text.setText(message);            producer.send(text);                                                        } catch (JMSException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        //创建发布会话应该是可以配置的,此处先固定                    }    public void writeMessage(String t,Object o )    {        try {            proSession=connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);                        producer=proSession.createProducer(proSession.createTopic(t));                        //使用message构造TextMessage             ObjectMessage text=proSession.createObjectMessage();            text.setObject((Serializable) o);            producer.send(text);                                                        } catch (JMSException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        //创建发布会话应该是可以配置的,此处先固定          
          }            //使用某个Message监听器来监听某个Topic    public void receiveMsg(String c,MessageListener ml)    {        try {            conSession=connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);            Topic t=conSession.createTopic(c);            consumer=conSession.createConsumer(t);            //设置过来的监视器            consumer.setMessageListener(ml);                    } catch (JMSException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }                                    }    }

2.测试,发送的消息是对象

a.一个可序列化的 Student 对象

 1 package ch02.chat; 2  3 import java.io.Serializable; 4  5 public class Student implements Serializable { 6     private int age; 7     private String name; 8     public Student(int age,String name) 9     {10         this.age=age;11         this.name=name;12         13         14     }15     public String toString()16     {17         return "age ="+age+"  name "+ "name";18     }19 20 }
View Code

b.客户端发送

 1 package ch02.chat; 2  3 public class ClientTest { 4     public static void main(String args[]) 5     { 6         JMSTopic jt=new JMSTopic(); 7         jt.writeMessage( "topic1",new Student(12,"han")); 8          9         10         11     }12 13 }
View Code

c.客户端接收消息

 1 package ch02.chat; 2  3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.MessageListener; 6 import javax.jms.ObjectMessage; 7 import javax.jms.TextMessage; 8  9 10 public class ClientTest2 {11     12     public static void main(String args[])13     {14         JMSTopic jt=new JMSTopic();15         16     jt.receiveMsg("topic1",new MessageListener()17     {18 19         @Override20         public void onMessage(Message message) {21             // TODO Auto-generated method stub22             ObjectMessage tm = (ObjectMessage) message;  23             try {  24                 System.out.println("Received message: " +tm.getObject());  25             } catch (JMSException e) {  26                 e.printStackTrace();  27             }  28             29             30         }31         32         33     }34             35             36             );37         38         39         40     }41 42 }
View Code

 

运行喽

 

activemq api的封装