文档章节

ActiveMQ 持久化(数据库),查询队列剩余消息数、出队数的实现

cookqq
 cookqq
发布于 2013/03/04 09:57
字数 754
阅读 3857
收藏 23
点赞 0
评论 0

《ActiveMQ 持久化(文件),查询队列剩余消息数、出队数的实现》分析了消息队列持久化保存,假如activemq服务器突然停止,服务器启动后,还可以继续查找队列中的消息。现在分析队列中的消息使用数据库持久化。

本人博客开始迁移,博客整个架构自己搭建及编码http://www.cookqq.com/

消息生产者:

package com.activemq.mysql;

import java.io.File;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.sql.DataSource;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.adapter.MySqlJDBCAdapter;
import org.apache.commons.dbcp.BasicDataSourceFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * 消息持久化到数据库
 *
 */
public class MessageProductor {
	  private static Logger logger=LogManager.getLogger(MessageProductor.class);
	  private String username=ActiveMQConnectionFactory.DEFAULT_USER;
	  private String password=ActiveMQConnectionFactory.DEFAULT_PASSWORD;
	  private  String url=ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
	  
	  public static String queueName="acticemq_queue";
	  private BrokerService brokerService;
	  protected static final int messagesExpected = 3;
	  
	  protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
			    username,password,
	            "tcp://localhost:61616?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected);
	/***
	 * 创建Broker服务对象
	 * @return
	 * @throws Exception
	 */
	public BrokerService createBroker()throws Exception{
			BrokerService  broker=new BrokerService();
			JDBCPersistenceAdapter jdbc=createJDBCPersistenceAdapter();
			broker.setPersistenceAdapter(jdbc);
			jdbc.setDataDirectory(System.getProperty("user")+
					File.separator+"data"+File.separator);
			jdbc.setAdapter(new MySqlJDBCAdapter());
			broker.setPersistent(true);
			broker.addConnector("tcp://localhost:61616");
			//broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
		return broker;
	}
	/**
	 * 创建Broken的持久化适配器
	 * @return
	 * @throws Exception
	 */
	public JDBCPersistenceAdapter createJDBCPersistenceAdapter() throws Exception{
		JDBCPersistenceAdapter jdbc=new JDBCPersistenceAdapter();
		DataSource datasource=createDataSource();
		jdbc.setDataSource(datasource);
		jdbc.setUseDatabaseLock(false);
		//jdbc.deleteAllMessages();
		return jdbc;
	}
	/**
	 * 创建数据源
	 * @return
	 * @throws Exception
	 */
	public DataSource createDataSource() throws Exception{
		Properties props=new Properties();
		props.put("driverClassName", "com.mysql.jdbc.Driver");
		props.put("url", "jdbc:mysql://localhost:3306/activemq");
		props.put("username", "root");
		props.put("password", "16ds");
		DataSource datasource=BasicDataSourceFactory.createDataSource(props);
		return datasource;
	}
	/**
	 * 启动BrokerService进程
	 * @throws Exception
	 */
	public void init() throws Exception{
		createBrokerService();
		start();
	}
	
	public void start() throws Exception{
		if(brokerService!=null){
			brokerService.start();
		}
	}
	public BrokerService createBrokerService() throws Exception{
		if(brokerService==null){
			brokerService=createBroker();
		}
		return brokerService;
	}
	
	public void sendMessage() throws JMSException{
		Connection connection=connectionFactory.createConnection();
		connection.start();
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	    Destination destination = session.createQueue(queueName);        
	    MessageProducer producer = session.createProducer(destination);
	    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
		for(int i=0;i<messagesExpected;i++){
			 logger.debug("Sending message " + (i+1) + " of " + messagesExpected);
	         producer.send(session.createTextMessage("test message " + (i+1)));
		}
		connection.close();
	}
	public String getUrl() {
		return url;
	}
	public void setUrl(String url) {
		this.url = url;
	}
	public String getUsername() {
		return username;
	}
	public void setUsername(String username) {
		this.username = username;
	}
}

消息消费者:

package com.activemq.mysql;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
/***
 * 消息持久化到数据库
 */
public class MessageCustomer {
	private static Logger logger=LogManager.getLogger(MessageProductor.class);
	  protected static final int messagesExpected = 5;
	  
	/***
	 * 创建Broker服务对象
	 * @return
	 * @throws Exception
	 */
	public BrokerService createBroker()throws Exception{
		BrokerService  broker=new BrokerService();
	    broker.addConnector(ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL);
		return broker;
	}

