首页 > 代码库 > 消息系统之Apache ActiveMQ

消息系统之Apache ActiveMQ

 

一、下载运行MQ服务

1、下载ActiveMQ :http://activemq.apache.org/

2、解压缩:

技术分享

进入bin目录 win32和win64对应不同位的操作系统,选择进入 点击activemq.bat 运行即可启动ActiveMQ服务。

技术分享

在浏览器输入ActiveMQ 服务地址:http://127.0.0.1:8161/admin/         默认用户名/密码 admin/admin

 技术分享

二、开发

jar:activemq-all-5.11.1.jar   在ActiveMQ安装目录下面就有  拷贝到工程即可

技术分享

 

1、点对点模式

 

package com.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生产者
 * @author Administrator
 */
public class JMSProducer 
{
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址

    public static void main(String[] args)
    {
        ConnectionFactory connfactory;//连接工厂
        Connection conn = null;//连接
        Session session;//接收或者发送消息的线程
        Destination dest;//消息的目的地
        MessageProducer producer;//消息的生产者
        
        //创建连接工厂
        connfactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME,JMSProducer.PASSWORD,JMSProducer.BROKEURL);
        try 
        {
            conn = connfactory.createConnection();//获取连接
            conn.start();//启动连接
            session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);//以事务模式获取会话
            dest = session.createQueue("FirstQueue1");//创建消息队列
            producer = session.createProducer(dest);//创建消息生产者
            sendMessage(session, producer);//生产并发送消息
            session.commit();
        } 
        catch (Exception e) 
        {
            e.printStackTrace();
        }
        finally
        {
            if (conn != null)
            {
                try 
                {
                    conn.close();
                } 
                catch (JMSException e) 
                {
                    e.printStackTrace();
                }
            }
        }
    }
    
    /**
     * 发现哦那个消息
     * @param session
     * @param messageProducer
     * @throws JMSException 
     */
    private static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException
    {
        for(int i=1;i<=10;i++)
        {
            TextMessage text = session.createTextMessage("生产消息:"+i);//session用来生产消息
            messageProducer.send(text);//MessageProducer用来发送消息
        }
    }
    

}
package com.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息消费者
 * @author Administrator
 *
 */
public class JMSConsumer 
{
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址

    public static void main(String[] args) 
    {
        ConnectionFactory connfactory;//连接工厂
        Connection conn = null;//连接
        Session session;//接收或者发送消息的线程
        Destination dest;//消息的目的地
        MessageConsumer messageConsumer;//消息消费者
        //创建连接工厂
        connfactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME,JMSConsumer.PASSWORD,JMSConsumer.BROKEURL);
        
        try 
        {
            conn = connfactory.createConnection();//获取连接
            conn.start();//启动连接
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);//以事务模式获取会话
            dest = session.createQueue("FirstQueue1");//创建消息队列
            messageConsumer = session.createConsumer(dest);
            //receive模式
//            while(true)
//            {
//                TextMessage text = (TextMessage)messageConsumer.receive(100000);
//                if (text != null)
//                {
//                    System.out.println("receive模式接收:"+text.getText());
//                }
//                else
//                {
//                    break;
//                }
//            }
            //监听模式
            messageConsumer.setMessageListener(new Listener());// 注册消息监听  
        } 
        catch (Exception e) 
        {
            e.printStackTrace();
        }
        //后期不能关闭  要一直处于监听模式  需要conn一直开启
    }

}
package com.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Listener implements MessageListener
{

    @Override
    public void onMessage(Message message) 
    {
        try {
            System.out.println("监听模式接收:"+ ((TextMessage)message).getText());
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

 

2、发布订阅模式

 

package com.activemq2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSProducer 
{
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址

    public static void main(String[] args)
    {
        ConnectionFactory connfactory;//连接工厂
        Connection conn = null;//连接
        Session session;//接收或者发送消息的线程
        Destination dest;//消息的目的地
        MessageProducer producer;//消息的生产者
        
        //创建连接工厂
        connfactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME,JMSProducer.PASSWORD,JMSProducer.BROKEURL);
        try 
        {
            conn = connfactory.createConnection();//获取连接
            conn.start();//启动连接
            session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);//以事务模式获取会话
            dest = session.createTopic("FirstTopic1");//创建主题  与队列的区别
            producer = session.createProducer(dest);//创建消息生产者
            sendMessage(session, producer);//生产并发送消息
            session.commit();
        } 
        catch (Exception e) 
        {
            e.printStackTrace();
        }
        finally
        {
            if (conn != null)
            {
                try 
                {
                    conn.close();
                } 
                catch (JMSException e) 
                {
                    e.printStackTrace();
                }
            }
        }
    }
    
    /**
     * 发现哦那个消息
     * @param session
     * @param messageProducer
     * @throws JMSException 
     */
    private static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException
    {
        for(int i=1;i<=10;i++)
        {
            TextMessage text = session.createTextMessage("生产消息:"+i);//session用来生产消息
            messageProducer.send(text);//MessageProducer用来发送消息
        }
    }

}
package com.activemq2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import com.activemq.Listener;

public class JMSConsumer1 
{

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址

    public static void main(String[] args) 
    {
        ConnectionFactory connfactory;//连接工厂
        Connection conn = null;//连接
        Session session;//接收或者发送消息的线程
        Destination dest;//消息的目的地
        MessageConsumer messageConsumer;//消息消费者
        //创建连接工厂
        connfactory = new ActiveMQConnectionFactory(JMSConsumer1.USERNAME,JMSConsumer1.PASSWORD,JMSConsumer1.BROKEURL);
        
        try 
        {
            conn = connfactory.createConnection();//获取连接
            conn.start();//启动连接
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);//以事务模式获取会话
            dest = session.createTopic("FirstTopic1");//创建消息主题
            messageConsumer = session.createConsumer(dest);
            //监听模式
            messageConsumer.setMessageListener(new Listener1());// 注册消息监听  
        } 
        catch (Exception e) 
        {
            e.printStackTrace();
        }
        //后期不能关闭  要一直处于监听模式  需要conn一直开启
    }
}
package com.activemq2;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Listener1 implements MessageListener {

    @Override
    public void onMessage(Message message) 
    {
        try {
            System.out.println("监听模式1接收:"+ ((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

 

 

注意:

1、点对点和发布订阅模式的主要区别就是

dest = session.createQueue("FirstQueue1");//创建消息队列
dest = session.createTopic("FirstTopic1");//创建消息主题

2、发布订阅模式必须先订阅 再发布才能接收到。


参考

常见开源消息系统

 

消息系统之Apache ActiveMQ