首页 > 代码库 > ActiveMQ安装与使用

ActiveMQ安装与使用

一 .安装运行ActiveMQ:

1.下载activemq

wget http://archive.apache.org/dist/activemq/apache-activemq/5.9.0/apache-activemq-5.9.0-bin.tar.gz

技术分享

2.解压

tar -xf apache-activemq-5.9.0-bin.tar.gz

技术分享

[zcw@g1 ~]$ cd apache-activemq-5.9.0

[zcw@g1 apache-activemq-5.9.0]$ cd bin/

3.运行:

[zcw@g1 bin]$ activemq start

技术分享

三种运行方式:
(1)普通启动 ./activemq start
(2)启动并指定日志文件 ./activemq start >tmp/smlog
(3)后台启动方式nohup ./activemq start >/tmp/smlog
前两种方式下在命令行窗口关闭时或者ctrl+c时导致进程退出,采用后台启动方式则可以避免这种情况

管理后台为:

http://ip:8161/admin/

技术分享

 

4.检查已经启动
 ActiveMQ默认采用61616端口提供JMS服务,使用8161端口提供管理控制台服务,执行以下命令以便检验是否已经成功启动ActiveMQ服务。
打开端口:nc -lp 61616 &
查看哪些端口被打开 netstat -anp
查看61616端口是否打开: netstat -an | grep 61616
检查是否已经启动:
(1).查看控制台输出或者日志文件 
(2).直接访问activemq的管理页面:http://localhost:8161/admin/

5.关闭
如果开启方式是使用(1)或(2),则直接ctrl+c或者关闭对应的终端即可 
如果开启方式是(3),则稍微麻烦一点: 
先查找到activemq对应的进程: 
ps -ef | grep activemq 
然后把对应的进程杀掉,假设找到的进程编号为 168168 
kill 168168 

二.创建ActiveMQ的Eclipse目并运行

1)P2P方式

技术分享

到中心仓库(

http://search.maven.org/

)里面找到:activemq-core

pom.xml:

技术分享 View Code

 

Sender:

