首页 > 代码库 > jms入门

jms入门

一.所需jar(maven)

<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-all</artifactId>
   <version>5.14.3</version>
</dependency>

 

二.创建生产者

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;

import javax.jms.*;
import java.io.Serializable;

public class JMSProducer {
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionfactory =new ActiveMQConnectionFactory("username","pwd","tcp://localhost:61616");

        //创建与JMS服务的连接:ConnectionFactory被管理的对象,由客户端创建,用来创建一个连接对象
        Connection connection = connectionfactory.createConnection();
        /*
                确认消息的方式有如下三种:
                AUTO_ACKNOWLEDGE(自动通知)
                CLIENT_ACKNOWLEDGE(客户端自行决定通知时机)
                DUPS_OK_ACKNOWLEDGE(延时//批量通知)

         */
         
        /*
            打开会话,一个单独的发送和接受消息的线程上下文 
            为true时,事务会话必须session.commit();
        */
        Session session =connection.createSession(true,Session.AUTO_ACKNOWLEDGE );
        JMSProducer qs = new JMSProducer();
        qs.sendTextMsg(session,"helli text","jmsText");
        
        /*
            MapMessage mapMsg = session.createMapMessage();
            mapMsg.setString("name", "张三");
            mapMsg.setInt("age", 35);
            qs.sendMap(session, mapMsg, "queue.msgMap");//发送map类型的消息

            JMS jms = new JMS();//发送Object类型消息
            jms.setName("zhangsan");
            jms.setSex("男");
            qs.sendObj(session,jms,"queue.msgObj");
        */
          
        session.commit(); //在事务性会话中,只有commit之后,消息才会真正到达目的地
        session.close();
        connection.close();
    }

    /*
       发送文本消息
     */
    public void sendTextMsg(Session session,String MsgContent,String name) throws JMSException{
            Queue queue = new ActiveMQQueue(name); // Topic topic=new ActiveMQTopic(name); 创建topic
            MessageProducer msgProducer = session.createProducer(queue);
            Message textMessage = session.createTextMessage(MsgContent);
            msgProducer.send(textMessage);

        /*    
            //发送byte字节
            byte[] bs={1,2};
            BytesMessage  msg1= session.createBytesMessage();
            msg1.writeBytes(bs);
            msgProducer.send(msg1);

            //流消息
            StreamMessage streamMessage = session.createStreamMessage();
            streamMessage.writeString("streamMessage流消息");
            streamMessage.writeLong(55);
            producer.send(streamMessage);
         */
    }
    
    /*
       发送MAP类型消息
     */
    public void sendMap(Session session, MapMessage map, String name) throws JMSException {
        Topic topic=new ActiveMQTopic(name);   // Queue queue = new ActiveMQQueue(name);
        MessageProducer msgProducer1=session.createProducer(topic);
        msgProducer1.send(map);
        //msgProducer1.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 设置了重启之后消息会丢失
        //msgProducer1.setTimeToLive(1000*60*60);  消息有效期1小时
    }
    
    /*
       发送Object类型消息
     */
    public void sendObj(Session session,Object obj,String name) throws JMSException{
        Destination queue = new ActiveMQQueue(name);
        //发送对象时必须让该对象实现serializable接口
        ObjectMessage objMsg=session.createObjectMessage((Serializable) obj);
        MessageProducer msgProducer = session.createProducer(queue);
        msgProducer.send(objMsg);
    }
}

三.创建消费者

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;

import javax.jms.*;

public class JMSConsumer implements MessageListener{
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionfactory =null;
        Connection connection=null;
        Session session=null;
        if(connectionfactory==null){
            connectionfactory = new ActiveMQConnectionFactory("username","pwd","tcp://localhost:61616");
            //接收对象时,设置这个为true
            ((ActiveMQConnectionFactory) connectionfactory).setTrustAllPackages(true);
        }
        if(connection==null){
            connection = connectionfactory.createConnection();
            connection.start();
        }
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Queue queue = new ActiveMQQueue("que");//根据发送的名称接受消息
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(new JMSConsumer());//不继承MessageListener时可以用consumer.receive()手动接受消息

        Topic queue1 = new ActiveMQTopic("topic-name");
        MessageConsumer consumer1 = session.createConsumer(queue1);
        consumer1.setMessageListener(new JMSConsumer());

        Queue queue3 = new ActiveMQQueue("queue.msgMap");
        MessageConsumer consumer3 = session.createConsumer(queue3);
        consumer3.setMessageListener(new JMSConsumer());

        Queue queue2 = new ActiveMQQueue("queue.msgObj");
        MessageConsumer consumer2 = session.createConsumer(queue2);
        consumer2.setMessageListener(new JMSConsumer());
    }

    public void onMessage(Message message) {
        //instanceof 测试它所指向的对象是否是TextMessage类
        if(message instanceof TextMessage){ //接受文本消息
            TextMessage text = (TextMessage) message;
            try {
                System.out.println("message:"+message);
                System.out.println("发送的文本消息内容为:"+text.getText()); 
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
       
        if(message instanceof MapMessage){ //接收map消息
            MapMessage map = (MapMessage) message;
            try {
                System.out.println("姓名:"+map.getString("name"));
                System.out.println("年龄:"+map.getInt("age"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        
        if(message instanceof ObjectMessage){ //接收object
            try {
                System.out.println("ObjectMessage");
                ObjectMessage objMsg =(ObjectMessage) message;
                JMS jms=(JMS) objMsg.getObject();
                System.out.println(jms);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        if(message instanceof BytesMessage){ //接收字节消息
            byte[] b = new byte[1024];
            int len = -1;
            BytesMessage byteMsg = (BytesMessage)message;
            try {
                while((len=byteMsg.readBytes(b))!=-1){
                    System.out.println(new String(b, 0, len));
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
            /*
                  if(message instanceof StreamMessage){ //接收流消息
                    StreamMessage message = (StreamMessage)message;
                    System.out.println(message.readString());
                    System.out.println(message.readLong());
                }
             */

    }
}

jms入门