首页 > 代码库 > activemq入门实例
activemq入门实例
首先了解下jms。
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
JMS对象模型包含如下几个要素:
1.ConnectionFactory 创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。
2. Destination Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。又称为消息队列,是实际的消息源
3. Connection Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。JMS连接(Connection)表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。
4. Session (Session)表示JMS客户与JMS服务器之间的会话状态。JMS会话建立在JMS连接上,表示客户与服务器之间的一个会话线程。ession是我们操作消息的接口。可以通过session创建生产者、消费者、消息等。同样,也分QueueSession和TopicSession。
5. 生产者(Message Producer)和消费者(Message Consumer)对象由Session对象创建,用于发送和接收消息。消息生产者分两种类型:QueueSender和TopicPublisher。消息消费者也分为两种类型:QueueReceiver和TopicSubscriber。
下面展示一个activemq的入门例子:
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Message;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JmsOwnTest {
public static void main(String[] args) throws Exception {
thread(new HelloWorldProducer(), false);
thread(new HelloWorldProducer(), false);
Thread.sleep(1000);
thread(new HelloWorldConsumer(), false);
Thread.sleep(2000);
thread(new HelloWorldConsumer(), false);
}
public static void thread(Runnable runnable, boolean daemon) {
Thread brokerThread = new Thread(runnable);
brokerThread.setDaemon(daemon);
brokerThread.start();
}
public static class HelloWorldProducer implements Runnable {
public void run() {
try {
// 创建一个连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = connectionFactory.createConnection();
//启动连接
connection.start();
//创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个名称为firstQueue的消息队列
Destination destination = session.createQueue("firstQueue");
// 通过session创建指定队列的消息生产者
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 消息内容
String text = "我来了! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
TextMessage message = session.createTextMessage(text);
// 发送消息
System.out.println("发送消息内容: 【" + text + "】");
producer.send(message);
session.close();
connection.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
public static class HelloWorldConsumer implements Runnable, ExceptionListener {
public void run() {
try {
// 创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 创建连接
Connection connection = connectionFactory.createConnection();
//启动连接
connection.start();
connection.setExceptionListener(this);
//创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个名称为firstQueue的消息队列
Destination destination = session.createQueue("firstQueue");
//通过session创建指定队列的消息消费者
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
System.out.println("收到的消息: 【" + text + "】");
} else {
System.out.println("收到的消息: 【" + message + "】");
}
consumer.close();
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public synchronized void onException(JMSException ex) {
System.out.println("JMS异常.");
}
}
}
activemq入门实例