文档章节

redisson的理解和使用-调用流程

果核里
 果核里
发布于 2017/06/05 10:05
字数 2339
阅读 95
收藏 1

 

redisson是一个用于连接redis的java客户端工作,相对于jedis,是一个采用异步模型,大量使用netty promise编程的客户端框架。

0     代码示例

//创建配置信息
        Config config = new Config();
        config.useSingleServer().setAddress("localhost:6379").setConnectionPoolSize(5);

        Redisson redisson = Redisson.create(config);

        //测试 list
        List<String> strList = redisson.getList("strList");
        strList.clear(); //清除所有数据
        strList.add("测试中文1");
        strList.add("test2");

        redisson.shutdown();

从代码上来看,其基本的使用非常简单,在最后的使用当中。除与redisson打交道之外(获取各种数据结构),完全感觉不到与redis的信息连接。甚至于返回于上层直接不需要考虑下层的实现,一切均由redisson进行了封装。

1     创建初始连接结构

1.1     配置信息config

config信息由4种不同的连接配置和其它属性进行组合,如下所示:

private SentinelServersConfig sentinelServersConfig;//基本哨兵server的配置
    private MasterSlaveServersConfig masterSlaveServersConfig;//基于主从的配置
    private SingleServerConfig singleServerConfig;//基于单台连接的配置
    private ClusterServersConfig clusterServersConfig;//基于集群的配置

    //这里即在整个命令处理过程中,总共的线程数,可以理解为所有的底层运行都是由这些线程来运行(而不是用户线程)
    private int threads = 0; // 0 = current_processors_amount * 2 

    private RedissonCodec codec;//用于实现对象编码的实现(从object->byte[],或byte[]->object) ,默认为jackson

针对上面的4种config,又是基于同一个继承体系,如下所示:

如上所示,在baseConfig中定义了基本的属性,然后除单机器配置之外,其它的都基于一个主从的配置,类似多节点地址的配置信息。在后面的实现中,我们将看到更多这种设计,即通过继承来完成多种不同场景的实现。

然后,通过 Redisson.create来完成一个类似于client的创建,可以认为创建的对象是一个单例。在后面的调用中,都使用此实例即可。其内部结构如下所示:

public class Redisson implements RedissonClient {

    private final ConnectionManager connectionManager;
    private final Config config;
}

其中config即刚才我们定义的配置对象,而connectionManager即可以理解为通过刚才的不同类型的连接配置实现了不同的连接管理器,与配置信息一致,相对象的连接管理器也是按照继承体系来实现的,如下所示:

上面的实现与配置一致,不过这里采用了主从管理的基本实现,其它实现通过override相应的方法来提供不同的子实现。其中单机实现,可以理解为没有slaver的连接处理。连接管理器,实现了连接信息管理的机制,并且实现了如何选择合适的连接对象来执行不同的操作的语义。即向外封装了connection的调用,入口都通过connectionManager来处理。正因为redisson向外提供的是数据结构语义,因此也不需要暴露connection信息,因此这种实现是值得的。

connectionManager的定义如下所示:

//获取相应值,一个封装调用
	<V> V get(Future<V> future);
/** --------------------- 基本同步的操作实现,不同的调用方法 最终都返回具体的值 start -------------- */
	<V, R> R read(String key, SyncOperation<V, R> operation);
	<V, R> R read(SyncOperation<V, R> operation);
	<V, R> R write(String key, SyncOperation<V, R> operation);
	<V, R> R write(SyncOperation<V, R> operation);
	<V, R> R write(String key, AsyncOperation<V, R> asyncOperation);
	<V, R> R write(AsyncOperation<V, R> asyncOperation);
	<V, T> Future<T> writeAllAsync(AsyncOperation<V, T> asyncOperation);
	<V, T> T read(String key, AsyncOperation<V, T> asyncOperation);
	<V, T> T read(AsyncOperation<V, T> asyncOperation);
/** --------------------- 基本同步的操作实现,不同的调用方法 最终都返回具体的值 end -------------- */
/** --------------------- 基本异步的操作实现,返回future 而业务自行决定如何处理 start -------------- */
	<V, T> Future<T> readAsync(String key, AsyncOperation<V, T> asyncOperation);
	<V, T> Future<T> readAsync(AsyncOperation<V, T> asyncOperation);
	<V, T> Future<T> writeAsync(String key, AsyncOperation<V, T> asyncOperation);
	<V, T> Future<T> writeAsync(AsyncOperation<V, T> asyncOperation);
