首页 > 代码库 > JMS基础篇(二)

JMS基础篇(二)

简介

  异构集成是消息发挥作用的一个领域,大型公司内部可能会遇到很多的平台,Java,.net或者公司自己的平台等。

     传送消息还应该支持异步机制,以提高系统整体的性能。异步传输一条消息意味着,发送者不必等到接收者接收或者处理消息,可以接着做后续的处理。

    应用程序发送消息至另外一个应用程序,需要使用到消息中间件。消息中间件应提供容错,负载均衡,可伸缩的事务性等特性。

    JMS与JDBC类似,是一种与厂商无关的API。应用程序开发者可以使用同样的API来访问不同的系统。

    可以认为JMS是一种标准,各消息中间件(MOM)是JMS的具体实现。常见的MOM包括WebSphere MQ,Sonic MQ,ActiveMQ等。

JMS系统机构

    消息传递系统可以分为集中式和分散式两种。

    集中式的消息系统依赖于一个消息服务器,或者称为消息路由器或者代理(broker)来进行消息的接收及分发。

    集中式的消息系统的结构最常见的是星形结构。

 

图1 集中式消息系统结构

     分散式消息系统基于的是IP组播,整个结构又可以为分为多个组播组。每个组播组使用一个IP地址,客户端可以加入到一个或多个组播组。消息的传递不依赖于消息服务器,由网络自身来完成处理的。

 

图2 分散式消息系统结构

     消息传送模型分为2种,点对点式和发布/订阅式。

 

图3 消息传输模型示意图

      生产消息的客户端成为生产者,消费消息的客户端成为消费者。一个JMS的客户端可以既是生产者又是消费者。

     点对点模型基于的是拉取(pull)或轮询(polling),消费者从队列中去取消息。发布/订阅基于的是推送(push),消息被主动地从生产者推送至消费者。

      一个简单的例子

    使用JMS编写一个简单的聊天程序,代码如下:   

import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;import javax.jms.Topic;import javax.jms.TopicConnection;import javax.jms.TopicConnectionFactory;import javax.jms.TopicPublisher;import javax.jms.TopicSession;import javax.jms.TopicSubscriber;import javax.naming.Context;import javax.naming.NamingException;import jms.JndiFactoryForJMS;public class Chat implements MessageListener {	private TopicPublisher publisher;	private TopicSubscriber subscriber;	private TopicSession pubSession = null, subSession = null;	private TopicConnection connection = null;	public Chat() throws JMSException, InterruptedException, NamingException {		TopicConnectionFactory factory = null;		Context ctx = null;		try {			JndiFactoryForJMS factoryForJMS = new JndiFactoryForJMS();			ctx = factoryForJMS.getJndiContext();			// 获取连接工厂。			factory = (TopicConnectionFactory) ctx.lookup("con1");		} catch (NamingException e) {			e.printStackTrace();		}		// 创建连接		connection = factory.createTopicConnection();				// 建立session		pubSession = connection.createTopicSession(false, pubSession.AUTO_ACKNOWLEDGE);		subSession = connection.createTopicSession(false, pubSession.AUTO_ACKNOWLEDGE);		// 指定消息队列		Topic topic = (Topic) ctx.lookup("MyTopic");		publisher = pubSession.createPublisher(topic);		subscriber = pubSession.createSubscriber(topic, null, true);		subscriber.setMessageListener(this);		// 建立连接		connection.start();	}	public static void main(String[] srgs) throws JMSException, InterruptedException, NamingException, IOException, CloneNotSupportedException {		Chat chat = new Chat();		BufferedReader commandLine = new BufferedReader(new InputStreamReader(System.in));		while (true) {			String s = commandLine.readLine();			if (s.equalsIgnoreCase("exit")) {				chat.close();				System.exit(-1);			} else {				chat.writeMessage(s);			}		}	}	@Override	public void onMessage(Message message) {		TextMessage mes = (TextMessage) message;		try {			System.out.println(mes.getText());		} catch (JMSException e) {			e.printStackTrace();		}	}	private void writeMessage(String text) throws JMSException {		TextMessage mes = pubSession.createTextMessage(text);		publisher.publish(mes);	}	private void close() throws JMSException {		connection.close();	}}

  

      将程序启动多份,在任意一个程序的console窗口中输入信息,可以看到另外程序的console窗口中出现了所输入的内容,就可以说明另外的程序收到了消息并将消息并打印出了消息内容。

     分析上面基于JMS的聊天程序:

     上面的JMS聊天程序是基于JNDI的,未使用JNDI的服务器,使用的是,因此需要一个配置文件,需要和上面的类放在同一级目录下。

         java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory

   java.naming.provider.url=tcp://localhost:61616 

   java.naming.security.principal=system

        java.naming.security.credentials=manager

        connectionFactoryNames=con1,con2

        ##queue.MyQueue=MyQueue

       topic.MyTopic=MyTopic

       topic.topic1=jms.topic1

       

     JndiFactoryForJMS是一个初始化JNDI环境的工厂类,代码如下: 

import java.util.Properties;import javax.naming.Context;import javax.naming.InitialContext;import javax.naming.NamingException;public class JndiFactoryForJMS {	 protected Context context = null;            public void initalize() throws NamingException      {                 Properties props = new Properties();          try{          	org.apache.activemq.jndi.ActiveMQInitialContextFactory af = new org.apache.activemq.jndi.ActiveMQInitialContextFactory();            props.load(this.getClass().getResourceAsStream("jndi.properties"));            context = new InitialContext(props);          }catch(Exception ex){          	ex.printStackTrace();        }                    }        public Context getJndiContext() throws NamingException {          if(context == null){              initalize();          }          return context;      }       }  

  

        分析代码中与JNDI相关的部分:

        JndiFactoryForJMS factoryForJMS = new JndiFactoryForJMS();

        ctx = factoryForJMS.getJndiContext(); 

        // 获取连接工厂。

       factory = (TopicConnectionFactory) ctx.lookup("con1");