文档章节

使用Spring Jms连接ActiveMQ 重连问题的一点心得

杨延庆
 杨延庆
发布于 2016/09/30 17:44
字数 1317
阅读 2463
收藏 36
点赞 2
评论 0

我们现在做的Web项目遇到一个问题,项目启动时使用Failover方式连接某个ActiveMQ,brokerUrl类似

failOver:(tcp://xxx.xxx.xxx.xxx:61616),如果ActiveMQ启动正常,项目启动正常;

但如果ActiveMQ没有启动,项目将无法启动完成,一直卡在连接ActiveMQ阶段。

经过调查,发现是ActiveMQ提供的FailOver Transport方式导致的。

FailOver Transport有maxReconnectAttempts和startupMaxReconnectAttempts两个连接参数。

    前者是FailOver Transport最大的重连次数,后者是启动阶段的最大重连次数,由于我们原先的连接这两个参数没有设置值,它们都沿用默认值-1,表示无限次重连,直到连接成功为止。这样,只要我们在启动项目时不启动ActiveMQ,FailOver Transport将一直尝试重连,导致后续程序加载无法进行,从而项目启动卡住。

    找到了原因之后,我们在brokerUrl中设置startupMaxReconnectAttempts=2,形如:

    failOver:(tcp://xxx.xxx.xxx.xxx:61616)?startupMaxReconnectAttempts=2,这样启动时FailOverTransport仅仅重试2次,如果不成功,就不再尝试,继续加载后续程序,这样就解决了项目启动卡住的问题。

    然而还有一个问题没有解决,如果启动时没有连接ActiveMQ成功,在项目启动完成后,再启动ActiveMQ时,即使ActiveMQConnectionFactory一直重连ActiveMQ,直到连接成功。我们配置的DefaultMessageListenerContainer仍然会抛出以下异常:

ERROR DefaultMessageListenerContainer - Could not refresh JMS Connection for destination 'queue://xxxxxxx' - retrying in 5000 ms. Cause: The JMS connection has failed: Connection refused: connect.

   在StackOverflow上有人给出了解答,是因为DefaultMessageListenerContainer引用的

SingleConnectionFactory对象的reconnectOnException属性。

(http://stackoverflow.com/questions/17729066/spring-mq-jms-reconnect-configuration)

   这个属性的说明如下(基于Spring Framework 4.3.2 RELEASE版本)

   Specify whether the single Connection should be reset (to be subsequently renewed) when a JMSException is reported by the underlying Connection.

  这个属性指定是否在底层的连接(这里是指ActiveMQ连接)抛出JMSException的时候,对连接进行重置。

  它的默认值是false,意味着即使JMS异常抛出,SingleConnectionFactory自带的connection没有重置。

在调用getConnection()方法时,由于连接没有重置,this.connection != null,仍然返回旧的connection.

从而DefaultMessageListenerContainer引用的连接仍然失败连接,不断抛出异常

protected Connection getConnection() throws JMSException {
		synchronized (this.connectionMonitor) {
			if (this.connection == null) {
				initConnection();
			}
			return this.connection;
		}
	}

   当我们设置这个属性为true后,在抛出Jms异常时,SingleConnectionFactory对象绑定的ExceptionListner代理对象会回调SingleConnectionFactory对象的onException方法,将connection属性设置为null,这样在getConnection()方法时,会调用initConnection方法创建新的连接。这样就能保证ActiveMQ启动成功后,会有对应的成功连接被创建,从而使DefaultMessageListenerContainer引用到正常连接,不再抛出异常。

public class SingleConnectionFactory ......
{

    @Override
	public void onException(JMSException ex) {
		logger.warn("Encountered a JMSException - resetting the underlying JMS Connection", ex);
		resetConnection();
	}

    public void resetConnection() {
		synchronized (this.connectionMonitor) {
			if (this.connection != null) {
				closeConnection(this.connection);
			}
			this.connection = null;
		}
	}

    protected Connection getConnection() throws JMSException {
		synchronized (this.connectionMonitor) {
			if (this.connection == null) {
				initConnection();
			}
			return this.connection;
		}
	}

    public void initConnection() throws JMSException {
		if (getTargetConnectionFactory() == null) {
			throw new IllegalStateException(
					"'targetConnectionFactory' is required for lazily initializing a Connection");
		}
		synchronized (this.connectionMonitor) {
			if (this.connection != null) {
				closeConnection(this.connection);
			}
			this.connection = doCreateConnection();
			prepareConnection(this.connection);
			if (this.startedCount > 0) {
				this.connection.start();
			}
			if (logger.isInfoEnabled()) {
				logger.info("Established shared JMS Connection: " + this.connection);
			}
		}
	}
}

private class AggregatedExceptionListener implements ExceptionListener {

		final Set<ExceptionListener> delegates = new LinkedHashSet<ExceptionListener>(2);

		@Override
		public void onException(JMSException ex) {
			synchronized (connectionMonitor) {
				// Iterate over temporary copy in order to avoid ConcurrentModificationException,
				// since listener invocations may in turn trigger registration of listeners...
				for (ExceptionListener listener : new LinkedHashSet<ExceptionListener>(this.delegates)) {
					listener.onException(ex);
				}
			}
		}
	}



   上述代码基于Spring Framework 4.3.3.RELEASE版本

 具体的配置信息如下:

<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        ...................
        <property name="reconnectOnException" value="true"/> 
</bean>

     上面的修改方案虽然可以解决ActiveMQ服务器未启动时应用启动卡住的问题,但是每次重置JMS连接时都会输出异常信息,这是因为重置连接时会将抛出的异常信息输出到日志。     

package org.springframework.jms.connection;

public class SingleConnectionFactory implements ConnectionFactory, QueueConnectionFactory,
		TopicConnectionFactory, ExceptionListener, InitializingBean, DisposableBean {
.............
public void onException(JMSException ex) {
		logger.warn("Encountered a JMSException - resetting the underlying JMS Connection", ex);
		resetConnection();
	}
.............
}

       如果不想查看重置连接时抛出的异常信息,可以在日志配置文件中设置日志输出级别为ERROR(以logback为例) 

<logger name="org.springframework.jms.connection" level="ERROR" />

       

       此外在ActiveMQ没有启动的情况下,如果在项目中为ActiveMQ消息队列配置了DefaultMessageListenerContainer对象,它将会持续刷新JMS连接,输出异常信息,默认的刷新间隔是由DefaultMessageListenerContainer对象的backOff属性对象的interval属性确定的

public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer {
...........
/**
	 * The default recovery interval: 5000 ms = 5 seconds.
*/
public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, Long.MAX_VALUE);

public void setRecoveryInterval(long recoveryInterval) {
		this.backOff = new FixedBackOff(recoveryInterval, Long.MAX_VALUE);
}
...........
}

public class FixedBackOff implements BackOff {

private long interval = 5000L;
........
public FixedBackOff(long interval, long maxAttempts) {
        this.interval = interval;
        this.maxAttempts = maxAttempts;
    }
........
}

    如果在定义DefaultMessageListenerContainer对象时没有设定recoveryInterval,那它将使用默认的5000ms初始化backoff对象。实际运行时DefaultMessageListenerContainer对象将使用5000ms(5s)这个时间间隔刷新JMS连接,输出连接异常信息。如果定义的DefaultMessageListenerContainer对象数目很多,日志中的连接异常信息量将会很大,我们需要调整这个时间间隔。
    可以在定义DefaultMessageListenerContainer对象时设置recoveryInterval(下面的例子将recoveryInterval设置为30000ms,大家可以根据自己的需求进行调整。) 

<bean id="xxxContainer" 
        class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
        ..................
		<property name="recoveryInterval" value="30000" />
</bean>

 

 

© 著作权归作者所有

共有 人打赏支持
杨延庆
粉丝 30
博文 47
码字总数 44076
作品 0
浦东
架构师
Ubuntu下安装ActiveMQ

主要参考http://www.jmkg.co.uk/2010/08/31/installing-activemq-on-ubuntu/,略有补充 1. 下载安装包,建立activemq用户,安装 下载最新的activemq,写此文时为5.7。 cd /opt/tar xvzf xxx...

RuralHunter
2013/01/22
0
0
ActiveMQ集群方案(上)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51124749 目...

yunlielai
04/15
0
0
ActiveMQ:No operations allowed after statement ...

ActiveMQ版本:5.5.1 记录人:@郑昀现象: 系统现象:部分消息发送失败,失败频率不正常。 日志信息:activemq.log 中一直有这样的错误日志: 看上去又是 mq broker 失去了数据库连接,但代码...

旁观者-郑昀
2012/11/10
0
0
ActiveMQ:Communications link failure问题以及解决办法

ActiveMQ版本:5.5.1 MQ 所使用的 MySQL 是 InnoDB存储引擎 记录人:@郑昀现象: 业务表面现象:无。系统现象:无。 日志信息:业务系统发送 MQ 消息时,下面这种错误日志断断续续地一直都有...

旁观者-郑昀
2012/11/10
0
0
ActiveMQ集群方案(下)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51205822 目...

yunlielai
04/15
0
0
基于levelDB可复制master/slave(zookeeper+levelDB)

Leveldb是一个google实现的非常高效的kv数据库,是单进程的服务,能够处理十亿级别规模Key-Value型数据,占用内存小。 基于可复制LevelDB的集群方案,需要引入ZooKeeper。根据ZooKeeper的使用...

chaun
2015/08/17
0
0
关于ActiveMQ序列化对象爆“Forbidden class xxx! ...”问题的解决

如题所示,最开始使用了默认配置: 然后使用ActiveMQ对对象进行序列化时报了如下错误: Caused by: java.lang.ClassNotFoundException: Forbidde...

pangfc
06/26
0
0
Win7环境下安装ActiveMQ

参考ActiveMQ官方文档:http://activemq.apache.org/getting-started.html 安装ActiveMQ 近来要学习JMS,在网上查了些资料,发现ActiveMQ是比较流行的JMS开源框架,决定使用ActiveMQ来学习J...

纠结名字
2015/08/09
0
0
Linux 安装ActiveMQ(使用Mac远程访问)

阅读本文需要安装JDK 一 ActiveMQ简介 activemq是用java语言编写的一款开源消息总线 activemq是apache出品 activemq消息的传递有两种类型 一种是点对点(即一个生产者和一个消费者一一对应) 另...

梦三
07/15
0
0
ActiveMQ学习笔记(5)——使用Spring JMS收发消息

ActiveMQ学习笔记(四)http://my.oschina.net/xiaoxishan/blog/380446 中记录了如何使用原生的方式从ActiveMQ中收发消息。可以看出,每次收发消息都要写许多重复的代码,Spring 为我们提供了更...

郭寻抚
2015/03/02
0
10

没有更多内容

加载失败,请刷新页面

加载更多

下一页

c++ qt 组播总结

每个人都有不同的认知规律和习惯, 有的人喜欢搞一套严密的大理论, 论述起来滔滔不绝, 不管自己懂不懂, 反正读者/听者是没搞懂。 有的人喜欢从实践出发, 没看到代码, 不运行一下, 不看...

backtrackx
3分钟前
0
0
Sublime text2安装json格式化插件SublimePrettyJson[Windows]

一、下载SublimePrettyJson插件包 https://github.com/dzhibas/SublimePrettyJson 二、将下载的文件解压放到在package目录下面 C:\Users\lucky\AppData\Roaming\Sublime Text 3\Packages 每个......

lazy~
3分钟前
0
0
安装vue-cli 报4058错误

1. 4058是网络代理错误。 安装淘宝源修改一下就可以了: npm --registry https://registry.npm.taobao.org info underscore 改为cnpm执行: cnpm install --global vue-cli 安装成功: 试试版...

MrBoyce
4分钟前
0
0
CPU飙升分析

1、top -----看具体的进程 2、top -H -p pid ------该进程的线程 3、printf 0x%x 15248 ------将线程改为16进制 4、jstack 进程...

北极之北
7分钟前
1
0
新生代Eden与两个Survivor区的解释

聊聊JVM的年轻代 1.为什么会有年轻代 我们先来屡屡,为什么需要把堆分代?不分代不能完成他所做的事情么?其实不分代完全可以,分代的唯一理由就是优化GC性能。你先想想,如果没有分代,那我...

浮躁的码农
9分钟前
0
0
【JVM】JSTATD结合Java VisualVM进行远程监控JVM运行情况(二)

内存泄露指的是程序中动态分配内存给一些临时对象,但是对象不会被GC(java垃圾回收机制gabage collection)所回收,它始终占用内存。即被分配的对象很大但已无用; 内存溢出指的是程序运行过...

大白来袭
12分钟前
1
0
聊聊ribbon的超时时间设置

序 本文主要研究一下ribbon的超时时间设置 配置 实例 ribbon: ReadTimeout: 10000 ConnectTimeout: 10000 MaxAutoRetries: 0 MaxAutoRetriesNextServer: 1 eureka: enabled: ......

go4it
21分钟前
0
0
一行代码结果叹为观止,能做到这么极致的也只有python了

Python 这门语言非常的有趣,不仅可以做高大上的人工智能、大数据、机器学习。还可以用来做 Web、爬虫。还有其它很多的应用。今天我就给大家展示下一行 Python 代码都可以做些什么。 一行打印...

猫咪编程
24分钟前
2
0
KingShard使用

对于kingshard的功能,在git中可以看到明确的功能说明 主要功能: 1. 基础功能 支持SQL读写分离。 支持透明的MySQL连接池,不必每次新建连接。 支持平滑上线DB或下线DB,前端应用无感知。 支...

mickelfeng
26分钟前
0
0
Linux 下 查找某个字符串

如果你想在当前项目下 查找 "test" 这个字符串,可以这样: grep -rn "test" * * : 表示当前目录所有文件,也可以是某个文件名-r 是递归查找-n 是显示行号-R ...

nsns
26分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部