首页 > 代码库 > ActiveMQ(09):ActiveMQ消息存储持久化
ActiveMQ(09):ActiveMQ消息存储持久化
一、简介
1.1 描述
ActiveMQ不仅支持persistent和non-persistent两种方式,还支持消息的恢复(recovery)方式、重新投递等
1.2 PTP与PUB/SUB
1.2.1 PTP
对于持久化订阅主题,每一个消费者将获得一个消息的复制。
1.2.2 PUB/SUB
对于持久化订阅主题,每一个消费者将获得一个消息的复制。
1.3 有效的消息存储
ActiveMQ提供了一个插件式的消息存储,类似于消息的多点传播,主要实现了如下几种:
1:AMQ消息存储-基于文件的存储方式,是以前的默认消息存储
2:KahaDB消息存储-提供了容量的提升和恢复能力,是现在的默认存储方式
3:JDBC消息存储-消息基于JDBC存储的
4:Memory 消息存储-基于内存的消息存储
二、持久化
2.1 KahaDB消息存储
1、KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。
在:activemq/conf/activemq.xml中可以查看
2、KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。在Kaha中,数据被追加到data logs中。当不再需要log文
件中的数据的时候,log文件会被丢弃。
3、KahaDB基本配置例子:
<persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter>
可用的属性有:
1:director:KahaDB存放的路径,默认值activemq-data
2:indexWriteBatchSize: 批量写入磁盘的索引page数量,默认值1000
3:indexCacheSize:内存中缓存索引page的数量,默认值10000
4:enableIndexWriteAsync:是否异步写出索引,默认false
5:journalMaxFileLength:设置每个消息data log的大小,默认是32MB
6:enableJournalDiskSyncs:设置是否保证每个没有事务的内容,被同步写入磁盘,JMS持久化的时候需要,默认为true
7:cleanupInterval:在检查到不再使用的消息后,在具体删除消息前的时间,默认30000
8:checkpointInterval:checkpoint的间隔时间,默认5000
9:ignoreMissingJournalfiles:是否忽略丢失的消息日志文件,默认false
10:checkForCorruptJournalFiles:在启动的时候,将会验证消息文件是否损坏,默认false
11:checksumJournalFiles:是否为每个消息日志文件提供checksum,默认false
12:archiveDataLogs: 是否移动文件到特定的路径,而不是删除它们,默认false
13:directoryArchive:定义消息已经被消费过后,移动data log到的路径,默认null
14:databaseLockedWaitDelay:获得数据库锁的等待时间 (used by shared master/slave),默认10000
15:maxAsyncJobs:设置最大的可以存储的异步消息队列,默认值10000,可以和concurrentMessageProducers 设置成一样的值
16:concurrentStoreAndDispatchTransactions:是否分发消息到客户端,同时事务存储消息,默认true
17:concurrentStoreAndDispatchTopics:是否分发Topic消息到客户端,同时进行存储,默认true
18:concurrentStoreAndDispatchQueues:是否分发queue消息到客户端,同时进行存储,默认true
2.2 AMQ消息存储
1、AMQ Message Store是ActiveMQ5.0缺省的持久化存储,它是一个基于文件、事务存储设计为快速消息存储的一个结构,该结构是以流的形式来
进行消息交互的。
2、这种方式中,Messages被保存到data logs中,同时被reference store进行索引以提高存取速度。Date logs由一些单独的data log文件组成,缺省的
文件大小是32M,如果某个消息的大小超过了data log文件的大小,那么可以修改配置以增加data log文件的大小。如果某个data log文件中所有的消
息都被成功消费了,那么这个data log文件将会被标记,以便在下一轮的清理中被删除或者归档。
3、配置示例
<broker brokerName="broker" persistent="true" useShutdownHook="false"> <persistenceAdapter> <amqPersistenceAdapter directory="${activemq.base}/data" maxFileLength="32mb"/> </persistenceAdapter> </broker>
2.3 JDBC消息存储
ActiveMQ支持使用JDBC来持久化消息
2.3.1 预定义的表
1:消息表,缺省表名为ACTIVEMQ_MSGS,queue和topic都存在里面,结构如下:
2:ACTIVEMQ_ACKS表存储持久订阅的信息和最后一个持久订阅接收的消息ID,结构如下:
3:锁定表,缺省表名为ACTIVEMQ_LOCK,用来确保在某一时刻,只能有一个ActiveMQ broker实例来访问数据库 ,结构如下:
2.3.2 配置activemq/conf/activemq.xml
<persistenceAdapter> <!-- <kahaDB directory="${activemq.data}/kahadb"/> --> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter> ... ... </broker> <bean name="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close"> <property name="driverClassName" value="http://www.mamicode.com/com.mysql.jdbc.Driver" /> <property name="url" value="http://www.mamicode.com/jdbc:mysql://192.168.91.4:3306/db_activemq?useUnicode=true&characterEncoding=UTF-8" /> <property name="username" value="http://www.mamicode.com/root" /> <property name="password" value="http://www.mamicode.com/123456"/> </bean>
注意:此处需要上传数据库驱动包到/opt/activemq/lib下,我这里连接池用的是阿里的,所以还要上传druid包
重启activemq
测试发送3条队列消息:
补充:JDBC Message Store with ActiveMQ Journal
这种方式克服了JDBC Store的不足,使用快速的缓存写入技术,大大提高了性能。 配置将persistenceAdapter注释掉,再上persistenceFactory,示例如下:
<!-- <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter> --> <persistenceFactory> <journalPersistenceAdapterFactory journalLogFiles="4" journalLogFileSize="32768" useJournal="true" useQuickJournal="true" dataSource="#mysql-ds" dataDirectory="activemq-data" /> </persistenceFactory>
JDBC Store和JDBC Message Store with ActiveMQ Journal的区别:
1:Jdbc with journal的性能优于jdbc
2:Jdbc用于master/slave模式的数据库分享
3:Jdbc with journal不能用于master/slave模式
4:一般情况下,推荐使用jdbc with journal
2.4 Memory消息存储
内存消息存储主要是存储所有的持久化的消息在内存中。这里没有动态的缓存存在,所以你必须注意设置你的broker所在的JVM和内存限制。
配置示例:
<broker ... persistent="false" ...></broker>
本文出自 “我爱大金子” 博客,请务必保留此出处http://1754966750.blog.51cto.com/7455444/1916408
ActiveMQ(09):ActiveMQ消息存储持久化