文档章节

消息中间件-Activemq之Master-Slaver

ksfzhaohui
 ksfzhaohui
发布于 2016/01/02 18:28
字数 1296
阅读 629
收藏 17

#程序员薪资揭榜#你做程序员几年了?月薪多少?发量还在么?>>>

什么叫中间件?
中间件为软件应用提供了操作系统所提供的服务之外的服务,可以把中间件描述为"软件胶水"。中间件不是操作系统的一部分,不是数据库操作系统,也不是应用软件的一部分,而是能够让软件开发者方便的处理通信、输入和输出,能够专注自己应用的部分。

消息中间件解决了应用之间的消息传递、解耦、异步的问题。
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

一般中间件都提供了横向扩展和纵向扩展,横向扩展就是我们经常说的负载均衡,纵向扩展提供了Master-Slaver;

负载均衡:提供负载均衡的中间件都对外提供服务
Master-Slaver:同时只有一个中间件对外提供服务,当Master出现挂机等问题,Slaver会自动接管

看一个整合的简图:

当然activemq也提供了以上2中方式,分别是:Master-Slave和Broker Cluster

准备:
jdk1.6,apache-activemq-5.10.0,mysql5.1,zookeeper-3.4.3

先来看看Master-Slave模式

1).Shared File System Master Slave
基于ActiveMQ的默认数据库kahaDB完成的,kahaDB的底层是文件系统。这种方式的集群,Slave的个数没有限制,哪个ActiveMQ实例先获取共享文件的锁,那个实例就是Master,其它的ActiveMQ实例就是Slave,当当前的Master失效,其它的Slave就会去竞争共享文件锁,谁竞争到了谁就是Master

本次测试在同一台机器上:
首先更改配置conf/activemq,做如下修改:

<persistenceAdapter>
        <!--<kahaDB directory="${activemq.data}/kahadb"/>-->
	<kahaDB directory="D:/kahaDB"/>
 </persistenceAdapter>
将activemq拷贝3份,分别:apache-activemq-5.10.0-M1, apache-activemq-5.10.0-M2,apache-activemq-5.10.0-M3,分别启动activemq命令,启动的日志分别是:

表示当前进程是Master


表示当前进程没有获取到锁,作为Slaver

测试:

下面的例子中分别提供了Producer(Sender类)Consumer(Receiver类)
我们首先用Producer发送消息给activemq,然后停止Master,然后再用Consumer接受消息,测试结果是可以接受到数据的。

2).JDBC Master Slave
JDBC Master Slave模式和Shared File Sysytem Master Slave模式的原理是一样的,只是把共享文件系统换成了共享数据库。

修改配置文件conf/activemq

<persistenceAdapter>
        <!--<kahaDB directory="${activemq.data}/kahadb"/>-->
	<!--<kahaDB directory="D:/kahaDB"/>-->
        <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/> 
  </persistenceAdapter>
添加数据源:
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
	<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
	<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
	<property name="username" value="root"/>
	<property name="password" value="root"/>
	<property name="poolPreparedStatements" value="true"/>
    </bean>
注:这里使用的是mysql,所以需要mysql驱动程序: mysql-connector-java-5.1.18,讲jar包放入lib文件夹下面,驱动版本不对,会出现如下错误: Database lock driver override not found for : [mysql_connect ...


分别拷贝到其他几个文件夹下,分别启动,启动成功后我们可以看到数据库中多了几张表:


测试方式同上;
官网手册:http://activemq.apache.org/jdbc-master-slave.html

3).Replicated LevelDB Store
这种方式是ActiveMQ5.9以后才新增的特性,使用ZooKeeper协调选择一个node作为master

修改配置文件conf/activemq:

<!--<persistenceAdapter>
         <kahaDB directory="${activemq.data}/kahadb"/>
         <kahaDB directory="D:/kahaDB"/>
         <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/> 
 </persistenceAdapter>-->
 <persistenceAdapter>
	<replicatedLevelDB
		directory="${activemq.data}/leveldb"
		replicas="3"
		bind="tcp://0.0.0.0:0"
		zkAddress="127.0.0.1:2181"
		hostname="127.0.0.1"
		sync="local_disk"
		zkPath="/activemq/leveldb-stores"/>
 </persistenceAdapter>
首先启动zookeeper,这里没有做集群处理,默认端口是2181,然后分别启动activemq,
启动之后报错:"activemq LevelDB IOException handler"。 

原因:版本5.10.0存在的依赖冲突。

解决方案:
(1)移除lib目录中的pax-url-aether-1.5.2.jar包
(2)注释掉配置文件中的日志配置; 

<!-- Allows accessing the server log
    <bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>
-->

测试方式同上;

提供java版的例子:

public class Sender {
	private static final int SEND_NUMBER = 5;

