首页 > 代码库 > JMS基础篇

JMS基础篇

  首先我们需要下载 ActiveMQ:http://activemq.apache.org/。

  启动 ActiveMQ 服务:解包下载的 ActiveMQ 》进去其bin 目录》双击 activemq.bat。

    ActiveMQ 默认使用的是端口61616,可以在cmd中查看61616端口是否被占用,以确定ActiveMQ 服务是否正常启动。查看的命令如下:

netstat -nao | find "61616",如果服务启动则可以看到ActiveMQ所对应的的进程。

      下面将以三种形式体验ActiveMQ 发送,接收消息的功能。

      第一种是ActiveMQ 所支持的,但不是基于JNDI的,不是JMS标准所建议的。

      发送消息的类如下,基于的端口是默认的61616,连续发送了5了消息,消息队列是xiaoyunduo。

import java.util.Date;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MapMessage;import javax.jms.MessageProducer;import javax.jms.Session;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class Sender {	public static void main(String[] srgs) throws JMSException, InterruptedException {		factory;		Connection connection = null;		Session session = null;		Destination destination = null;		MessageProducer producer = null;		// 创建连接工厂		factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");		// 创建连接		connection = factory.createConnection();		// 建立连接		connection.start();		// 建立session		session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);		// 指定消息队列		destination = session.createQueue("xiaoyunduo");		// 创建消息发生器		producer = session.createProducer(destination);		for (int i = 0; i < 5; i++) {			MapMessage message = session.createMapMessage();			message.setLong("mess", new Date().getTime());			Thread.sleep(1000);			// 发送消息			producer.send(message);		}		session.commit();		session.close();		connection.close();	}}

  接收消息的类如下,基于的端口是默认的61616,消息队列是xiaoyunduo。

import java.util.Date;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MapMessage;import javax.jms.MessageConsumer;import javax.jms.Session;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class Receiver {	public static void main(String[] args) throws JMSException {		ConnectionFactory factory = null;		Connection connection = null;		Session session = null;		Destination destination = null;		MessageConsumer consumer = null;		// 创建连接工厂		factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");		// 创建连接		connection = factory.createConnection();		// 建立连接		connection.start();		// 建立session		session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);		// 指定消息队列		destination = session.createQueue("xiaoyunduo");		// 产生消费者		consumer = session.createConsumer(destination);		for (int i = 0; i < 5; i++) {			//获取jms server中的消息			MapMessage message = (MapMessage) consumer.receive(1000);			session.commit();			System.out.println("收到消息:" + new Date(message.getLong("mess")));		}		session.close();		connection.close();	}}

  先启动发送消息的类,再启动接收消息的类,可以看到消息内容打印在消息接收端。

      第二种是基于文件形式的JNDI,使用Sun自带的RefFSContextFactory来存储JNDI信息。需要引入fscontext.jar和providerutil.jar,这是进行测试的前提。      

  ConnectionFactory改从Context中读取,其余部分保持不变。发送消息和接收消息的类都需要进行对应的修改。

        // 创建连接工厂        //factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");        Hashtable env = new Hashtable(5);        env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.fscontext.RefFSContextFactory");        env.put(Context.PROVIDER_URL, "file:JNDI_REF");                        try {            Context ctx = new InitialContext(env);            ActiveMQConnectionFactory mqFactory = new ActiveMQConnectionFactory();            mqFactory.setBrokerURL("tcp://localhost:61616");            mqFactory.setUserName(null);            mqFactory.setPassword(null);                        //设置的参数少了某一项,只有sender发送等几秒后再去启动receive才没问题,否则接收不到消息            //ctx.bind("mqFactory", mqFactory);//只需要绑定一次            factory = (ConnectionFactory) ctx.lookup("mqFactory");        } catch (NamingException e) {            e.printStackTrace();        }

        第三种基于配置文件的方式,需要先有下面的一个配置文件jndi.properties,其中包含了ActiveMQ 的基本配置信息。

java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactoryjava.naming.provider.url=tcp://localhost:61616java.naming.security.principal=systemjava.naming.security.credentials=manager connectionFactoryNames=con1,con2##queue.MyQueue=MyQueuetopic.MyTopic=MyTopictopic.topic1=jms.topic1

     还需要一个读取配置文件的工厂类,InitialContext利用properties信息进行初始化,将直接利用JNDI从InitialContext中读取信息。

import java.util.Properties;import javax.naming.Context;import javax.naming.InitialContext;import javax.naming.NamingException;public class JndiFactoryForJMS {     protected Context context = null;            public void initalize() throws NamingException      {                 Properties props = new Properties();          try{              org.apache.activemq.jndi.ActiveMQInitialContextFactory af = new org.apache.activemq.jndi.ActiveMQInitialContextFactory();            props.load(this.getClass().getResourceAsStream("jndi.properties"));            context = new InitialContext(props);          }catch(Exception ex){              ex.printStackTrace();        }                    }        public Context getJndiContext() throws NamingException {          if(context == null){              initalize();          }          return context;      }       }  

        和第二种方法类似,只需要改变第一个类的部分代码即可,发送端和接收端要同时修改。

      // 创建连接工厂      //factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp:/      /localhost:61616");				     try {	  JndiFactoryForJMS factoryForJMS = new JndiFactoryForJMS();          	          	    Context ctx = factoryForJMS.getJndiContext();  	       	    //获取连接工厂。  	    factory = (ConnectionFactory)ctx.lookup("con1");  	    } catch (NamingException e) {	    e.printStackTrace();	   }