首页 > 代码库 > ActiveMQ(16):Message Dispatch的分发策略、消息批量确认和生产者流量控制
ActiveMQ(16):Message Dispatch的分发策略、消息批量确认和生产者流量控制
一、分发策略(Dispatch Policies)
1.1 严格顺序分发策略(Strict Order Dispatch Policy)
通常ActiveMQ会保证topic consumer以相同的顺序接收来自同一个producer的消息,但有时候也需要保证不同的topic consumer以
相同的顺序接收消息,然而,由于多线程和异步处理,不同的topic consumer可能会以不同的顺序接收来自不同producer的消息。
Strict order dispatch policy会保证每个topic consumer会以相同的顺序接收消息,代价是性能上的损失。以下是一个配置例子:
<policyEntry topic="ORDERS.>"> <dispatchPolicy> <strictOrderDispatchPolicy /> </dispatchPolicy> </policyEntry>
对于Queue的配置为:
<policyEntry queue=">" strictOrderDispatch="false" />
1.2 轮询分发策略(Round Robin Dispatch Policy)
ActiveMQ的prefetch缺省参数是针对处理大量消息时的高性能和高吞吐量而设置的,所以缺省的prefetch参数比较大。而且缺省
的dispatch policies会尝试尽可能快的填满prefetch缓冲。
然而在有些情况下,例如只有少量的消息而且单个消息的处理时间比较长,那么在缺省的prefetch和dispatch policies下,这
些少量的消息总是倾向于被分发到个别的consumer上。这样就会因为负载的不均衡分配而导致处理时间的增加。
Round robin dispatch policy会尝试平均分发消息,以下是一个例子:
<policyEntry topic="ORDERS.>"> <dispatchPolicy> <roundRobinDispatchPolicy/> </dispatchPolicy> </policyEntry>
二、消息确认(Optimized Acknowledgement)
ActiveMQ缺省支持批量确认消息,由于批量确认会提高性能。如果希望在应用程序中禁止经过优化的确认方式,那么可以采用如下方法:
1:在Connection URI 上启用Optimized Acknowledgements
cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.optimizeAcknowledge=true");
2:在ConnectionFactory 上启用Optimized Acknowledgements
((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(true);
3:在Connection上启用Optimized Acknowledgements
((ActiveMQConnection)connection).setOptimizeAcknowledge(true);
4:5.6以后的版本,还可以在Connection URI上设置setOptimizeAcknowledgeTimeOut参数,默认值为300ms,你可以设置自己要用的值,0表示禁用。
三、生产者流量控制(Producer Flow Control)
3.1 简介
流量控制的含义:当生产者产生消息过快,超过流量限制的时候,生产者将会被阻塞直到资源可以继续使用,或者抛出一个JMSException,可以通过
<systemUsage>来配置。
3.2 配置
同步发送消息的producer会自动使用producer flow control ;对于异步发送消息的producer,要使用producer flow control,你先要为
connection配置一个ProducerWindowSize参数,如下:
((ActiveMQConnectionFactory)cf).setProducerWindowSize(1024000);
ProducerWindowSize是producer在发送消息的过程中,收到broker对于之前发送消息的确认之前, 能够发送消息的最大字节数
可以禁用producer flow control,以下是ActiveMQ配置文件的一个例子:
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic="FOO.>" producerFlowControl="false"/> </policyEntries> </policyMap> </destinationPolicy>
注意,自从ActiveMQ 5.x中引入新的消息游标之后,非持久化消息被分流到了临时文件存储中,以此来减少非持久化消息传送使用的内存总量。
结果就是,你可能会发现一个队列的内存限制永远达不到,因为游标不需要使用太多的内存。如果你真的想把所有的非持久化消息存放在内存
中,并在达到内存限制的时候停掉生产者,你需要配置<vmQueueCursor>,示例如下:
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb"> <pendingQueuePolicy> <vmQueueCursor/> </pendingQueuePolicy> </policyEntry>
上面的配置可以保证所有的非持久化队列消息都保存在内存中,队列的内存限制为1Mb
3.3 配置客户端的异常
为了应对代理空间不足,而导致不确定的阻塞send()方法的一种替代方案,就是将其配置成客户端抛出的一个异常。通过将sendFailIfNoSpace
属性设置为true,代理将会引起send()方法失败,并抛出javax.jms.ResourceAllocationException异常,传播到客户端。下面是一个配置的
示例:
<systemUsage> <systemUsage sendFailIfNoSpace="true"> <memoryUsage> <memoryUsage limit="20 mb"/> </memoryUsage> </systemUsage> </systemUsage>
这么配置的好处是,客户端可以捕获javax.jms.ResourceAllocationException异常,稍等一下,并重试send()操作,而不是无限期地傻等下去。
3.4 sendFailIfNoSpaceAfterTimeout属性的加入
从5.3.1版本之后,sendFailIfNoSpaceAfterTimeout属性被加了进来。这个属性同样导致send()方法失败,并在客户端抛出异常,但仅当等待
了指定时间之后才触发。如果在配置的等待时间过去之后,代理上的空间仍然没有被释放,仅当这个时候send()方法才会失败,并且在客户端
抛出异常。示例:
<systemUsage> <systemUsage sendFailIfNoSpaceAfterTimeout="3000"> <memoryUsage> <memoryUsage limit="20 mb"/> </memoryUsage> </systemUsage> </systemUsage>
定义超时的单位是毫秒
3.5 System usage
可以通过<systemUsage>元素的一些属性来减慢生产者,如下例子:
<systemUsage> <systemUsage> <memoryUsage> <memoryUsage limit="64 mb" /> </memoryUsage> <storeUsage> <storeUsage limit="100 gb" /> </storeUsage> <tempUsage> <tempUsage limit="10 gb" /> </tempUsage> </systemUsage> </systemUsage>
你可以为非持久化的消息设置内存限制,为持久化消息设置磁盘空间,以及为临时消息设置总的空间,broker将在减慢生产者之前使用这些空间。
使用上述的默认设置,broker将会一直阻塞send()方法的调用,直至一些消息被消费,有了可用的空间。
本文出自 “我爱大金子” 博客,请务必保留此出处http://1754966750.blog.51cto.com/7455444/1923299
ActiveMQ(16):Message Dispatch的分发策略、消息批量确认和生产者流量控制