package net.datafans.exercise.rockmq.TestJMS;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class Sender {    private static final int SEND_NUMBER = 5;    public static void main(String[] args) {        // ConnectionFactory :连接工厂,JMS 用它创建连接        ConnectionFactory connectionFactory;        // Connection :JMS 客户端到JMS Provider 的连接        Connection connection = null;        // Session: 一个发送或接收消息的线程        Session session;        // Destination :消息的目的地;消息发送给谁.        Destination destination;        // MessageProducer:消息发送者        MessageProducer producer;        // TextMessage message;        // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar        connectionFactory = new ActiveMQConnectionFactory(                ActiveMQConnection.DEFAULT_USER,                ActiveMQConnection.DEFAULT_PASSWORD,                "tcp://ip:61616");        try {            // 构造从工厂得到连接对象            connection = connectionFactory.createConnection();            // 启动            connection.start();            // 获取操作连接            session = connection.createSession(Boolean.TRUE,                    Session.AUTO_ACKNOWLEDGE);            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置            destination = session.createQueue("FirstQueue");            // 得到消息生成者【发送者】            producer = session.createProducer(destination);            // 设置不持久化,此处学习,实际根据项目决定            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);            // 构造消息,此处写死,项目就是参数,或者方法获取            sendMessage(session, producer);            session.commit();        } catch (Exception e) {            e.printStackTrace();        } finally {            try {                if (null != connection)                    connection.close();            } catch (Throwable ignore) {            }        }    }    public static void sendMessage(Session session, MessageProducer producer)            throws Exception {        for (int i = 1; i <= SEND_NUMBER; i++) {            TextMessage message = session                    .createTextMessage("ActiveMq 发送的消息" + i);            // 发送消息到目的地方            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);            producer.send(message);        }    }}

 

 

Receiver:

package net.datafans.exercise.rockmq.TestJMS;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.MessageConsumer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class Receiver {    public static void main(String[] args) {        // ConnectionFactory :连接工厂,JMS 用它创建连接        ConnectionFactory connectionFactory;        // Connection :JMS 客户端到JMS Provider 的连接        Connection connection = null;        // Session: 一个发送或接收消息的线程        Session session;        // Destination :消息的目的地;消息发送给谁.        Destination destination;        // 消费者,消息接收者        MessageConsumer consumer;        connectionFactory = new ActiveMQConnectionFactory(                ActiveMQConnection.DEFAULT_USER,                ActiveMQConnection.DEFAULT_PASSWORD,                "tcp://ip:61616");        try {            // 构造从工厂得到连接对象            connection = connectionFactory.createConnection();            // 启动            connection.start();            // 获取操作连接            session = connection.createSession(Boolean.FALSE,                    Session.AUTO_ACKNOWLEDGE);            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置            destination = session.createQueue("FirstQueue");            consumer = session.createConsumer(destination);            while (true) {                //设置接收者接收消息的时间,为了便于测试,这里谁定为100s                TextMessage message = (TextMessage) consumer.receive(100000);                if (null != message) {                    System.out.println("收到消息" + message.getText());                } else {                    break;                }            }        } catch (Exception e) {            e.printStackTrace();        } finally {            try {                if (null != connection)                    connection.close();            } catch (Throwable ignore) {            }        }    }}

 

 

测试过程:

发送

 技术分享

 

接收

 技术分享 

 2)Pub/Sub模式

package net.datafans.exercise.rockmq.TestJMS;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class Pub {    private static final int SEND_NUMBER = 5;    public static void main(String[] args) {        // ConnectionFactory :连接工厂,JMS 用它创建连接        ConnectionFactory connectionFactory;        // Connection :JMS 客户端到JMS Provider 的连接        Connection connection = null;        // Session: 一个发送或接收消息的线程        Session session;        // Destination :消息的目的地;消息发送给谁.        Destination destination;        // MessageProducer:消息发送者        MessageProducer producer;        // TextMessage message;        // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar        connectionFactory = new ActiveMQConnectionFactory(                ActiveMQConnection.DEFAULT_USER,                ActiveMQConnection.DEFAULT_PASSWORD,                "tcp://ip:61616");        try {            // 构造从工厂得到连接对象            connection = connectionFactory.createConnection();            // 启动            connection.start();            // 获取操作连接            session = connection.createSession(Boolean.TRUE,                    Session.AUTO_ACKNOWLEDGE);            Topic topic = session.createTopic("MessageTopic");                        producer = session.createProducer(topic);            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);            TextMessage message = session.createTextMessage();            message.setText("message_hello_chenkangxian");            producer.send(message);                        //            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置//            destination = session.createQueue("FirstQueue");//            // 得到消息生成者【发送者】//            producer = session.createProducer(destination);//            // 设置不持久化,此处学习,实际根据项目决定//            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//            // 构造消息,此处写死,项目就是参数,或者方法获取//            sendMessage(session, producer);//            session.commit();        } catch (Exception e) {            e.printStackTrace();        } finally {            try {                if (null != connection)                    connection.close();            } catch (Throwable ignore) {            }        }    }    public static void sendMessage(Session session, MessageProducer producer)            throws Exception {        for (int i = 1; i <= SEND_NUMBER; i++) {            TextMessage message = session                    .createTextMessage("ActiveMq 发送的消息" + i);            // 发送消息到目的地方            System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);            producer.send(message);        }    }}

 

package net.datafans.exercise.rockmq.TestJMS;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.Session;import javax.jms.TextMessage;import javax.jms.Topic;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class Sub {    public static void main(String[] args) {        // ConnectionFactory :连接工厂,JMS 用它创建连接        ConnectionFactory connectionFactory;        // Connection :JMS 客户端到JMS Provider 的连接        Connection connection = null;        // Session: 一个发送或接收消息的线程        Session session;        // Destination :消息的目的地;消息发送给谁.        Destination destination;        // 消费者,消息接收者        MessageConsumer consumer;        connectionFactory = new ActiveMQConnectionFactory(                ActiveMQConnection.DEFAULT_USER,                ActiveMQConnection.DEFAULT_PASSWORD,                "tcp://ip:61616");        try {            // 构造从工厂得到连接对象            connection = connectionFactory.createConnection();            // 启动            connection.start();            // 获取操作连接            session = connection.createSession(Boolean.TRUE,                    Session.AUTO_ACKNOWLEDGE);                        Topic topic = session.createTopic("MessageTopic");                        consumer = session.createConsumer(topic);                        consumer.setMessageListener(new MessageListener() {                                public void onMessage(Message message) {                    // TODO Auto-generated method stub                    TextMessage tm = (TextMessage)message;                    try {                        System.out.println(tm.getText());                    } catch (JMSException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                    }                }            });                        // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置//            destination = session.createQueue("FirstQueue");//            consumer = session.createConsumer(destination);//            while (true) {//                //设置接收者接收消息的时间,为了便于测试,这里谁定为100s//                TextMessage message = (TextMessage) consumer.receive(100000);//                if (null != message) {//                    System.out.println("收到消息" + message.getText());//                } else {//                    break;//                }//            }        } catch (Exception e) {            e.printStackTrace();        } finally {            try {                if (null != connection)                    connection.close();            } catch (Throwable ignore) {            }        }    }}

测试

PS:代码都写出来了只是不知道为啥测试Pub/Sub模式一直都出不来 

3.Request和Reply模式

CONFIG:

/** * @author mike * @date Apr 17, 2014 */package net.datafans.exercise.rockmq.TestJMS;class CONFIG {    public static final java.lang.String AUTHOR = "Mike Tang";    public static final java.lang.String QUEUE_NAME = "REQUEST AND REPLY";        public static void introduce() {        java.lang.StringBuilder builder = new java.lang.StringBuilder();        builder.append("This is a simple example to show how to write a \n");        builder.append("request and reply pattern program with Apache ActiveMQ.\n");        builder.append("which is not support such pattern.\n\n");        builder.append("                       -------- By Mike Tang\n");        builder.append("                   2014-4-17 at Soochow University\n");        System.out.println(builder.toString());    }        public static void introduceServer() {        System.out.println("Wait for the client send messages, and you will see something");    }        public static void introduceClient() {        System.out.println("Input something and ENTER, you will get the reply.\n"                + "and input ‘exit‘ stop the program.");    }}

 

MessageClient

package net.datafans.exercise.rockmq.TestJMS;import java.util.Scanner;import java.util.UUID;import javax.jms.Connection;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TemporaryQueue;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class MessageClient {    String                 brokerurl      = "ip";    static Session         session        = null;    static MessageConsumer consumer       = null;    static MessageProducer producer       = null;    static TemporaryQueue  temporaryQueue = null;    public void setURL(String brokerurl) {        this.brokerurl = brokerurl;    }    public void start() throws JMSException {        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerurl);        Connection connection = connectionFactory.createConnection();        connection.start();        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        Destination destination = session.createQueue(CONFIG.QUEUE_NAME);        temporaryQueue = session.createTemporaryQueue();        {            producer = session.createProducer(destination);            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);            consumer = session.createConsumer(temporaryQueue);            consumer.setMessageListener(new MMessageListener());        }    }    public void request(String request) throws JMSException {        System.out.println("REQUEST TO : " + request);        TextMessage textMessage = session.createTextMessage();        textMessage.setText(request);        textMessage.setJMSReplyTo(temporaryQueue);        textMessage.setJMSCorrelationID(UUID.randomUUID().toString());        MessageClient.producer.send(textMessage);    }    private static class MMessageListener implements MessageListener {        public void onMessage(Message message) {            try {                if (message instanceof TextMessage) {                    String messageText = ((TextMessage) message).getText();                    System.out.println("REPLY FROM : " + messageText.toUpperCase());                }            } catch (JMSException e) {                e.printStackTrace();            }        }    }    // -----------------------------------------------------------------------    public static void main(String[] args) {        CONFIG.introduce();        CONFIG.introduceClient();        try {            MessageClient client = new MessageClient();            client.setURL("tcp://ip:61616");            Scanner scanner = new Scanner(System.in);            client.start();            {                System.out.println("-----------------------------------------");                System.out.println("|           Client Start!               |");                System.out.println("-----------------------------------------");            }            String message = "";            int i = 0;            while (!(message = scanner.next()).equals("exit")) {                client.request(message + " : " + i++);            }            scanner.close();        } catch (JMSException e) {            e.printStackTrace();        }    }}

 

MessageServer

package net.datafans.exercise.rockmq.TestJMS;import javax.jms.Connection;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.broker.BrokerService;public class MessageServer {    String                 brokerurl     = "ip";    static BrokerService   brokerService = null;    static MessageConsumer consumer      = null;    static MessageProducer producer      = null;    static Session         session       = null;    public void setURL(String brokerurl) {        this.brokerurl = brokerurl;    }    public void start() throws JMSException {        createBroker(brokerurl);        setConsumer(brokerurl);    }    public void createBroker(String brokerurl) {        try {            brokerService = new BrokerService();            brokerService.setUseJmx(false);            brokerService.setPersistent(false);            brokerService.addConnector(brokerurl);            brokerService.start();        } catch (Exception e) {            e.printStackTrace();        }    }    public void setConsumer(String url) throws JMSException {        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);        Connection connection = connectionFactory.createConnection();        connection.start();        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        Destination destination = session.createQueue(CONFIG.QUEUE_NAME);        {            producer = session.createProducer(null);            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);            consumer = session.createConsumer(destination);            consumer.setMessageListener(new MMessageListener());        }    }    private static class MMessageListener implements MessageListener {        public void onMessage(Message message) {            try {                TextMessage response = session.createTextMessage();                if (message instanceof TextMessage) {                    String messageText = ((TextMessage) message).getText();                    response.setText(handleMessage(messageText));                    System.out.println("REQUEST FROM " + messageText.toUpperCase());                }                response.setJMSCorrelationID(message.getJMSCorrelationID());                producer.send(message.getJMSReplyTo(), response);            } catch (JMSException e) {                e.printStackTrace();            }        }        private String handleMessage(String text) {            return "RESPONSE TO " + text.toUpperCase();        }    }    // -----------------------------------------------------------------    public static void main(String[] args) {                CONFIG.introduce();        CONFIG.introduceServer();                try {            MessageServer server = new MessageServer();            server.setURL("tcp://ip:61616");            server.start();            {                System.out.println("-----------------------------------------");                System.out.println("|           Server Start!               |");                System.out.println("-----------------------------------------");            }        } catch (JMSException e) {            e.printStackTrace();        }    }}

测试:

技术分享 

 

技术分享

 摘自:http://www.cnblogs.com/super-d2/p/4249768.html

ActiveMQ安装与使用