首页 > 代码库 > ActiveMQ简单实现之一对一生产和消费

ActiveMQ简单实现之一对一生产和消费

第一步: 下载ActiveMQ工具包 url:http://activemq.apache.org

技术分享

第二部解压并启动:

全家福

技术分享

启动方式: 注 不要直接启动bin目录下的bat  按系统版本启动相应的bat  比如我的是64位

技术分享

技术分享

启动完毕 进入控制台  默认地址 http://127.0.0.1:8161/admin   账号admin密码admin

技术分享

第三部:创建java工程简单实现简单单点发布和消费 导入activemq-all-xxx.jar

技术分享

创建消息生产者类Producer.java

技术分享

package com.sgor.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 消息生产者类
 * @author xixiao
 *
 */
public class Producer {
	public static void main(String[] args){
		ConnectionFactory connectionFactory; //创建链接工厂
		Connection connection = null;//链接
		Session session;//创建会话
		Destination destination;//消息目的地 消息队列
		MessageProducer messageProducer;//消息生产者
		//实例化链接工厂  参数为 用户,密码,url
		connectionFactory = new ActiveMQConnectionFactory("xixiao", "xixiao", ActiveMQConnection.DEFAULT_BROKER_URL);
		try {
			connection=connectionFactory.createConnection();//通过链接工厂创建链接
			connection.start();//启动链接
			//创建会话 Session.AUTO_ACKNOWLEDGE。receive 或MessageListener.onMessage()成功返回的时候,自动确认收到消息。
			session =connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
			//创建一个消息队列名称为hello ActiveMQ 消息队列中可包含需要发布消息 
			destination = session.createQueue("Hello ActiveMQ");
			//将创建的消息队列hello ActiveMQ交给消息发布者messageProdecer
			messageProducer=session.createProducer(destination);
			for (int i = 0; i <5 ; i++) {
				//生产5条消息
				TextMessage message=session.createTextMessage(i+"条消息");
				System.out.println(message.getText());
                    //发布消息 messageProducer.send(message); } //提交事物 session.commit(); } catch (JMSException e) { e.printStackTrace(); }finally{ try { //关闭连接 connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } }

 创建消息消费类

 这里使用了一个Listener实现了MessageListener中的onMessage  当这个方法成功返回时会话可以自动确认消息被消费

package com.sgor.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {
	public static void main(String[] args) {
		ConnectionFactory connectionFactory; 
		Connection connection = null;
		Session session;
		Destination destination;
		MessageConsumer messageConsumer;
		connectionFactory = new ActiveMQConnectionFactory("xixiao", "xixiao", ActiveMQConnection.DEFAULT_BROKER_URL);
		try {
			//通过链接工厂创建链接
			connection  = connectionFactory.createConnection();
			connection.start();//启动链接
			//这里直接设为false  不需要事物
			session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("Hello ActiveMQ"); //创建消息队列 用于接收发布的消息
			messageConsumer = session.createConsumer(destination);
			/**
			 * 监听生产者方式接受消息,生产者产生消息才开始接收 需要监听器 实现MessageListener (javax.jms.MessageListener包)
			 */
			messageConsumer.setMessageListener(new Listener());
		} catch (JMSException e) {
			e.printStackTrace();
		}
		
		
	}
}

 Listener

package com.sgor.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Listener implements MessageListener{

	@Override
	public void onMessage(Message arg0) {
		try {
			System.out.println("消息:"+((TextMessage)arg0).getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
		
	}

}

 测试一下:

首先生产消息

技术分享

查看控制台》》点击Queues 待处理消息5  消费者0 列队中的消息5   成功生产了5条消息

技术分享

接下来执行消费类,将队列中的消息确认

技术分享

打开控制台查看消息确认情况   待消费消息0 消费者1 消费消息5   消息被1个客户消费

技术分享

 

ActiveMQ简单实现之一对一生产和消费