首页 > 代码库 > ActiveMQ(15):Message Dispatch的消息游标与异步发送

ActiveMQ(15):Message Dispatch的消息游标与异步发送

一、消息游标

1.1 简介

ActiveMQ发送持久消息的典型处理方式是:当消息的消费者准备就绪时,消息发送系统把存储的消息按批次发送给消费者,

在发送完一个批次的消息后,指针的标记位置指向下一批次待发送消息的位置,进行后续的发送操作。这是一种比较健壮

和灵活的消息发送方式,但大多数情况下,消息的消费者不是一直处于这种理想的活跃状态。


因此,从ActiveMQ5.0.0版本开始,消息发送系统采用一种混合型的发送模式,当消息消费者处理活跃状态时,允许消息发

送系统直接把持久消息发送给消费者,当消费者处于不活跃状态下,切换使用Cursors(游标)来处理消息的发送。


当消息消费者处于活跃状态并且处理能力比较强时,被持久存储的消息直接被发送到与消费者关联的发送队列,见下图:

技术分享


当消息已经出现积压,消费者再开始活跃;或者消费者的消费速度比消息的发送速度慢时,消息将从Pending Cursor中提取,

并发送与消费者关联的发送队列。见下图:

技术分享

1.2 分类

Message Cursors分成三种类型:

 1: Store-based

 2:VM

 3:File-based

1.2.1 Store-based

从activemq5.0开始,默认使用此种类型的cursor,其能够满足大多数场景的使用要求。同时支持非持久消息的处理,Store-based内

嵌了File-based的模式,非持久消息直接被Non-Persistent Pending Cursor所处理。工作模式见下图:

技术分享

1.2.2 VM

相关的消息引用存储在内存中,当满足条件时,消息直接被发送到消费者与之相关的发送队列,处理速度非常快,但出现慢消费者或者

消费者长时间处于不活跃状态的情况下,无法适应。工作模式见下图 :

技术分享

1.2.3 File-based

当内存设置达到设置的限制,消息被存储到磁盘中的临时文件中。工作模式见下图 :

技术分享

1.3 配置

在缺省情况下,ActiveMQ会根据使用的Message Store来决定使用何种类型的Message Cursors,但是你可以根据destination来配

置Message Cursors,例如:

1.3.1 对Topic subscribers

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry topic="org.apache.>" producerFlowControl="false" memoryLimit="1mb">
                <dispatchPolicy>
                    <strictOrderDispatchPolicy />
                </dispatchPolicy>
                <deadLetterStrategy>
                    <individualDeadLetterStrategy topicPrefix="Test.DLQ." />
                </deadLetterStrategy>
                <pendingSubscriberPolicy>
                    <vmCursor />
                </pendingSubscriberPolicy>
                <pendingDurableSubscriberPolicy>
                    <vmDurableCursor/>
                </pendingDurableSubscriberPolicy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>

配置说明:

  有效的Subscriber类型是vmCursor和fileCursor,缺省是store based cursor。有效的持久化Subscriber的cursor types是

  storeDurableSubscriberCursor, vmDurableCursor 和fileDurableSubscriberCursor,缺省是store based cursor。

1.3.2 对于Queues的配置

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry queue="org.apache.>">
                <deadLetterStrategy>
                    <individualDeadLetterStrategy queuePrefix="Test.DLQ."/>
                </deadLetterStrategy>
                <pendingQueuePolicy>
                    <vmQueueCursor />
                </pendingQueuePolicy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>

配置说明:有效的类型是storeCursor, vmQueueCursor 和 fileQueueCursor

二、异步发送

2.1 简介

AciveMQ支持异步和同步发送消息,是可以配置的。通常对于快的消费者,是直接把消息同步发送过去,但对于一个Slow Consumer,

你使用同步发送消息可能出现Producer堵塞等现象,慢消费者适合使用异步发送。

2.1 配置使用

1:ActiveMQ默认设置dispatcheAsync=true是最好的性能设置。如果你处理的是Fast Consumer则使用dispatcheAsync=false

2:在Connection URI级别来配置使用Async Send

cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");

3:在ConnectionFactory级别来配置使用Async Send

((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

4:在Connection级别来配置使用Async Send

((ActiveMQConnection)connection).setUseAsyncSend(true);

本文出自 “我爱大金子” 博客,请务必保留此出处http://1754966750.blog.51cto.com/7455444/1923058

ActiveMQ(15):Message Dispatch的消息游标与异步发送