首页 > 代码库 > ActiveMQ(16):Message Dispatch的分发策略、消息批量确认和生产者流量控制

ActiveMQ(16):Message Dispatch的分发策略、消息批量确认和生产者流量控制

一、分发策略(Dispatch Policies)

1.1 严格顺序分发策略(Strict Order Dispatch Policy)

通常ActiveMQ会保证topic consumer以相同的顺序接收来自同一个producer的消息,但有时候也需要保证不同的topic consumer以

相同的顺序接收消息,然而,由于多线程和异步处理,不同的topic consumer可能会以不同的顺序接收来自不同producer的消息。

Strict order dispatch policy会保证每个topic consumer会以相同的顺序接收消息,代价是性能上的损失。以下是一个配置例子:

<policyEntry topic="ORDERS.>">
    <dispatchPolicy>
        <strictOrderDispatchPolicy />
    </dispatchPolicy>
</policyEntry>

对于Queue的配置为:

<policyEntry queue=">" strictOrderDispatch="false" />

1.2 轮询分发策略(Round Robin Dispatch Policy)

ActiveMQ的prefetch缺省参数是针对处理大量消息时的高性能和高吞吐量而设置的,所以缺省的prefetch参数比较大。而且缺省

的dispatch policies会尝试尽可能快的填满prefetch缓冲。

然而在有些情况下,例如只有少量的消息而且单个消息的处理时间比较长,那么在缺省的prefetch和dispatch policies下,这

些少量的消息总是倾向于被分发到个别的consumer上。这样就会因为负载的不均衡分配而导致处理时间的增加。


Round robin dispatch policy会尝试平均分发消息,以下是一个例子:

<policyEntry topic="ORDERS.>">
    <dispatchPolicy>
        <roundRobinDispatchPolicy/>
    </dispatchPolicy>
</policyEntry>

二、消息确认(Optimized Acknowledgement)

ActiveMQ缺省支持批量确认消息,由于批量确认会提高性能。如果希望在应用程序中禁止经过优化的确认方式,那么可以采用如下方法:

 1:在Connection URI 上启用Optimized Acknowledgements

    cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.optimizeAcknowledge=true");

 2:在ConnectionFactory 上启用Optimized Acknowledgements

    ((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(true);

 3:在Connection上启用Optimized Acknowledgements

    ((ActiveMQConnection)connection).setOptimizeAcknowledge(true);

 4:5.6以后的版本,还可以在Connection URI上设置setOptimizeAcknowledgeTimeOut参数,默认值为300ms,你可以设置自己要用的值,0表示禁用。

三、生产者流量控制(Producer Flow Control)

3.1 简介

流量控制的含义:当生产者产生消息过快,超过流量限制的时候,生产者将会被阻塞直到资源可以继续使用,或者抛出一个JMSException,可以通过

<systemUsage>来配置。

3.2 配置

同步发送消息的producer会自动使用producer flow control ;对于异步发送消息的producer,要使用producer flow control,你先要为

connection配置一个ProducerWindowSize参数,如下:

((ActiveMQConnectionFactory)cf).setProducerWindowSize(1024000);

ProducerWindowSize是producer在发送消息的过程中,收到broker对于之前发送消息的确认之前, 能够发送消息的最大字节数


可以禁用producer flow control,以下是ActiveMQ配置文件的一个例子:

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry topic="FOO.>" producerFlowControl="false"/>
        </policyEntries>
    </policyMap>
</destinationPolicy>

注意,自从ActiveMQ 5.x中引入新的消息游标之后,非持久化消息被分流到了临时文件存储中,以此来减少非持久化消息传送使用的内存总量。

结果就是,你可能会发现一个队列的内存限制永远达不到,因为游标不需要使用太多的内存。如果你真的想把所有的非持久化消息存放在内存

中,并在达到内存限制的时候停掉生产者,你需要配置<vmQueueCursor>,示例如下:

<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
    <pendingQueuePolicy>
        <vmQueueCursor/>
    </pendingQueuePolicy>
</policyEntry>

上面的配置可以保证所有的非持久化队列消息都保存在内存中,队列的内存限制为1Mb


3.3 配置客户端的异常

为了应对代理空间不足,而导致不确定的阻塞send()方法的一种替代方案,就是将其配置成客户端抛出的一个异常。通过将sendFailIfNoSpace

属性设置为true,代理将会引起send()方法失败,并抛出javax.jms.ResourceAllocationException异常,传播到客户端。下面是一个配置的

示例:

<systemUsage>
    <systemUsage sendFailIfNoSpace="true">
        <memoryUsage>
            <memoryUsage limit="20 mb"/>
        </memoryUsage>
    </systemUsage>
</systemUsage>

这么配置的好处是,客户端可以捕获javax.jms.ResourceAllocationException异常,稍等一下,并重试send()操作,而不是无限期地傻等下去。

3.4 sendFailIfNoSpaceAfterTimeout属性的加入

从5.3.1版本之后,sendFailIfNoSpaceAfterTimeout属性被加了进来。这个属性同样导致send()方法失败,并在客户端抛出异常,但仅当等待

了指定时间之后才触发。如果在配置的等待时间过去之后,代理上的空间仍然没有被释放,仅当这个时候send()方法才会失败,并且在客户端

抛出异常。示例:

<systemUsage>
    <systemUsage sendFailIfNoSpaceAfterTimeout="3000">
        <memoryUsage>
            <memoryUsage limit="20 mb"/>
        </memoryUsage>
    </systemUsage>
</systemUsage>

定义超时的单位是毫秒

3.5 System usage

可以通过<systemUsage>元素的一些属性来减慢生产者,如下例子:

<systemUsage>
    <systemUsage>
        <memoryUsage>
            <memoryUsage limit="64 mb" />
        </memoryUsage>
        <storeUsage>
            <storeUsage limit="100 gb" />
        </storeUsage>
        <tempUsage>
            <tempUsage limit="10 gb" />
        </tempUsage>
    </systemUsage>
</systemUsage>

你可以为非持久化的消息设置内存限制,为持久化消息设置磁盘空间,以及为临时消息设置总的空间,broker将在减慢生产者之前使用这些空间。

使用上述的默认设置,broker将会一直阻塞send()方法的调用,直至一些消息被消费,有了可用的空间。


本文出自 “我爱大金子” 博客,请务必保留此出处http://1754966750.blog.51cto.com/7455444/1923299

ActiveMQ(16):Message Dispatch的分发策略、消息批量确认和生产者流量控制