/** --------------------- 基本异步的操作实现,返回future 而业务自行决定如何处理 end -------------- */
/** 创建一个新的连接对象(用于读操作),即实现连接池语义,写操作由子类具体实现 */
	<K, V> RedisConnection<K, V> connectionReadOp(int slot);

2     创建数据连接

创建连接根据当前操作是读还是写来进行,因为如果是读操作,可以通过主从由从机器进行连接。这里仅考虑创建写操作的连接,其在主机器上进行。因此,我们直接查看相应的实现。

protected <K, V> RedisConnection<K, V> connectionWriteOp(int slot) {
        return getEntry(slot).connectionWriteOp();
    }

转交由entry,即masterSlaveEntry来处理,entry可以理解为一个具体的表示单个机器的连接配置对象。

public <K, V> RedisConnection<K, V> connectionWriteOp() {
//这里通过计数信号量来简单进行判定连接数是否已达上线,如果已达上线,则强制阻塞当前线程
	acquireMasterConnection();
//从已创建好的队列中获取,如未创建,则进行创建
	RedisConnection<K, V> conn = masterEntry.getConnections().poll();
	if (conn != null) {
	    return conn;
	}
//具体的创建过程
	    conn = masterEntry.getClient().connect(codec);
......
	    return conn;
    }

从具体的connect方法来看,即创建一个 RedisAsyncConnection 对象,通过注册入连接远程端口的channel中,通过相应的active事件完成对channel的绑定。后续的操作均通过对channel发送事件来完成。具体的代码如下所示:

connect = bootstrap.handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(watchdog, handler, connection);
                    }
                }).connect();
            }
            connect.sync();

从上可以看出,在连接的时候指定了3个处理器,即IN OUT的处理器

watchlog负责处理连接断开的时候是否进行重连

handler负责处理最终进行命令的底层发送和接收

connection则负责进行命令的转发,包括各项对redis命令的封装调用。可以理解为handler工作在网络层,而connection则工作在应用层。

3     发送调用命令

3.1     发送的准备操作

如在redissonList中的size操作,即通过调用connection的dispatch命令来操作。如下所示:

public synchronized <T> Promise<T> dispatch(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args)

type表示指定的指令,如LLEN

output表示在数据返回之后,由commandHandler将相应的结果反序列化入output中

args即表示指令具体的参数

因为在output数据返回之后,需要通知到外转的信息,因此需要有一个相对应的promise,其被作为listner加入到execute(connection)中的结果回调当中。

3.2     数据命令发送

相应的代码在connection中的dispatch中,主要由如下代码构成:

Command<K, V, T> cmd = new Command<K, V, T>(type, output, args, multi != null, promise);
        queue.put(cmd);
        channel.writeAndFlush(cmd);

然后,这里需要由commandHander来负责具体的数据发送操作(因为,它是注册了channel的相应处理器), 具体代码如下所示:

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
	Command<?, ?, ?> cmd = (Command<?, ?, ?>) msg;
	ByteBuf buf = ctx.alloc().heapBuffer();
	cmd.encode(buf);
	ctx.write(buf, promise);
}

3.3     数据结果的解析

接下来的操作,由commandHandler来接收相应的数据,如下所示:

//由channelRead转向decode
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer) throws InterruptedException {
    while (true) {
	Command<K, V, ?> cmd = queue.peek();
	if (cmd == null
		|| !rsm.decode(buffer, cmd.getOutput())) {
	    break;
	}
	cmd = queue.take();
	cmd.complete();
    }
}

这里,因为在发送命令时,将每个命令相应的顺序入queue,因此在结果时,也是按照queue相对应的结果对应起来。在这里,直接就认为最先的结果对应于最先的cmd。通过rsm.decode,负责将相应buffer的结果反序列化output中,最后调用cmd.complete来通知进一步的promise。如下所示:

public void complete() {
	completeAmount--;
	if (completeAmount == 0) {
		Object res = output.get();
		...... 略去错误处理
			promise.setSuccess((T)res);
		}
	}
}

3.4     数据结构的反向处理

在上一步的处理中,当相应cmd的complete的处理中,之前传递的promise被设置了result,因此在其上绑定的promise也会进一步设置数据值。如果在最上面,我们使用如下的代码时:

