文档章节

Jedis 客户端及ShardedJedis 源码学习

 陆大侠
发布于 2015/12/27 15:49
字数 1093
阅读 39
收藏 0
点赞 0
评论 0

Jedis 源码比较简洁,只依赖了apache的commons-pool2 这个库。(当然还有JUnit)。

分析源码的最好方式,就是去github下载一份,然后用idea或则eclipse打开,调试单元测试代码。(其它开源项目均可以这样做)。 

redis.clients.jedis.Jedis 这个类实现了redis.clients.jedis.commands.Commands以及BinaryScriptingCommands鞥接口(包含几乎所有Jedis命令以及lua脚本操作),并且它有用一个redis.clients.jedis.Client这个类的实例。

Client及其基类也实现Commands等接口。Jedis是个代理。Client基类的基类Connection中,持有Socket对象,Socket关联的RedisOutputStream以及RedisInputStream。这两个流包装了Socket的output和input流。   redis.clients.jedis.Protocol协助Connection实现redis协议的解析。

整个通讯过程是同步的,Redis对象是非线程安全的。   协议很简单,可以参考这篇博客。

http://www.cnblogs.com/smark/p/3247620.html

看下最内部的Connection对象的主要成员吧:

public class Connection implements Closeable {

  private static final byte[][] EMPTY_ARGS = new byte[0][];

  private String host = Protocol.DEFAULT_HOST;//默认主机都有,值是localhost
  private int port = Protocol.DEFAULT_PORT; //6379
  private Socket socket;                    //这就是用于通讯的Socket了
  private RedisOutputStream outputStream;   //发送命令的output流
  private RedisInputStream inputStream;     //接受结果的input流
  private int connectionTimeout = Protocol.DEFAULT_TIMEOUT;   //tcp连接建立时候的超时设置
  private int soTimeout = Protocol.DEFAULT_TIMEOUT;         //tcp操作超时时间
  private boolean broken = false;          //发生异常时,这个设置为true。外部就知道如何处理,比如连接池就用到这个。

再看看关键方法:

  public void connect() { //方法很简单,没什么好说的。
    if (!isConnected()) {
      try {
        socket = new Socket();
        // ->@wjw_add
        socket.setReuseAddress(true); //允许tcp重复绑定
        socket.setKeepAlive(true); // Will monitor the TCP connection is
        // valid
        socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to
        // ensure timely delivery of data
        socket.setSoLinger(true, 0); // Control calls close () method,
        // the underlying socket is closed
        // immediately
        // <-@wjw_add

        socket.connect(new InetSocketAddress(host, port), connectionTimeout);
        socket.setSoTimeout(soTimeout);
        outputStream = new RedisOutputStream(socket.getOutputStream());
        inputStream = new RedisInputStream(socket.getInputStream());
      } catch (IOException ex) {
        broken = true;
        throw new JedisConnectionException(ex);
      }
    }
  }
  
    protected Connection sendCommand(final ProtocolCommand cmd, final byte[]... args) {
    try {
      connect();
      Protocol.sendCommand(outputStream, cmd, args);
      return this;
    } catch (JedisConnectionException ex) {
      。。。省略
      }
      broken = true;
      throw ex;
    }
  }

最好的操作到了Protocal方法:

  public static void sendCommand(final RedisOutputStream os, final ProtocolCommand command,
      final byte[]... args) {
    sendCommand(os, command.getRaw(), args);
  }

  private static void sendCommand(final RedisOutputStream os, final byte[] command,
      final byte[]... args) {
    try {
      os.write(ASTERISK_BYTE);
      os.writeIntCrLf(args.length + 1);
      os.write(DOLLAR_BYTE);
      os.writeIntCrLf(command.length);
      os.write(command);
      os.writeCrLf();

      for (final byte[] arg : args) {
        os.write(DOLLAR_BYTE);
        os.writeIntCrLf(arg.length);
        os.write(arg);
        os.writeCrLf();
      }
    } catch (IOException e) {
      throw new JedisConnectionException(e);
    }
  }

然后Redis这边同步的调用了Client的getBinaryBulkReply等等方法,从input流里面,获取结果,和以上类似。

先看类图的:redis.clients.util.Pool持有一个GenericObjectPool<T>的对象,实现了连接池。

再来看看ShardedJedis, 它层次如下:

