首页 > 代码库 > JMS基础篇(二)
JMS基础篇(二)
简介
异构集成是消息发挥作用的一个领域,大型公司内部可能会遇到很多的平台,Java,.net或者公司自己的平台等。
传送消息还应该支持异步机制,以提高系统整体的性能。异步传输一条消息意味着,发送者不必等到接收者接收或者处理消息,可以接着做后续的处理。
应用程序发送消息至另外一个应用程序,需要使用到消息中间件。消息中间件应提供容错,负载均衡,可伸缩的事务性等特性。
JMS与JDBC类似,是一种与厂商无关的API。应用程序开发者可以使用同样的API来访问不同的系统。
可以认为JMS是一种标准,各消息中间件(MOM)是JMS的具体实现。常见的MOM包括WebSphere MQ,Sonic MQ,ActiveMQ等。
JMS系统机构
消息传递系统可以分为集中式和分散式两种。
集中式的消息系统依赖于一个消息服务器,或者称为消息路由器或者代理(broker)来进行消息的接收及分发。
集中式的消息系统的结构最常见的是星形结构。
图1 集中式消息系统结构
分散式消息系统基于的是IP组播,整个结构又可以为分为多个组播组。每个组播组使用一个IP地址,客户端可以加入到一个或多个组播组。消息的传递不依赖于消息服务器,由网络自身来完成处理的。
图2 分散式消息系统结构
消息传送模型分为2种,点对点式和发布/订阅式。
图3 消息传输模型示意图
生产消息的客户端成为生产者,消费消息的客户端成为消费者。一个JMS的客户端可以既是生产者又是消费者。
点对点模型基于的是拉取(pull)或轮询(polling),消费者从队列中去取消息。发布/订阅基于的是推送(push),消息被主动地从生产者推送至消费者。
一个简单的例子
使用JMS编写一个简单的聊天程序,代码如下:
import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;import javax.jms.Topic;import javax.jms.TopicConnection;import javax.jms.TopicConnectionFactory;import javax.jms.TopicPublisher;import javax.jms.TopicSession;import javax.jms.TopicSubscriber;import javax.naming.Context;import javax.naming.NamingException;import jms.JndiFactoryForJMS;public class Chat implements MessageListener { private TopicPublisher publisher; private TopicSubscriber subscriber; private TopicSession pubSession = null, subSession = null; private TopicConnection connection = null; public Chat() throws JMSException, InterruptedException, NamingException { TopicConnectionFactory factory = null; Context ctx = null; try { JndiFactoryForJMS factoryForJMS = new JndiFactoryForJMS(); ctx = factoryForJMS.getJndiContext(); // 获取连接工厂。 factory = (TopicConnectionFactory) ctx.lookup("con1"); } catch (NamingException e) { e.printStackTrace(); } // 创建连接 connection = factory.createTopicConnection(); // 建立session pubSession = connection.createTopicSession(false, pubSession.AUTO_ACKNOWLEDGE); subSession = connection.createTopicSession(false, pubSession.AUTO_ACKNOWLEDGE); // 指定消息队列 Topic topic = (Topic) ctx.lookup("MyTopic"); publisher = pubSession.createPublisher(topic); subscriber = pubSession.createSubscriber(topic, null, true); subscriber.setMessageListener(this); // 建立连接 connection.start(); } public static void main(String[] srgs) throws JMSException, InterruptedException, NamingException, IOException, CloneNotSupportedException { Chat chat = new Chat(); BufferedReader commandLine = new BufferedReader(new InputStreamReader(System.in)); while (true) { String s = commandLine.readLine(); if (s.equalsIgnoreCase("exit")) { chat.close(); System.exit(-1); } else { chat.writeMessage(s); } } } @Override public void onMessage(Message message) { TextMessage mes = (TextMessage) message; try { System.out.println(mes.getText()); } catch (JMSException e) { e.printStackTrace(); } } private void writeMessage(String text) throws JMSException { TextMessage mes = pubSession.createTextMessage(text); publisher.publish(mes); } private void close() throws JMSException { connection.close(); }}
将程序启动多份,在任意一个程序的console窗口中输入信息,可以看到另外程序的console窗口中出现了所输入的内容,就可以说明另外的程序收到了消息并将消息并打印出了消息内容。
分析上面基于JMS的聊天程序:
上面的JMS聊天程序是基于JNDI的,未使用JNDI的服务器,使用的是,因此需要一个配置文件,需要和上面的类放在同一级目录下。
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://localhost:61616
java.naming.security.principal=system
java.naming.security.credentials=manager
connectionFactoryNames=con1,con2
##queue.MyQueue=MyQueue
topic.MyTopic=MyTopic
topic.topic1=jms.topic1
JndiFactoryForJMS是一个初始化JNDI环境的工厂类,代码如下:
import java.util.Properties;import javax.naming.Context;import javax.naming.InitialContext;import javax.naming.NamingException;public class JndiFactoryForJMS { protected Context context = null; public void initalize() throws NamingException { Properties props = new Properties(); try{ org.apache.activemq.jndi.ActiveMQInitialContextFactory af = new org.apache.activemq.jndi.ActiveMQInitialContextFactory(); props.load(this.getClass().getResourceAsStream("jndi.properties")); context = new InitialContext(props); }catch(Exception ex){ ex.printStackTrace(); } } public Context getJndiContext() throws NamingException { if(context == null){ initalize(); } return context; } }
分析代码中与JNDI相关的部分:
JndiFactoryForJMS factoryForJMS = new JndiFactoryForJMS();
ctx = factoryForJMS.getJndiContext();
// 获取连接工厂。
factory = (TopicConnectionFactory) ctx.lookup("con1");