首页 > 代码库 > activemq api的封装
activemq api的封装
今天无聊写段代码。。学习一下activemq,简单封装了一下activemq 的topic api。跟jdbc很类似
主要代码:
import java.io.Serializable;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.ObjectMessage;import javax.jms.Session;/*本工具封装了*/import javax.jms.TextMessage;import javax.jms.Topic;import javax.jms.TopicConnection;import javax.jms.TopicConnectionFactory;import javax.jms.TopicPublisher;import javax.jms.TopicSession;import javax.jms.TopicSubscriber;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class JMSTopic { TopicConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 TopicConnection connection = null; //用来发布的会话 TopicSession proSession = null; //2一个订阅会话 TopicSession conSession = null; //主题发布者 MessageProducer producer=null; //主题 MessageConsumer consumer=null; // Destination :消息的目的地;消息发送给谁. Destination destination; // MessageProducer:消息发送者 //默认构造函数,默认的连接activemq,可以写多个构造函数 public JMSTopic() { connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { connection= connectionFactory.createTopicConnection(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { connection.start(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //此处先固定消息为String类型 public void writeMessage(String t,String message ) { try { proSession=connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); producer=proSession.createProducer(proSession.createTopic(t)); //使用message构造TextMessage TextMessage text=proSession.createTextMessage(); text.setText(message); producer.send(text); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } //创建发布会话应该是可以配置的,此处先固定 } public void writeMessage(String t,Object o ) { try { proSession=connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); producer=proSession.createProducer(proSession.createTopic(t)); //使用message构造TextMessage ObjectMessage text=proSession.createObjectMessage(); text.setObject((Serializable) o); producer.send(text); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } //创建发布会话应该是可以配置的,此处先固定 } //使用某个Message监听器来监听某个Topic public void receiveMsg(String c,MessageListener ml) { try { conSession=connection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE); Topic t=conSession.createTopic(c); consumer=conSession.createConsumer(t); //设置过来的监视器 consumer.setMessageListener(ml); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
2.测试,发送的消息是对象
a.一个序列化的Stduent 对象
1 package ch02.chat; 2 3 import java.io.Serializable; 4 5 public class Student implements Serializable { 6 private int age; 7 private String name; 8 public Student(int age,String name) 9 {10 this.age=age;11 this.name=name;12 13 14 }15 public String toString()16 {17 return "age ="+age+" name "+ "name";18 }19 20 }
b.客户端发送
1 package ch02.chat; 2 3 public class ClientTest { 4 public static void main(String args[]) 5 { 6 JMSTopic jt=new JMSTopic(); 7 jt.writeMessage( "topic1",new Student(12,"han")); 8 9 10 11 }12 13 }
c.客户端接受信息
1 package ch02.chat; 2 3 import javax.jms.JMSException; 4 import javax.jms.Message; 5 import javax.jms.MessageListener; 6 import javax.jms.ObjectMessage; 7 import javax.jms.TextMessage; 8 9 10 public class ClientTest2 {11 12 public static void main(String args[])13 {14 JMSTopic jt=new JMSTopic();15 16 jt.receiveMsg("topic1",new MessageListener()17 {18 19 @Override20 public void onMessage(Message message) {21 // TODO Auto-generated method stub22 ObjectMessage tm = (ObjectMessage) message; 23 try { 24 System.out.println("Received message: " +tm.getObject()); 25 } catch (JMSException e) { 26 e.printStackTrace(); 27 } 28 29 30 }31 32 33 }34 35 36 );37 38 39 40 }41 42 }
运行喽
activemq api的封装
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。