public class ShardedJedis extends BinaryShardedJedis implements JedisCommands, Closeable {

public class BinaryShardedJedis extends Sharded<Jedis, JedisShardInfo> implements BinaryJedisCommands

重要的方法都封装了在基类Sharded类中,BinaryShardedJedis封装的是基于byte[]的接口,ShardedJedis则封装了String接口。

public class Sharded<R, S extends ShardInfo<R>> {
  public static final int DEFAULT_WEIGHT = 1;//默认每个redis的权重
  private TreeMap<Long, S> nodes;//红黑树纪录hash值到redisInfo的Map
  private final Hashing algo;//hash算法,默认MurMurHash
  //纪录了nodes的item的值ShardInfo<Redis>,到Redis连接对象的Map。
  private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<ShardInfo<R>, R>();

ShardedJedis以及BinaryShardedJedis回调用Sharded的getShard方法,返回一个Redis对象,然后操作这个对象。

在看Sharded的主要方法:1 构造函数。2 getShard方法。实现了一致性hash算法。

  public Sharded(List<S> shards, Hashing algo, Pattern tagPattern) {
    this.algo = algo; //hash算法
    this.tagPattern = tagPattern;
    initialize(shards);
  }

  private void initialize(List<S> shards) {
    nodes = new TreeMap<Long, S>();
    for (int i = 0; i != shards.size(); ++i) {
      final S shardInfo = shards.get(i);
      //为每个shardInfo在TreeMap中创建160个hashCode到shardInfo的映射。
      //如果有name就用else里面的key做hash,没有就是上面的"SHARD-" + i + "-NODE-" + n作为键。
      if (shardInfo.getName() == null) 
      for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
        nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
      }
      else for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
        nodes.put(this.algo.hash(shardInfo.getName() + "*" + shardInfo.getWeight() + n), shardInfo);
      }
      //最后根据shardInfo创建Redis对象,并使用shardInfo作为Map的键。
      resources.put(shardInfo, shardInfo.createResource());
    }
  }
 子类就是根据这个方法得到正确的Jedis实例。
 public R getShard(byte[] key) {
    return resources.get(getShardInfo(key));
 }
 如果key是String 类型,有另外一个重载方法
 public S getShardInfo(String key) {
    return getShardInfo(SafeEncoder.encode(getKeyTag(key)));
 }
 Sharded最核心的代码,哈哈,一共5行。 利用了红黑树TreeMap。
 public S getShardInfo(byte[] key) {
    SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));//这个根据key的hash值,找到所有比它大的SortedMap,这个map的S为Redis对象。
    if (tail.isEmpty()) { //如果没有找到,就取nodes的第一个ShardInfo。
      return nodes.get(nodes.firstKey());
    }
    return tail.get(tail.firstKey()); //如果找到了tail,取tail第一个ShardInfo。
  }





© 著作权归作者所有

共有 人打赏支持
粉丝 2
博文 52
码字总数 18787
作品 0
浦东
Jedis之ShardedJedis一致性哈希分析

Jedis之ShardedJedis一致性哈希分析 ShardedJedis通过一致性哈希实现的的分布式缓存。主要思路: redis服务器节点划分:将每台服务器节点采用hash算法划分为160个虚拟节点(可以配置划分权重)...

秋风醉了 ⋅ 2015/03/25 ⋅ 0

jdedis的shardedjedis有什么用?

Jedis中的pool都是用commonpool来实现的,jedis的实现是通过把所有的shard信息放入shardedjedis,然后池化shardedjedis,但是这样的话只要有一个shardinfo构造的jedis有问题,那整个shardedje...

杨子国 ⋅ 2014/03/07 ⋅ 0

redis的Java客户端Jedis测试验证

edis.jar包:https://github.com/xetorthio/jedis/downloads commons-pool:http://commons.apache.org/pool/download_pool.cgi 添加到lib 我的版本:jedis-2.1.0.jar commons-pool-1.6-bin.......

cheese ⋅ 2013/01/08 ⋅ 0

Jedis 与 ShardedJedis 设计

Jedis设计 Jedis作为推荐的java语言redis客户端,其抽象封装为三部分: 对象池设计:Pool,JedisPool,GenericObjectPool,BasePoolableObjectFactory,JedisFactory 面向用户的redis操作封装...

tantexian ⋅ 2016/06/07 ⋅ 0

jedis(Redis)非切片池和切片池(集群)的 实例

package testRedis; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Set; import redis.clients.jedis.Jedis; import redis.clients.jed......

QH_C ⋅ 2015/04/26 ⋅ 0

ShardedJedis 错误使用

BEFORE:之前使用的单实例的JedisPool没有这个问题;JedisPool的connected_clients 一般不会超过80;AFTER:扩展后6个实例,每个实例可以达到10K,从80徒增到60w; STEP 0. Jedis使用的是apa...