	/**
	 * 启动BrokerService进程
	 * @throws Exception
	 */
	public void init() throws Exception{
		BrokerService brokerService=createBroker();
		brokerService.start();
	}
	/**
	 * 接收的信息
	 * @return
	 * @throws Exception
	 */
	public int receiveMessage() throws Exception{
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
	            "tcp://localhost:61616?jms.prefetchPolicy.all=0&jms.redeliveryPolicy.maximumRedeliveries="+messagesExpected);
		Connection connection=connectionFactory.createConnection();
		connection.start();
		Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
		return receiveMessages(messagesExpected,session);
	}
	

	/**
	 * 接受信息的方法
	 * @param messagesExpected
	 * @param session
	 * @return
	 * @throws Exception
	 */
	protected int receiveMessages(int messagesExpected, Session session) throws Exception {
        int messagesReceived = 0;
        for (int i=0; i<messagesExpected; i++) {
            Destination destination = session.createQueue(MessageProductor.queueName);
            MessageConsumer consumer = session.createConsumer(destination);
            Message message = null;
            try {
            	logger.debug("Receiving message " + (messagesReceived+1) + " of " + messagesExpected);
                message = consumer.receive(2000);
                logger.info("Received : " + message);
                System.out.println("Received : " + message);
                if (message != null) {
                    session.commit();
                    messagesReceived++;
                }
            } catch (Exception e) {
            	logger.debug("Caught exception " + e);
                session.rollback();
            } finally {
                if (consumer != null) {
                    consumer.close();
                }
            }
        }
        return messagesReceived;
    }

}

生产者测试类:

package com.activemq.mysql;

public class MessageProductorTest {
	
	public static void main(String[] args) throws Exception {
		MessageProductor  productor =new MessageProductor();
		productor.init();
		productor.sendMessage();
		//productor.createBrokerService().stop();
	}

}

消费者测试类:

package com.activemq.mysql;


public class MessageCustomerTest {
  public static void main(String[] args) throws Exception {
	  MessageCustomer  customer=new MessageCustomer();
	  //customer.init();  //当两台机器在不同的服务器上启动客户端的broker进程
	  customer.receiveMessage();
	  
}
}

数据库形式:

activemq_acks:ActiveMQ的签收信息。

activemq_lock:ActiveMQ的锁信息。

activemq_msgs:ActiveMQ的消息的信息



参照博客: http://topmanopensource.iteye.com/blog/1066383

© 著作权归作者所有

共有 人打赏支持
cookqq

cookqq

粉丝 117
博文 268
码字总数 156096
作品 0
海淀
技术主管
ActiveMQ发消息、收消息、持久化,查询队列剩余消息数、出队数的实现

初次发博文,勿喷~~ 最近老大让我使用ActiveMQ实现这么个东东:1.查询消息队列中还有多少任务没有执行;2.消息队列的持久化; 真是愁杀我也,以前没见过啊,于是又看文档,又百度又google的,...

JingHaiChao
2012/05/14
0
7
ActiveMQ 持久化(文件),查询队列剩余消息数、出队数的实现

本人博客开始迁移,博客整个架构自己搭建及编码 http://www.cookqq.com/listBlog.action 《ActiveMQ发消息和收消息》详细介绍了ActiveMQ发消息和收消息,消息保存在消息队列(queue)中,消息...

cookqq
2013/03/03
0
1
ActiveMq笔记2-消息持久化

为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制。 ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB, 无论使用哪种持久化方式,消息...

狂小白
02/25
0
0
爬虫架构 | 消息队列应用场景及ActiveMQ、RabbitMQ、RocketMQ、Kafka对比

前言:在之前的业务中,使用了Kafka和RabbitMQ两种消息队列,这篇文章来做一个总结。 消息队列中间件是分布式系统中重要的组件,主要实现异步消息,应用解耦,流量削峰及消息通讯等功能。 下...

小怪聊职场
04/26
0
0
深入浅出 消息队列 ActiveMQ

一、 概述与介绍 ActiveMQ 是Apache出品,最流行的、功能强大的即时通讯和集成模式的开源服务器。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。提供客户端支持跨语言和...

亮liang
2015/05/27
0
0
ActiveMQ学习记录 之 消息持久化

