ActiveMQ Messages Store
ActiveMQ消息的持久化,在很多需要同步的消息的下很关键,当一个消费者挂了后,确保以前消息能够接受,这个时候就必须用到消息的持久化存储。ActiveMQ主要提供了以下几种策略进行配置:
1.The KahaDB message store
先看一下java 代码的编写
BrokerService broker = new BrokerService();
File dataFileDir = new File("../amq-in-action/kahadb1");
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(dataFileDir);
kaha.setJournalMaxFileLength(1024*1204*10);
kaha.setConcurrentStoreAndDispatchTopics(true);
kaha.setIndexWriteBatchSize(100);
kaha.setEnableIndexWriteAsync(false);
broker.setPersistenceAdapter(kaha);//使用KahaDB,做持久化存储
broker.addConnector("tcp://localhost:61618");
很明显我们可以看到,KahaDB 是一个基于文件的持久化策略。看一些内部实现的原理:
从上图我们可以看到,我们指定的目录下,生成了几个文件:db.data db.redo db-1.log。
在db-xx.log文件中记录的是:一个消息和命令滚动日志(如事务边界和信息删除等),当超过文件存储最大长度就会再建一个心得db.log文件。
db.data文件是:消息转变成持久性B树索引的数据。
db.redo 文件是:用于重新修改db.data中B树索引结构的,用于回收db-xx.log的这些文件。
值得一体的是,cache是用来缓存数据的,没有第一时间写入磁盘,有多个生成者的时候,在对共同消息修改时,会写到磁盘。在消费者立刻响应收到的情况下,不写入磁盘立即丢弃。
一些比较关键的参数配置:
Property name | Default value | Description |
directory | activemq-data | 设置的目录位置及名字 |
indexWriteBatchSize | 1000 | 批处理磁盘上写的索引页的数量 |
indexCacheSize | 10000 | 在内存中缓存页面的数量 |
enableIndexWriteAsync | false | 能否异步写入索引 |
journalMaxFileLength | 32mb | 每一个db-xx.log日志文件的大小 |
enableJournalDiskSyncs | true | 磁盘同步写数据 |
cleanupInterval | 30000 | 多少时间去清理一些存储但不再使用的文件数据 |
checkpointInterval | 5000 | 检测内部关键点(不知道用处) |
ignoreMissingJournalfiles | false | 如果启用,将忽略丢失的日志文件 |
2.The AMQ message store
暂时略过,和第一个很想,有时间再补上。
3.The JDBC message store
//配置数据源
BasicDataSource datasource=new BasicDataSource();
datasource.setDriverClassName("com.mysql.jdbc.Driver");
datasource.setUrl("jdbc:mysql://localhost/activemq?relaxAutoCommit=true");
datasource.setUsername("root");
datasource.setPassword("xxxx");
datasource.setMaxActive(20);
datasource.setPoolPreparedStatements(true);
JDBCPersistenceAdapter jpa=new JDBCPersistenceAdapter();
jpa.setDataSource(datasource);
/* DefaultJDBCAdapter dja=new DefaultJDBCAdapter();*/
/*JDBCTopicMessageStore jts=new JDBCTopicMessageStore(jpa,dja, null, null, null);*/
BrokerService broker = new BrokerService();
broker.setPersistenceAdapter(jpa);
broker.addConnector("tcp://localhost:61618");
broker.start();
4.The memory message store