首页 > 代码库 > JMS-mq-点对点

JMS-mq-点对点

1,修改Tomcat配置,并启动

文件:apache-tomcat-7.0.56-Idea\conf\context.xml
添加:

<!-- JNDI resource "queue/connectionFactory": an ActiveMQConnectionFactory built by
     ActiveMQ's JNDIReferenceFactory and pointed at the broker on tcp://localhost:61616.
     The servlets below look it up as java:comp/env/queue/connectionFactory. -->
<Resource name="queue/connectionFactory" 
auth="Container" 
type="org.apache.activemq.ActiveMQConnectionFactory" 
description="JMS Connection Factory" 
factory="org.apache.activemq.jndi.JNDIReferenceFactory" 
brokerURL="tcp://localhost:61616" 
brokerName="LocalActiveMQBroker" /> 

<!-- JNDI resource "queue/queue0": an ActiveMQQueue whose physical (broker-side) name is
     "TomcatQueue". The servlets below look it up as java:comp/env/queue/queue0. -->
<Resource name="queue/queue0" 
auth="Container" 
type="org.apache.activemq.command.ActiveMQQueue" 
description="My Queue" 
factory="org.apache.activemq.jndi.JNDIReferenceFactory" 
physicalName="TomcatQueue" />

 


2,启动MQ(ActiveMQ 默认监听 61616 端口,Tomcat 中配置的 brokerURL 即指向该端口)
文件:apache-activemq-5.13.0\bin\activemq.bat
管理:http://localhost:8161/admin/queues.jsp

3,部署Mq项目

[1],发送消息
http://localhost:8080/Mq/Send

消息内容:
Message send :ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:WWH-PC-62504-1488880588551-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://TomcatQueue, transactionId = null, expiration = 0, timestamp = 1488880588941, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = Helo world !}


[2],接收消息
http://localhost:8080/Mq/Receive

消息内容:
Message receive :ActiveMQTextMessage {commandId = 5, responseRequired = false, messageId = ID:WWH-PC-62504-1488880588551-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:WWH-PC-62504-1488880588551-1:1:1:1, destination = queue://TomcatQueue, transactionId = null, expiration = 0, timestamp = 1488880588941, arrival = 0, brokerInTime = 1488880588943, brokerOutTime = 1488880714774, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@1457df18, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Helo world !}

 

4,代码

【1】消息发送

package com.mq.core;

import java.io.IOException;
import java.io.PrintWriter;

import javax.jms.DeliveryMode;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@WebServlet("/Send")
public class Send extends HttpServlet {

    private static final long serialVersionUID = 1257616635024046695L;
    
    public Send(){
        super();
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        
        PrintWriter out = resp.getWriter();
        try{
            
            //1-初始化上下文Context
            InitialContext context = new InitialContext();
            
            //2-查找队列对象  (早在Tomcat的context.xml配置的队列)
            Queue queue = (Queue) context.lookup("java:comp/env/queue/queue0");
            //3-查找连接工厂对象
            QueueConnectionFactory quConnectionFactory = (QueueConnectionFactory) context.lookup("java:comp/env/queue/connectionFactory");
            //4-获取连接
            QueueConnection quConnection = quConnectionFactory.createQueueConnection();
            //5-创建队列Session
            QueueSession quSession = quConnection.createQueueSession(false, Session.DUPS_OK_ACKNOWLEDGE);
            
            //6-创建发送者
            QueueSender quSender = quSession.createSender(queue);
            quSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            //7-创建发送内容
            TextMessage teMessage = quSession.createTextMessage("Helo world !");
            
            //8-消息发送
            quSender.send(teMessage);
            out.write("Message send :" + teMessage);
            
            //9-关闭连接
            quConnection.close();
        }catch(Throwable e){
            e.printStackTrace();
        }
    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        
    }
    
    

}

 

【2】消息接收

package com.mq.core;

import java.io.IOException;
import java.io.PrintWriter;

import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@WebServlet("/Receive")
public class Receive extends HttpServlet{

    /**
     * 
     */
    private static final long serialVersionUID = -8933907441633969743L;
    
    public Receive(){
        super();
    }

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        
        PrintWriter out = resp.getWriter();
        try{
            
            InitialContext context = new InitialContext();
            Queue queue = (Queue) context.lookup("java:comp/env/queue/queue0");
            QueueConnectionFactory quConnectionFactory = (QueueConnectionFactory) context.lookup("java:comp/env/queue/connectionFactory");
            QueueConnection quConnection = quConnectionFactory.createQueueConnection();
            QueueSession quSession = quConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            QueueReceiver quReceiver = quSession.createReceiver(queue);
            
            quConnection.start();
            TextMessage txMessage = (TextMessage) quReceiver.receive();
            out.write("Message receive :" + txMessage);
            quConnection.close();
        }catch(Throwable e){
            e.printStackTrace();
        }
    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        
    }

    
    
}

 参考:http://www.cnblogs.com/chenpi/p/5565618.html

JMS-mq-点对点