1:前言 这一段给公司开发消息总线有机会研究ActiveMQ,今天撰文给大家介绍一下他的持久化消息。本文只介绍三种方式,分别是持久化为文件,MYSql,Oracle。下面逐一介绍。 A:持久化为文件 这...

李格尔楞
2017/11/01
0
0
Activemq Spring 嵌入整合及通过数据库来验证连接权限

一、Activemq简介 详情请参考 开源中国 二、为什么选择Activemq 1、支持的语言最多 2、Apache出品 3、文档也还行 三、如何实现 Activemq 与 Spring 整合,activemq不单独安装,而是嵌入使用。...

HuaChen
2012/12/29
0
0
JMS配置说明-----activeMQ-5.6

1 简介 activeMQ是一个完全支持JMS1.1 和J2EE规范的JMS Provider实现; 尽管规范出台已经是很久的事情了,但JMS在当今的J2EE应用中仍然扮演着特殊的地位; 特性列表 多种语言和协议编写客户端...

次渠龙哥
06/26
0
0
ActiveMQ集群方案(上)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51124749 目...

yunlielai
04/15
0
0
ActiveMQ集群方案(下)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51205822 目...

yunlielai
04/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

eclipse SVN 项目重新定位

SVN 重新定位 1.方法一 首先:在Eclipse中选择Windows-> Show View->others 就会出现【SVN资源库/SVN Repositories】,选中后,点击确认; 然后:选中原有的地址,选择【重新定位/Relocate】...

qimh
9分钟前
0
0
Linux 第29课 ——Linux集群架构(下)

Linux集群架构(下) 八、DR模式搭建 8.1 准备工作 试验需求三台机器: 分发器,也叫调度器(简写为dir) 192.168.112.136 ying01 rs1 192.168.112.138 ying02 rs2 192.168.112.139 ying03 vip...

feng-01
14分钟前
0
0
轻松搭建svn版本管理工具+svnmanager管理客户端

前面的文章有写过svn版本管理工具的安装是基于svn的安装包进行安装,对于svn与apache的结合还得下svn和apache的模块进行结合过程比较繁琐,今天来介绍下通过centos的yum来安装svn能够快速安装...

javazyw
23分钟前
0
0
keepalived配置高可用集群

Linux集群概述 根据功能划分为两大类:高可用和负载均衡 高可用集群通常为两台服务器,一台工作,另外一台作为冗余,当提供服务的机器宕机,冗余将接替继续提供服务 实现高可用的开源软件有:...

TaoXu
28分钟前
0
0
mysql联表批处理操作

1 概述 mysql中的单表增删改查操作,可以说是基本中的基本. 实际工作中,常常会遇到一些基本用法难以处理的数据操作,譬如遇到主从表甚至多级关联表的情况(如一些历史问题数据的批量处理),考虑到...

社哥
31分钟前
0
0
IntelliJ IDEA 详细图解最常用的配置,适合刚刚用的新人。

刚刚使用IntelliJ IDEA 编辑器的时候,会有很多设置,会方便以后的开发,磨刀不误砍柴工。 比如:设置文件字体大小,代码自动完成提示,版本管理,本地代码历史,自动导入包,修改注释,修改...

kim_o
46分钟前
0
0
Google Java编程风格指南

目录 前言 源文件基础 源文件结构 格式 命名约定 编程实践 Javadoc 后记 前言 这份文档是Google Java编程风格规范的完整定义。当且仅当一个Java源文件符合此文档中的规则, 我们才认为它符合...

niithub
48分钟前
0
0
java.net.MalformedURLException异常说明

1.异常片段 Java代码中,在进行URL url = new URL(urllink)操作时,提示以下异常信息,该类异常主要问题出在参数urllink上面。 异常片段1 java.net.MalformedURLException at java.ne...

lqlm
48分钟前
1
0
CentOS7修改mysql5.6字符集

解决办法:CentOS7下修改MySQL数据库字符编码为UTF-8,UTF-8包含全世界所有国家所需要的字符集,是国际编码。 具体操作如下: 1.进入MySQL [root@tianqi-01 ~]# mysql -uroot -p Enter passw...

河图再现
50分钟前
0
0
DevExpress v18.1新版亮点——WPF篇(一)

用户界面套包DevExpress v18.1日前终于正式发布,本站将以连载的形式为大家介绍各版本新增内容。本文将介绍了DevExpress WPF v18.1 的新功能,快来下载试用新版本!点击下载>> Accordion Co...

Miss_Hello_World
52分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部