Howard.L.Huang ⋅ 2014/04/11 ⋅ 0

ShardedJedisPipeline 源码分析

一、什么是pipeline?什么是ShardedJedis? 由于pipeline和ShardedJedis的介绍和源码分析在网上已经有了,本文就不再赘述,直接给出链接: pipeline的介绍: http://blog.csdn.net/freebirdl...

yangbodong22011 ⋅ 2017/06/18 ⋅ 0

Jedis分片连接池(分布式)

一下内容来自网络,但是很多细节没有写出来,所以我经过自己琢磨,终于找到原因了。 Redis-2.4.15目前没有提供集群的功能,Redis作者在博客中说将在3.0中实现集群机制。目前Redis实现集群的方...

Zero零_度 ⋅ 2016/08/29 ⋅ 0

jedis shardedjedis 区别

jedis shardedjedis 区别是什么? redis设置成为主从同步,和jedis shardedjedis有什么关系? 应该如何使用?

badouyuren ⋅ 2014/11/12 ⋅ 1

shardedJedis无法设置DB的问题

本人一直从事java研发,java版本的redis客户端通常使用的是jedis。在之前生产开发过程中,redis主要以单机形式或者主备读写分离形式使用,并未涉及到分片等高级功能。最近,由于业务量激增,...

萧十一郎君 ⋅ 2016/10/21 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

SpringBoot集成Druid的最简单的小示例

参考网页 https://blog.csdn.net/king_is_everyone/article/details/53098350 建立maven工程 Pom文件 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM......

karma123 ⋅ 58分钟前 ⋅ 0

Java虚拟机基本结构的简单记忆

Java堆:一般是放置实例化的对象的地方,堆分新生代和老年代空间,不断未被回收的对象越老,被放入老年代空间。分配最大堆空间:-Xmx 分配初始堆空间:-Xms,分配新生代空间:-Xmn,新生代的大小一...

算法之名 ⋅ 今天 ⋅ 0

OSChina 周日乱弹 —— 这么好的姑娘都不要了啊

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @TigaPile :分享曾惜的单曲《讲真的》 《讲真的》- 曾惜 手机党少年们想听歌,请使劲儿戳(这里) @首席搬砖工程师 :怎样约女孩子出来吃饭,...

小小编辑 ⋅ 今天 ⋅ 8

Jenkins实践3 之脚本

#!/bin/sh# export PROJ_PATH=项目路径# export TOMCAT_PATH=tomcat路径killTomcat(){pid=`ps -ef | grep tomcat | grep java|awk '{print $2}'`echo "tom...

晨猫 ⋅ 今天 ⋅ 0

Spring Bean的生命周期

前言 Spring Bean 的生命周期在整个 Spring 中占有很重要的位置,掌握这些可以加深对 Spring 的理解。 首先看下生命周期图: 再谈生命周期之前有一点需要先明确: Spring 只帮我们管理单例模...

素雷 ⋅ 今天 ⋅ 0

zblog2.3版本的asp系统是否可以超越卢松松博客的流量[图]

最近访问zblog官网,发现zlbog-asp2.3版本已经进入测试阶段了,虽然正式版还没有发布,想必也不久了。那么作为aps纵横江湖十多年的今天,blog2.2版本应该已经成熟了,为什么还要发布这个2.3...

原创小博客 ⋅ 今天 ⋅ 0

聊聊spring cloud的HystrixCircuitBreakerConfiguration

序 本文主要研究一下spring cloud的HystrixCircuitBreakerConfiguration HystrixCircuitBreakerConfiguration spring-cloud-netflix-core-2.0.0.RELEASE-sources.jar!/org/springframework/......

go4it ⋅ 今天 ⋅ 0

二分查找

二分查找,也称折半查找、二分搜索,是一种在有序数组中查找某一特定元素的搜索算法。搜素过程从数组的中间元素开始,如果中间元素正好是要查找的元素,则搜素过程结束;如果某一特定元素大于...

人觉非常君 ⋅ 今天 ⋅ 0

VS中使用X64汇编

需要注意的是,在X86项目中,可以使用__asm{}来嵌入汇编代码,但是在X64项目中,再也不能使用__asm{}来编写嵌入式汇编程序了,必须使用专门的.asm汇编文件来编写相应的汇编代码,然后在其它地...

simpower ⋅ 今天 ⋅ 0

ThreadPoolExecutor

ThreadPoolExecutor public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, ......

4rnold ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部