首页 > 代码库 > JMS

JMS

  JSM(Java Message Service)既消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。

  消息模型分为点对点和发布订阅两种模式:

  Point-to-Point(P2P):

  技术分享

  每个消息都被发送到一个特定的队列,接收者从队列中获取消息,队列保留着消息,直到他们被消费或超时。

  每个消息只有一个消费者,一旦被消费,消息就从消息队列中消失。发送者和接收者在时间上没有依赖性,也就是说党发送者发送了消息之后,不管接受者是不是正在运行,都不会影响到消息被发送到队列。接收者成功接收消息之后需要向队列应答成功。

  Publish/Subscribe(Pub/Sub):

  客户端将消息发送到主题,多个发布者将消息发布到Topic,系统将这些消息传递给多个订阅者。

  每个消息可以有多个消费者,发布者和订阅者之间有时间上的依赖性,针对某个主题的订阅者,必须创建一个订阅者后才能消费消息发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅,这样即使订阅者没有运行,他也能接收到发布者的消息。

  ConnectionFactory:

  创建ConnectionFactory对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种,可以通过JNDI来查找ConnectionFactory。

  Destination:

  Destination的意思是生产者的消息发送目标,或消费者的消息来源,对于生产者来说他的Destination是某个队列(Queue)或某个主题(Topic),对于消费者来说,他的Destination也是某个队列或主题。所以,Destination实际上就是两种类型对象,Queue,Topic,可以通过JNDI来查找Destination。

  Connection:

  Connection表示客户端和JMS系统之间建立的链接(对TCP、IP socket的包装)。Connection可以产生一个或多个Session,和ConnectionFactory一样,Connection也有两种类型QueueConnection和TopicConnection。

  Session:

  这里的Session是我没操作消息的接口,可以通过Session创建生产者,消费者,消息等。Session提供了事物的功能,当我们需要使用Session发送或接收消息时,可以将这些发送、接收动作放到一个事物中,同样,Session也分QueueSession和TopicSession两种。

  provider:

  消息 生产者由Session创建,并用于将消息发送到Destination,同样消息生产者也分为两种类型QueueSender、TopicPublisher,通过调用消息生产者的方法send、publish发送消息。

  consumer:

  消息消费者通过Session创建,用于接收被发送到Destionation的消息,同样消息消费者也分两种类型QueueReceiver、TopicSubscriber,通过session的createReceiver(Queue)、createSubscriber(Topic)创建,同时,也可以通过session的createDurableSubscriber方法来创建持久化的订阅者。

  MessageListener:

  消息监听器,如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。

  技术分享

  消息队列可以提供消息的灵活性(将数据从一个应用程序传送到另一个应用程序,从软件的一个模块传递到另一个模块),松耦合,异步性(允许接收者在消息发送很长时间后再取回消息,提高服务器处理性能),进行数据的可靠传输保证数据不重发不丢失,能够实现跨平台操作。

  activeMQ实例:

  spring 配置:

  

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">    <!-- Root Context: defines shared resources visible to all other web components -->    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">        <property name="brokerURL" value=http://www.mamicode.com/"tcp://localhost:61616" />    </bean>        <!-- config  jmsTemplate-->     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">         <property name="connectionFactory" ref="connectionFactory" />         <property name="defaultDestination" ref="defaultDestination" />      </bean>          <!--  Default Destination Queue Definition-->    <bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQQueue">        <constructor-arg index="0" value=http://www.mamicode.com/"springQueue"/>    </bean>                <!-- Message Receiver Definition  Listener -->    <!-- <bean id="messageReceiver" class="com.wutuobang.jms.MessageReceiver">    </bean>     <bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">        <property name="connectionFactory" ref="connectionFactory" />        <property name="destinationName" value=http://www.mamicode.com/"springQueue" />        <property name="messageListener" ref="messageReceiver" />    </bean> -->       </beans>

  发送消息到Queue:

@RequestMapping(value = http://www.mamicode.com/"/send", method = RequestMethod.GET)    public String send(Locale locale, Model model) {        /**         * send mapMessage         */        Map map = new HashMap();        map.put("Name", "welcome.");                    jmsTemplate.convertAndSend(map);        model.addAttribute("send", "sucess");        return "home";    }    @RequestMapping("/sendDestination")    public String send(Model model){        Map map = new HashMap();        map.put("Name", new Date().getTime());        jmsTemplate.convertAndSend("destination" + System.currentTimeMillis(), map);        model.addAttribute("send","success");        return "home";    }

主动接收消息:

@RequestMapping(value = http://www.mamicode.com/"/receive", method = RequestMethod.GET)    public String test(Locale locale, Model model) {        Message message = jmsTemplate.receive();            if (message instanceof MapMessage) {            final MapMessage mapMessage = (MapMessage) message;            try {                logger.info(mapMessage.getString("Name"));                model.addAttribute("content", mapMessage.getString("Name"));            } catch (JMSException e) {                // do something            }        }                return "receive";    }

自动接收消息:

启动配置文件中监听相关配置

public class MessageReceiver implements MessageListener {

    Logger logger = LoggerFactory.getLogger(MessageReceiver.class);

    public void onMessage(final Message message) {

        if (message instanceof MapMessage) {

            final MapMessage mapMessage = (MapMessage) message;

            try {
                logger.info(mapMessage.getString("Name"));
            
            } catch (JMSException e) {
                // do something
            }
        }
    }
}

 技术分享

 

  

  

JMS