首页 > 代码库 > JMS-mq-发布/订阅

JMS-mq-发布/订阅

1,Tomcat配置

  <Resource name="topic/connectionFactory" auth="Container"
        type="org.apache.activemq.ActiveMQConnectionFactory" description="JMS Connection Factory"
        factory="org.apache.activemq.jndi.JNDIReferenceFactory"
        brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&amp;maxReconnectAttempts=5"
        brokerName="LocalActiveMQBroker" useEmbeddedBroker="false" />
        
    <Resource name="topic/topic0" 
        auth="Container"
        type="org.apache.activemq.command.ActiveMQTopic" description="My Topic" factory="org.apache.activemq.jndi.JNDIReferenceFactory"
        physicalName="TestTopic" />        

2,发布/订阅

发布:http://localhost:8080/Mq/Publisher

Publisher send:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:WWH-PC-64244-1488884593844-1:3:1:1:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://TestTopic, transactionId = null, expiration = 0, timestamp = 1488884667978, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, 
text = 2017?3?7?}

订阅:http://localhost:8080/Mq/Subscriber

Subscriber:ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:WWH-PC-64244-1488884593844-1:3:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:WWH-PC-64244-1488884593844-1:3:1:1, destination = topic://TestTopic, transactionId = null, expiration = 0, timestamp = 1488884667978, arrival = 0, brokerInTime = 1488884667978, brokerOutTime = 1488884667982, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@2dbd83ba, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, 
text = 2017?3?7?}

 

3,代码

【1】消息发布

package com.ma.publish;

import java.io.IOException;
import java.io.PrintWriter;

import javax.jms.DeliveryMode;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.InitialContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@WebServlet("/Publisher")
public class Publisher extends HttpServlet{

    
    /**
     * 消息-订阅/发布
     */
    private static final long serialVersionUID = -4470119218802259551L;

    public Publisher(){
        super();
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        
        PrintWriter out = resp.getWriter();
        try{
            
            InitialContext context = new InitialContext();
            Topic topic = (Topic) context.lookup("java:comp/env/topic/topic0");
            TopicConnectionFactory tConnectionFactory = (TopicConnectionFactory) context.lookup("java:comp/env/topic/connectionFactory");
            
            TopicConnection tConnection = tConnectionFactory.createTopicConnection();
            TopicSession topicSession = tConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicPublisher tpPublisher = topicSession.createPublisher(topic);
            tpPublisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            
            TextMessage txMessage = topicSession.createTextMessage();
            txMessage.setText("2017年3月7日");
            tpPublisher.publish(txMessage);
            
            out.write("Publisher send:" +txMessage);
            tConnection.close();
            
        }catch(Throwable e){
            e.printStackTrace();
        }
        
        
    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
    }
    
    
}

 

【2】消息订阅

package com.ma.publish;

import java.io.IOException;
import java.io.PrintWriter;

import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@WebServlet("/Subscriber")
public class Subscriber extends HttpServlet {

    /**
     * 消息-发布/订阅
     */
    private static final long serialVersionUID = 6058649540492572496L;

    public Subscriber(){
        super();
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        
        PrintWriter out = resp.getWriter();
        try{
            
            InitialContext context = new InitialContext();
            Topic topic = (Topic) context.lookup("java:comp/env/topic/topic0");
            TopicConnectionFactory tConnectionFactory = (TopicConnectionFactory) context.lookup("java:comp/env/topic/connectionFactory");
            
            TopicConnection tConnection = tConnectionFactory.createTopicConnection();
            TopicSession topicSession = tConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicSubscriber tpSubscriber = topicSession.createSubscriber(topic);
            
            tConnection.start();
            TextMessage txMessage = (TextMessage) tpSubscriber.receive();
            
            out.write("Subscriber:" +txMessage);
            tConnection.close();
            
        }catch(Throwable e){
            e.printStackTrace();
        }
        
        
    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        
    }
    
    
}

 

JMS-mq-发布/订阅