ActiveMQ Messages Store(未完待续)

原创
2016/05/04 21:46
阅读数 400

                                                                       ActiveMQ   Messages Store

    ActiveMQ消息的持久化,在很多需要同步的消息的下很关键,当一个消费者挂了后,确保以前消息能够接受,这个时候就必须用到消息的持久化存储。ActiveMQ主要提供了以下几种策略进行配置:

    1.The KahaDB message store

    先看一下java 代码的编写

        BrokerService broker = new BrokerService();
        File dataFileDir = new File("../amq-in-action/kahadb1");
        KahaDBStore kaha = new KahaDBStore();
        kaha.setDirectory(dataFileDir);
        kaha.setJournalMaxFileLength(1024*1204*10);
        kaha.setConcurrentStoreAndDispatchTopics(true);
        kaha.setIndexWriteBatchSize(100);
        kaha.setEnableIndexWriteAsync(false);
        broker.setPersistenceAdapter(kaha);//使用KahaDB,做持久化存储
        broker.addConnector("tcp://localhost:61618");

     很明显我们可以看到,KahaDB 是一个基于文件的持久化策略。看一些内部实现的原理:

                                                   

    从上图我们可以看到,我们指定的目录下,生成了几个文件:db.data db.redo db-1.log。

    在db-xx.log文件中记录的是:一个消息和命令滚动日志(如事务边界和信息删除等),当超过文件存储最大长度就会再建一个心得db.log文件。

    db.data文件是:消息转变成持久性B树索引的数据。

    db.redo 文件是:用于重新修改db.data中B树索引结构的,用于回收db-xx.log的这些文件。

    值得一体的是,cache是用来缓存数据的,没有第一时间写入磁盘,有多个生成者的时候,在对共同消息修改时,会写到磁盘。在消费者立刻响应收到的情况下,不写入磁盘立即丢弃。

    一些比较关键的参数配置:

Property name Default value Description
directory activemq-data 设置的目录位置及名字
indexWriteBatchSize 1000 批处理磁盘上写的索引页的数量
indexCacheSize 10000 在内存中缓存页面的数量
enableIndexWriteAsync false 能否异步写入索引
journalMaxFileLength 32mb 每一个db-xx.log日志文件的大小
enableJournalDiskSyncs true 磁盘同步写数据
cleanupInterval 30000 多少时间去清理一些存储但不再使用的文件数据
checkpointInterval 5000 检测内部关键点(不知道用处)
ignoreMissingJournalfiles false  如果启用,将忽略丢失的日志文件
checkForCorruptJournalFiles false 如果启用,在启动时会验证消息数据日志没有被损坏。
checksumJournalFiles false

如果启用,将提供一个校验和对每个消息数据日志

archiveDataLogs false 如果启用,将将消息数据记录到归档目录,而不是删除它
directoryArchive null 定义目录以将数据记录移动到包含已被消耗的所有消息
databaseLockedWaitDelay 10000 限制时间内获得数据库锁
maxAsyncJobs 10000 将最大排队的数量的消息进行异步存储
concurrentStoreAndDispatchTransactions true 客户端进行事务存储
concurrentStoreAndDispatchTopics true 使客户进行话题信息存储
concurrentStoreAndDispatchQueues true 使客户进行队列信息存储

   2.The AMQ message store

    暂时略过,和第一个很想,有时间再补上。

    3.The JDBC message store

//配置数据源
		BasicDataSource datasource=new BasicDataSource();
		datasource.setDriverClassName("com.mysql.jdbc.Driver");
		datasource.setUrl("jdbc:mysql://localhost/activemq?relaxAutoCommit=true");
		datasource.setUsername("root");
		datasource.setPassword("xxxx");
		datasource.setMaxActive(20);
		datasource.setPoolPreparedStatements(true);
		JDBCPersistenceAdapter  jpa=new JDBCPersistenceAdapter(); 
		jpa.setDataSource(datasource);
	/*	DefaultJDBCAdapter dja=new DefaultJDBCAdapter();*/
		/*JDBCTopicMessageStore jts=new JDBCTopicMessageStore(jpa,dja, null, null, null);*/
		BrokerService broker = new BrokerService();
		broker.setPersistenceAdapter(jpa);
		broker.addConnector("tcp://localhost:61618");
		broker.start();

    4.The memory message store

展开阅读全文
打赏
2
2 收藏
分享
加载中
更多评论
打赏
0 评论
2 收藏
2
分享
返回顶部
顶部