	public static void main(String[] args) {
		ConnectionFactory connectionFactory;
		Connection connection = null;
		Session session;
		Destination destination;
		MessageProducer producer;
		connectionFactory = new ActiveMQConnectionFactory(
			ActiveMQConnection.DEFAULT_USER,
			ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
		try {
			connection = connectionFactory.createConnection();
			connection.start();
			session = connection.createSession(Boolean.TRUE,
					Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("FirstQueue");
			producer = session.createProducer(destination);
			producer.setDeliveryMode(DeliveryMode.PERSISTENT);
			sendMessage(session, producer);
			session.commit();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				if (null != connection)
					connection.close();
			} catch (Throwable ignore) {
			}
		}
	}

	public static void sendMessage(Session session, MessageProducer producer)
			throws Exception {
		for (int i = 1; i <= SEND_NUMBER; i++) {
			TextMessage message = session.createTextMessage("发送的消息"
					+ i);
			System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
			producer.send(message);
		}
	}
}
public class Receiver {
	public static void main(String[] args) {
		ConnectionFactory connectionFactory;
		Connection connection = null;
		Session session;
		Destination destination;
		MessageConsumer consumer;
		connectionFactory = new ActiveMQConnectionFactory(
			ActiveMQConnection.DEFAULT_USER,
			ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
		try {
			connection = connectionFactory.createConnection();
			connection.start();
			session = connection.createSession(Boolean.FALSE,
					Session.AUTO_ACKNOWLEDGE);
			destination = session.createQueue("FirstQueue");
			consumer = session.createConsumer(destination);
			while (true) {
				TextMessage message = (TextMessage) consumer.receive(100000);
				if (null != message) {
					System.out.println("收到消息" + message.getText());
				} else {
					break;
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				if (null != connection)
					connection.close();
			} catch (Throwable ignore) {
			}
		}
	}
}

© 著作权归作者所有

ksfzhaohui

ksfzhaohui

粉丝 426
博文 167
码字总数 261352
作品 3
南京
高级程序员
私信 提问
加载中

评论(0)

ActiveMQ基于共享文件系统的HA方案

配置NFS服务器 yum install nfs-utils rpcbind 设置共享目录,编辑/etc/exports /home/mqsharedata 192.168.41.199(rw,sync,norootsquash) /home/mqsharedata 192.168.41.199(rw,sync,noroot......

姬风
2014/04/20
5.4K
24
ActiveMQ(八)——ActiveMQ的集群

一、队列消费者集群(Queue consumer clusters) ActiveMQ支持Consumer对消息高可靠性的负载平衡消费,如果一个Consumer死掉,该消息会转发到其他的Consumer消费的Queue上。如果一个Consume...

mazongfei
2019/06/30
0
0
Java中间件-ActiveMQ

为什么需要使用消息中间件? 系统解耦 异步 横向扩展 安全可靠 顺序保证 什么是中间件?   非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统...

osc_cnuamvdq
2018/06/20
8
0
ActiveMQ集群方案(下)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51205822 目...

yunlielai
2018/04/15
0
0
Java消息中间件入门笔记 - ActiveMQ篇

入门 消息中间件带来的好处: 栗子: 通过服务调用让其它系统感知事件发生 通过消息中间件解耦服务调用 1.Linux安装消息中间件ActiveMQ 1.下载安装包 解压 2.启动与关闭 3.安装验证 4.Maven依...

巅峰小学生
2018/04/30
0
0

没有更多内容

加载失败,请刷新页面

加载更多

二、netcore跨平台之 Linux部署nginx代理webapi

原文: 二、netcore跨平台之 Linux部署nginx代理webapi 上一章,我们讲了在linux上安装netcore环境,以及让netcore在linux上运行。 这一章我们开始讲在linux上配置nginx,以及让nginx反向代理...

osc_jo2m8l1r
16分钟前
10
0
CAD怎么转PDF文件?使用这款编辑器一键转换、批量转换

CAD怎么转换成PDF文件呢?小伙伴们不妨使用这款CAD编辑器,把CAD文件一键、批量转换成PDF文件哦。 有许多小伙伴应该都知道,为了满足各种学习、工作的需求,文件之间是经常需要相互转换格式的...

真不莲
16分钟前
19
0
详解Microsoft.AspNetCore.CookiePolicy

原文: 详解Microsoft.AspNetCore.CookiePolicy 详解Asp.Net Core中的Cookie策略 目录 详解Asp.Net Core中的Cookie策略 功能介绍 使用Cookie策略 从UseCookiePolicy方法入手 实现IResponseCo...

osc_0vd38ylb
18分钟前
16
0
怎么找到自己收藏过的思维导图模板?迅捷画图教你详细步骤!

怎么找到自己收藏过的思维导图模板?大家在刷视频的时候,遇到自己喜欢的视频,都会点个微信或者关注,用电脑看网页的时候,遇到有意思的网站,也会点击添加书签进行收藏,方便以后阅读或者是...

赛利亚大姐大
19分钟前
6
0
Microsoft.AspNetCore.Authentication.Cookies从入门到精通 (一)

原文: Microsoft.AspNetCore.Authentication.Cookies从入门到精通 (一) Microsoft.AspNetCore.Authentication.Cookies从入门到精通 (一) 目录 Microsoft.AspNetCore.Authentication.Cook......

osc_t5nbj8ds
19分钟前
10
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部