首页 > 代码库 > HornetQ Topic/Queue for Clojure
HornetQ Topic/Queue for Clojure
HornetQ Topic/Queue for Clojure
1. HornetQ 概述
HornetQ是一个支持集群和多种协议,可嵌入、高性能的异步消息系统。HornetQ完全支持JMS,HornetQ不但支持JMS1.1 API同时也定义属于自己的消息API,这可以最大限度的提升HornetQ的性能和灵活性。在不久的将来更多的协议将被HornetQ支持。
2. HornetQ 特点
(1)HornetQ拥有超高的性能,HornetQ在持久化消息方面的性能可以轻易的超于其它常见的非持久化消息引擎的性能。当然,HornetQ的非持久化消息的性能会表现的更好!
(2)HornetQ完全使用POJO,纯POJO的设计让HornetQ可以尽可能少的依赖第三方的包。从设计模式来说,HornetQ这样的设计入侵性也最小。HornetQ既可以独立运行,也可以与其它Java应用程序服务器集成使用。
(3)HornetQ拥有完善的错误处理机制,HornetQ提供服务器复制和故障自动转移功能,该功能可以消除消息丢失或多个重复信息导致服务器出错。
(4)HornetQ提供了灵活的集群功能,通过创建HornetQ集群,您可以享受到到消息的负载均衡带来的性能提升。您也可以通过集群,组成一个全球性的消息网络。您也可以灵活的配置消息路由。
(5)HornetQ拥有强大的管理功能。HornetQ提供了大量的管理API和监控服务器。它可以无缝的与应用程序服务器整合,并共同工作在一个HA环境中。
注:以上摘自百度百科http://baike.baidu.com/view/4895723.htm?fr=aladdin
3. Topic 和 Queue的主要区别
首先我们来了解几个概念:
(1)destination(目的地):即消息发送的目的地,也是消息传递的通道。可以是Queue或者Topic。
(2)producer(生产者):即消息的产生者。消息的生产者只负责向destination(目的地)发送消息,扔下后就不管了,去干其他的事。而消息最终会被怎么处理,则取决于消息的消费者。
(3)consumer(消费者):即消息的消费者。消费者可以订阅(或者监听)某个destination,当其中有消息的到达时,可以对消息进行相应的处理。也可以根据消息的特征判断是否要接受此消息。
其实消息服务的大体流程很简单,消息生产者向消息队列(或主题)中发送消息,消费者从队列(或主题)中获取消息。生产者和消费者互不干扰。
下面就说说Topic和Queue的不同:
Queue:即消息队列。一个Queue可以有多个consumer对其进行listen(监听)。但是一条消息仅能被一个consumer收到,即如果一条消息被某个consumer相应掉了,那么其他consumer则收不到这条消息。另外,如果一条消息没有可用的consumer,那么这条消息会一直保存在Queue中,直到有consumer对它进行响应,即Queue的消息不会被丢失。Queue的消息传递模式是Point-to-Point(点对点)的传递模式。
Topic:即主题。消息发布者向主题publish(发布)消息,消费者则对主题进行subscribe(订阅)。一个Topic可以有多个consumer对其订阅。发布者发布的消息将会进行广播,订阅了该主题的消费者都能够收到这条消息(持有该消息的拷贝),并都能对消息做出响应。Topic中不能保证每条消息都能够被消费者者接收到。如果一条消息没有消费者在监听,则Topic会丢失。Topic的消息传递模式是Publish/Subcribe(发布/订阅)的模式。
4. 简单应用(使用Immutant 的API)
**用Queue实现异步消息队列
(1)创建消息队列:
(require ‘[Immutant.messaging :as m])
(def q (m/queue “test-queue”)
(2)发布消息
(m/publish q “a message”)
(3)自定义消息处理函数, 这里只是将消息简单打印出来
(defn msg-handle
[msg]
(prn msg))
(3)监听消息队列,注册处理函数
(def listener (m/listen q #(msg-handle %)))
(4)关闭监听
(stop listner)
(.close listener)
**用Queue实现同步消息队列
(1)定义消息队列
(def sync-q (m/queue “sync-q”)
(2)自定义消息应答方式
(def respinder (m/respond sync-q inc))
(3)发送消息
@(m/request sync-q 1)
注:request会一直等到消息得到响应才会返回,否则一直堵着不动。
**用Topic实现异步消息
(1)定义Topic
(def t (m/topic “test-topic”))
(2)发布主题消息
(m/publish t “a message”)
(3)订阅主题,注册处理函数
(m/subscribe t “subscriber-name”#(msg-handle %)
(4)取消订阅
(m/unsubscribe t subscriber-name)
HornetQ Topic/Queue for Clojure