mainPromise.awaitUninterruptibly().getNow()

就会最终获取到相应的结果,其值最终体现在业务代码中。

4     与netty的整合

在上面,我们可以看到,整个redisson和netty的协议进行了完全的整合,包括handler的介入,eventGroup的使用等,都使用了全套的netty体系。然后,由于netty提供了promise功能,这里也大量使用了相应的异步模型来进行数据处理。

5     总结

在redisson中,各个部分均采用了最新的一些技术栈,包括java 5线程语义,Promise编程语义,在技术的学习上有很高的学习意义。相比jedis,其支持的特性并不是很高,但对于日常的使用还是没有问题的。其对集合的封装,编解码的处理,都达到了一个开箱即用的目的。相比jedis,仅完成了一个基本的redis网络实现,可以理解为redisson是一个完整的框架,而jedis即完成了语言层的适配。其次,redisson在设计模式,以及编码上,都有完整的测试示例,代码可读性也非常好,很值得进行源码级学习。

如果在项目中已经使用了netty,那么如果需要集成redis,那么使用redisson是最好的选择了,都不需要另外增加依赖信息。

本文转载自:  http://www.iflym.com/index.php/code/201503290001.html

共有 人打赏支持
果核里
粉丝 2
博文 39
码字总数 10055
作品 0
朝阳
程序员
私信 提问
redisson实现分布式锁原理

Redisson分布式锁 之前的基于注解的锁有一种锁是基本redis的分布式锁,锁的实现我是基于redisson组件提供的RLock,这篇来看看redisson是如何实现锁的。 不同版本实现锁的机制并不相同 引用的...

技术小阿哥
2017/11/27
0
0
Redisson源码赏析 - 简介

转自官方wiki概述 Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(, , , ...

3jin
2018/01/05
0
0
聊聊redisson的DelayedQueue

序 本文主要研究一下redisson的DelayedQueue maven 实例 这里使用了两个queue,对delayedQueue的offer操作是直接进入delayedQueue,但是delay是作用在目标队列上,这里就是RBlockingQueue 源...

go4it
2018/09/22
0
0
Redis客户端redisson实战

redis 学习问题总结 http://aperise.iteye.com/blog/2310639 ehcache memcached redis 缓存技术总结 http://aperise.iteye.com/blog/2296219 redis-stat 离线安装 http://aperise.iteye.com......

哲别0
2018/05/21
0
0
程序对批量数据写入数据库的优化--引入Redis并通过定时器来触发

没错,还是上篇文章提到的那个SpringMVC+Mybatis的项目,在客户调我方接口,疯狂的给我们insert数据的时候,应该想到一些优化方案,于是Redis就被引用了。 关于Redis的客户端,服务端的一些用...

Pig-man
2016/03/21
88
0

没有更多内容

加载失败,请刷新页面

加载更多

Java生成二维码图片

maven配置jar包 <dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.3.3</version></dependency><dependency><groupId>com.google.z......

骑羊放狼灬
1分钟前
0
0
oracle 修改字段类型

1.varchar2 类型修改 例子:alter table T_Node modify (NODE_CONTEXT varchar2(4000)); 2.varchar2 修改为clob 例子: alter table T_Node add hehe clob; update T_Node set hehe=NODE_CO......

qimh
4分钟前
0
0
别再写 bug 了,避免空指针的 5 个案例!

空指针是我们 Java 开发人员经常遇到的一个基本异常,这是一个极其普遍但似乎又无法根治的问题。 本文,栈长将带你了解什么是空指针,还有如何有效的避免空指针。 什么是空指针? 当一个变量...

Java技术栈
8分钟前
0
0
FastJson对BigDecimal保留两位小数(valueFilter)

实现ValueFilter public class BigDecimalValueFilter implements ValueFilter { @Override public Object process(Object o, String name, Object value) {//o是待转换的对象,n......

石日天
10分钟前
0
0
android 颜色透明度参照比

##透明度参照表: 00%=FF(不透明) 5%=F2 10%=E5 15%=D8 20%=CC 25%=BF 30%=B2 35%=A5 40%=99 45%=8c 50%=7F 55%=72 60%=66 65%=59 70%=4c 75%=3F 80%=33 85%=21 90%=19 95%=0c 100%=00(全透......

东街小霸王
11分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部