背景
我们最近在使用spring-data-redis进行redis cluster模式测试时,发现jedis在进行expiredAt命令调用时有bug,最终调用的是pexpire命令,这个bug会导致key过期时间很长,导致redis内存溢出等问题。spring-data-redis中expiredAt命令调用栈如下:
由于这个类属于jedis,因此升级jedis至最新(2.8.1 -> 2.9.0),依然存在此代码。在GitHub issue里我发布了关于这个问题的讨论,jedis的开发者回复确实是个bug,在jedis 2.9.1中才会解决,目前jedis 2.9.1还未发布。在这里我对该命令的代码解析了一下。
环境
jar版本: spring-data-redis-1.8.4-RELEASE.jar、jedis-2.9.0.jar
测试环境: Redis 3.2.8,八个集群节点
applicationContext-redis-cluster.xml 配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">
<!-- 连接池配置. -->
<bean id="jedisPoolConfig" class="redis.clients.jedis.JedisPoolConfig">
<!-- 连接池中最大连接数。高版本:maxTotal,低版本:maxActive -->
<property name="maxTotal" value="8" />
<!-- 连接池中最大空闲的连接数. -->
<property name="maxIdle" value="4" />
<!-- 连接池中最少空闲的连接数. -->
<property name="minIdle" value="1" />
<!-- 当连接池资源耗尽时,调用者最大阻塞的时间,超时将跑出异常。单位,毫秒数;默认为-1.表示永不超时。高版本:maxWaitMillis,低版本:maxWait -->
<property name="maxWaitMillis" value="5000" />
<!-- 连接空闲的最小时间,达到此值后空闲连接将可能会被移除。负值(-1)表示不移除. -->
<property name="minEvictableIdleTimeMillis" value="300000" />
<!-- 对于“空闲链接”检测线程而言,每次检测的链接资源的个数。默认为3 -->
<property name="numTestsPerEvictionRun" value="3" />
<!-- “空闲链接”检测线程,检测的周期,毫秒数。如果为负值,表示不运行“检测线程”。默认为-1. -->
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<!-- testOnBorrow:向调用者输出“链接”资源时,是否检测是有有效,如果无效则从连接池中移除,并尝试获取继续获取。默认为false。建议保持默认值. -->
<!-- testOnReturn:向连接池“归还”链接时,是否检测“链接”对象的有效性。默认为false。建议保持默认值. -->
<!-- testWhileIdle:向调用者输出“链接”对象时,是否检测它的空闲超时;默认为false。如果“链接”空闲超时,将会被移除。建议保持默认值. -->
<!-- whenExhaustedAction:当“连接池”中active数量达到阀值时,即“链接”资源耗尽时,连接池需要采取的手段, 默认为1(0:抛出异常。1:阻塞,直到有可用链接资源。2:强制创建新的链接资源) -->
</bean>
<bean id="n1" class="org.springframework.data.redis.connection.RedisNode">
<constructor-arg value="127.0.0.1" />
<constructor-arg value="6379" type="int" />
</bean>
<bean id="n2" class="org.springframework.data.redis.connection.RedisNode">
<constructor-arg value="127.0.0.1" />
<constructor-arg value="6380" type="int" />
</bean>
<bean id="n3" class="org.springframework.data.redis.connection.RedisNode">
<constructor-arg value="127.0.0.1" />
<constructor-arg value="6381" type="int" />
</bean>
<bean id="redisClusterConfiguration"
class="org.springframework.data.redis.connection.RedisClusterConfiguration">
<property name="clusterNodes">
<set>
<ref bean="n1" />
<ref bean="n2" />
<ref bean="n3" />
</set>
</property>
<property name="maxRedirects" value="5" />
</bean>
<bean id="jedisConnectionFactory"
class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
<constructor-arg ref="redisClusterConfiguration" />
<constructor-arg ref="jedisPoolConfig" />
</bean>
<!-- Spring提供的访问Redis类. -->
<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="jedisConnectionFactory" />
<property name="KeySerializer">
<bean
class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property>
<property name="ValueSerializer">
<bean
class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property>
<property name="hashKeySerializer">
<bean
class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property>
<property name="hashValueSerializer">
<bean
class="org.springframework.data.redis.serializer.StringRedisSerializer" />
</property>
</bean>
<!-- Redis配置结束 -->
</beans>
相关源码解析
现在我们项目中主要使用spring-data-redis来进行redis操作,其中使用RedisTemplate,再配合jedis完成redis相关命令操作。集群环境下,关键类及实现流程如下:
redis集群配置类,负责保存集群配置:
org.springframework.data.redis.connection.RedisClusterConfiguration
redis连接工厂类,负责创建集群连接:
org.springframework.data.redis.connection.jedis.JedisConnectionFactory
集群连接Connection类,负责连接redis集群:
org.springframework.data.redis.connection.jedis.JedisClusterConnection
集群操作类,负责与redis集群进行命令交互:
redis.clients.jedis.JedisCluster
集群操作过程:
当spring容器随应用启动的时候,JedisConnectionFactory会根据RedisClusterConfiguration的配置创建redis集群的配置、连接类实例,最后使用JedisCluster类完成redis命令操作。
JedisConnectionFactory主要源码如下:
/**
* 主要根据集群配置初始化cluster对象
**/
public void afterPropertiesSet() {
if (shardInfo == null) {
shardInfo = new JedisShardInfo(hostName, port);
if (StringUtils.hasLength(password)) {
shardInfo.setPassword(password);
}
if (timeout > 0) {
setTimeoutOn(shardInfo, timeout);
}
}
if (usePool && clusterConfig == null) {
this.pool = createPool();
}
//因为我们的集群配置不为空,因此这里就会创建redis.clients.jedis.JedisCluster类对象
if (clusterConfig != null) {
this.cluster = createCluster();
}
}
createCluster()源码如下:
/**
* 创建JedisCluster对象,它负责与redis集群进行命令交互
**/
private JedisCluster createCluster() {
JedisCluster cluster = createCluster(this.clusterConfig, this.poolConfig);
this.clusterCommandExecutor = new ClusterCommandExecutor(
new JedisClusterConnection.JedisClusterTopologyProvider(cluster),
new JedisClusterConnection.JedisClusterNodeResourceProvider(cluster), EXCEPTION_TRANSLATION);
return cluster;
}
从上面方法的源码我们已经看到了JedisClusterConnection类,该类在JedisConnectionFactory创建的代码如下:
/**
* 获得JedisClusterConnection对象
**/
public RedisConnection getConnection() {
//如果JedisCluster对象不为空,则返回JedisClusterConnection对象
if (cluster != null) {
return getClusterConnection();
}
Jedis jedis = fetchJedisConnector();
JedisConnection connection = (usePool ? new JedisConnection(jedis, pool, dbIndex, clientName)
: new JedisConnection(jedis, null, dbIndex, clientName));
connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
return postProcessConnection(connection);
}
/**
* 创建JedisClusterConnection对象
**/
public RedisClusterConnection getClusterConnection() {
if (cluster == null) {
throw new InvalidDataAccessApiUsageException("Cluster is not configured!");
}
return new JedisClusterConnection(cluster, clusterCommandExecutor);
}
JedisClusterConnection类的cluster属性定义如下:
private final JedisCluster cluster;
接着我们看一下JedisCluster类中各种key过期设置的实现源码:
@Override
public Long expire(final String key, final int seconds) {
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {
@Override
public Long execute(Jedis connection) {
return connection.expire(key, seconds);
}
}.run(key);
}
@Override
public Long pexpire(final String key, final long milliseconds) {
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {
@Override
public Long execute(Jedis connection) {
return connection.pexpire(key, milliseconds);
}
}.run(key);
}
@Override
public Long expireAt(final String key, final long unixTime) {
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {
@Override
public Long execute(Jedis connection) {
return connection.expireAt(key, unixTime);
}
}.run(key);
}
@Override
public Long pexpireAt(final String key, final long millisecondsTimestamp) {
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {
@Override
public Long execute(Jedis connection) {
return connection.pexpireAt(key, millisecondsTimestamp);
}
}.run(key);
}
在JedisCluster类中过期操作的四个方法expire、expireAt、pexpire和pexpireAt调用的命令没有错,而它的父类BinaryJedisCluster的pexpireAt方法调用的命令是pexpire,从而导致了bug。
redisTemplate expireAt操作流程
在这里我重点分析redisTemplate expireAt操作流程以及bug产生过程。
现在我们看下redisTemplate中expireAt方法的实现:
public Boolean expireAt(K key, final Date date) {
//获取key的字节数组
final byte[] rawKey = rawKey(key);
return execute(new RedisCallback<Boolean>() {
public Boolean doInRedis(RedisConnection connection) {
try {
//redis 2.6以上执行该方法
return connection.pExpireAt(rawKey, date.getTime());
} catch (Exception e) {
//redis 2.6以下执行该方法
return connection.expireAt(rawKey, date.getTime() / 1000);
}
}
}, true);
}
/**
* 将key转换为字节数组
**/
@SuppressWarnings("unchecked")
private byte[] rawKey(Object key) {
Assert.notNull(key, "non null key required");
if (keySerializer == null && key instanceof byte[]) {
return (byte[]) key;
}
return keySerializer.serialize(key);
}
在redisTemplate的expireAt方法中,有一个关键的地方:
//获取key的字节数组
final byte[] rawKey = rawKey(key);
获取key的字节数组之后,再调用RedisConnection接口的pExpireAt或expireAt方法。由于redis从3.0开始才支持集群,因此这里调用的是RedisConnection接口的pExpireAt方法。在这里RedisConnection接口的实现类是JedisClusterConnection类,它的pExpireAt方法实现如下:
/*
* (non-Javadoc)
* @see org.springframework.data.redis.connection.RedisKeyCommands#pExpireAt(byte[], long)
*/
@Override
public Boolean pExpireAt(byte[] key, long unixTimeInMillis) {
try {
//调用JedisCluster对象的pexpireAt方法,最终调用的是父类BinaryJedisCluster的pexpireAt方法
return JedisConverters.toBoolean(cluster.pexpireAt(key, unixTimeInMillis));
} catch (Exception ex) {
throw convertJedisAccessException(ex);
}
}
在这里调用JedisCluster对象的pexpireAt方法时,由于key参数是字节数组类型,而JedisCluster类没有对应的pexpireAt(final byte[] key, final long millisecondsTimestamp)方法,因此会调用父类BinaryJedisCluster的pexpireAt方法,它的父类BinaryJedisCluster中pexpireAt方法的定义如下:
@Override
public Long pexpireAt(final byte[] key, final long millisecondsTimestamp) :
return new JedisClusterCommand<Long>(connectionHandler, maxAttempts) {
@Override
public Long execute(Jedis connection) {
// 调用的是pexpire命令而不是pexpireAt命令
return connection.pexpire(key, millisecondsTimestamp);
}
}.runBinary(key);
}
因此bug就产生了。pexpireat命令与pexpire命令都是以毫秒形式来设置key的过期时间,它们的不同如下:
命令名称 | 说明 |
---|---|
PEXPIRE | 以毫秒为单位设置 key 的生存时间 |
PEXPIREAT | 以毫秒为单位设置 key 的过期 unix 时间戳 |
比如我们当前使用的时间是2017/10/12 09:41:56,它的unix时间戳为1507772516000毫秒,当我们使用PEXPIREAT命令时,由于是过去的时间,相应的key会立即过期。而我们误用了PEXPIRE命令时,key不会立即过期,而是等到1507772516000毫秒后才过期,key过期时间会相当长,从而可能导致redis内存溢出、服务器崩溃等问题。
测试结果
测试及追踪源码发现,在集群环境下,使用spring-data-redis的RedisTemplate类与redis进行交互时:
-
如果key为String类型,底层最终调用的是JedisCluster类的相关方法进行redis命令操作
-
如果key为byte[]字节数组类型,底层最终调用的是BinaryJedisCluster类的相关方法进行redis命令操作
解决办法
在jedis 2.9.0及以下版本中,解决办法有两种,第一种是直接使用RedisTemplate类的execute方法调用expireAt命令,代码如下:
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 执行expireAt命令
*/
public void expireAt(String key, Date deadLine){
byte[] rawKey = rawKey(key);
redisTemplate.execute(new RedisCallback<Boolean>() {
@Override
public Boolean doInRedis(RedisConnection connection) {
return connection.expireAt(rawKey, deadLine.getTime() / 1000);
}
}, true);
}
/**
* 获取key序列化方式
*/
@SuppressWarnings("rawtypes")
private RedisSerializer keySerializer() {
return redisTemplate.getKeySerializer();
}
/**
* 使用相应的key序列化方式获取key的byte[]数组
*/
@SuppressWarnings("unchecked")
private byte[] rawKey(Object key) {
if (keySerializer() == null && key instanceof byte[]) {
return (byte[]) key;
}
return keySerializer().serialize(key);
}
第二种是直接使用JedisCluster,使用JedisCluster来进行expireAt命令操作,可以参考分布式缓存技术redis学习系列(七)——spring整合jediscluster来整合JedisCluster。
问题讨论
以上就是我的整个分析过程,我已在GitHub和Stack Overflow上提交了issue,jedis的开发者已经做了回复:
-
GitHub issue:BinaryJedisCluster pexpireAt bug
-
Stack Overflow:Jedis BinaryJedisCluster pexpireAt bug