文档章节

Jedis 客户端及ShardedJedis 源码学习

 陆大侠
发布于 2015/12/27 15:49
字数 1093
阅读 42
收藏 0

Jedis 源码比较简洁,只依赖了apache的commons-pool2 这个库。(当然还有JUnit)。

分析源码的最好方式,就是去github下载一份,然后用idea或则eclipse打开,调试单元测试代码。(其它开源项目均可以这样做)。 

redis.clients.jedis.Jedis 这个类实现了redis.clients.jedis.commands.Commands以及BinaryScriptingCommands鞥接口(包含几乎所有Jedis命令以及lua脚本操作),并且它有用一个redis.clients.jedis.Client这个类的实例。

Client及其基类也实现Commands等接口。Jedis是个代理。Client基类的基类Connection中,持有Socket对象,Socket关联的RedisOutputStream以及RedisInputStream。这两个流包装了Socket的output和input流。   redis.clients.jedis.Protocol协助Connection实现redis协议的解析。

整个通讯过程是同步的,Redis对象是非线程安全的。   协议很简单,可以参考这篇博客。

http://www.cnblogs.com/smark/p/3247620.html

看下最内部的Connection对象的主要成员吧:

public class Connection implements Closeable {

  private static final byte[][] EMPTY_ARGS = new byte[0][];

  private String host = Protocol.DEFAULT_HOST;//默认主机都有,值是localhost
  private int port = Protocol.DEFAULT_PORT; //6379
  private Socket socket;                    //这就是用于通讯的Socket了
  private RedisOutputStream outputStream;   //发送命令的output流
  private RedisInputStream inputStream;     //接受结果的input流
  private int connectionTimeout = Protocol.DEFAULT_TIMEOUT;   //tcp连接建立时候的超时设置
  private int soTimeout = Protocol.DEFAULT_TIMEOUT;         //tcp操作超时时间
  private boolean broken = false;          //发生异常时,这个设置为true。外部就知道如何处理,比如连接池就用到这个。

再看看关键方法:

  public void connect() { //方法很简单,没什么好说的。
    if (!isConnected()) {
      try {
        socket = new Socket();
        // ->@wjw_add
        socket.setReuseAddress(true); //允许tcp重复绑定
        socket.setKeepAlive(true); // Will monitor the TCP connection is
        // valid
        socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to
        // ensure timely delivery of data
        socket.setSoLinger(true, 0); // Control calls close () method,
        // the underlying socket is closed
        // immediately
        // <-@wjw_add

        socket.connect(new InetSocketAddress(host, port), connectionTimeout);
        socket.setSoTimeout(soTimeout);
        outputStream = new RedisOutputStream(socket.getOutputStream());
        inputStream = new RedisInputStream(socket.getInputStream());
      } catch (IOException ex) {
        broken = true;
        throw new JedisConnectionException(ex);
      }
    }
  }
  
    protected Connection sendCommand(final ProtocolCommand cmd, final byte[]... args) {
    try {
      connect();
      Protocol.sendCommand(outputStream, cmd, args);
      return this;
    } catch (JedisConnectionException ex) {
      。。。省略
      }
      broken = true;
      throw ex;
    }
  }

最好的操作到了Protocal方法:

  public static void sendCommand(final RedisOutputStream os, final ProtocolCommand command,
      final byte[]... args) {
    sendCommand(os, command.getRaw(), args);
  }

  private static void sendCommand(final RedisOutputStream os, final byte[] command,
      final byte[]... args) {
    try {
      os.write(ASTERISK_BYTE);
      os.writeIntCrLf(args.length + 1);
      os.write(DOLLAR_BYTE);
      os.writeIntCrLf(command.length);
      os.write(command);
      os.writeCrLf();

      for (final byte[] arg : args) {
        os.write(DOLLAR_BYTE);
        os.writeIntCrLf(arg.length);
        os.write(arg);
        os.writeCrLf();
      }
    } catch (IOException e) {
      throw new JedisConnectionException(e);
    }
  }

然后Redis这边同步的调用了Client的getBinaryBulkReply等等方法,从input流里面,获取结果,和以上类似。

先看类图的:redis.clients.util.Pool持有一个GenericObjectPool<T>的对象,实现了连接池。

再来看看ShardedJedis, 它层次如下:

public class ShardedJedis extends BinaryShardedJedis implements JedisCommands, Closeable {

public class BinaryShardedJedis extends Sharded<Jedis, JedisShardInfo> implements BinaryJedisCommands

重要的方法都封装了在基类Sharded类中,BinaryShardedJedis封装的是基于byte[]的接口,ShardedJedis则封装了String接口。

public class Sharded<R, S extends ShardInfo<R>> {
  public static final int DEFAULT_WEIGHT = 1;//默认每个redis的权重
  private TreeMap<Long, S> nodes;//红黑树纪录hash值到redisInfo的Map
  private final Hashing algo;//hash算法,默认MurMurHash
  //纪录了nodes的item的值ShardInfo<Redis>,到Redis连接对象的Map。
  private final Map<ShardInfo<R>, R> resources = new LinkedHashMap<ShardInfo<R>, R>();

ShardedJedis以及BinaryShardedJedis回调用Sharded的getShard方法,返回一个Redis对象,然后操作这个对象。

在看Sharded的主要方法:1 构造函数。2 getShard方法。实现了一致性hash算法。

  public Sharded(List<S> shards, Hashing algo, Pattern tagPattern) {
    this.algo = algo; //hash算法
    this.tagPattern = tagPattern;
    initialize(shards);
  }

  private void initialize(List<S> shards) {
    nodes = new TreeMap<Long, S>();
    for (int i = 0; i != shards.size(); ++i) {
      final S shardInfo = shards.get(i);
      //为每个shardInfo在TreeMap中创建160个hashCode到shardInfo的映射。
      //如果有name就用else里面的key做hash,没有就是上面的"SHARD-" + i + "-NODE-" + n作为键。
      if (shardInfo.getName() == null) 
      for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
        nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
      }
      else for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
        nodes.put(this.algo.hash(shardInfo.getName() + "*" + shardInfo.getWeight() + n), shardInfo);
      }
      //最后根据shardInfo创建Redis对象,并使用shardInfo作为Map的键。
      resources.put(shardInfo, shardInfo.createResource());
    }
  }
 子类就是根据这个方法得到正确的Jedis实例。
 public R getShard(byte[] key) {
    return resources.get(getShardInfo(key));
 }
 如果key是String 类型,有另外一个重载方法
 public S getShardInfo(String key) {
    return getShardInfo(SafeEncoder.encode(getKeyTag(key)));
 }
 Sharded最核心的代码,哈哈,一共5行。 利用了红黑树TreeMap。
 public S getShardInfo(byte[] key) {
    SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));//这个根据key的hash值,找到所有比它大的SortedMap,这个map的S为Redis对象。
    if (tail.isEmpty()) { //如果没有找到,就取nodes的第一个ShardInfo。
      return nodes.get(nodes.firstKey());
    }
    return tail.get(tail.firstKey()); //如果找到了tail,取tail第一个ShardInfo。
  }





© 著作权归作者所有

共有 人打赏支持
粉丝 2
博文 54
码字总数 18787
作品 0
浦东
私信 提问
jdedis的shardedjedis有什么用?

Jedis中的pool都是用commonpool来实现的,jedis的实现是通过把所有的shard信息放入shardedjedis,然后池化shardedjedis,但是这样的话只要有一个shardinfo构造的jedis有问题,那整个shardedje...

杨子国
2014/03/07
2.3K
0
Jedis 与 ShardedJedis 设计

Jedis设计 Jedis作为推荐的java语言redis客户端,其抽象封装为三部分: 对象池设计:Pool,JedisPool,GenericObjectPool,BasePoolableObjectFactory,JedisFactory 面向用户的redis操作封装...

tantexian
2016/06/07
49
0
ShardedJedisPipeline 源码分析

一、什么是pipeline?什么是ShardedJedis? 由于pipeline和ShardedJedis的介绍和源码分析在网上已经有了,本文就不再赘述,直接给出链接: pipeline的介绍: http://blog.csdn.net/freebirdl...

yangbodong22011
2017/06/18
0
0
shardedJedis无法设置DB的问题

本人一直从事java研发,java版本的redis客户端通常使用的是jedis。在之前生产开发过程中,redis主要以单机形式或者主备读写分离形式使用,并未涉及到分片等高级功能。最近,由于业务量激增,...

萧十一郎君
2016/10/21
318
0
Jedis分片连接池(分布式)

一下内容来自网络,但是很多细节没有写出来,所以我经过自己琢磨,终于找到原因了。 Redis-2.4.15目前没有提供集群的功能,Redis作者在博客中说将在3.0中实现集群机制。目前Redis实现集群的方...

Zero零_度
2016/08/29
4
0

没有更多内容

加载失败,请刷新页面

加载更多

/etc/profile和/etc/environment的区别

/etc/profile 文件 当一个用户登录Linux系统或使用 su 命令切换到另一个用户时,设置用户环境第一个读取的文件就是 /etc/profile ,此文件为系统全局变量配置文件,且仅仅在第一次登录系统时...

calmsnow
17分钟前
2
0
rabbitMQ日常管理(转)

一、网页登录方法 http://127.0.0.1:15672/ 用户名和密码默认为guest/guest 用java代码去连接rabbitmq用的端口是5672 二、rabbitMQ基本概念 RabbitMQ是一个开源的AMQP实现,服务器端用Erlan...

__HuWei
23分钟前
1
0
gitlab cicd

https://docs.gitlab.com/ee/ci/docker/using_docker_build.html

kut
24分钟前
1
0
使用Prometheus+Grafana监控

一、介绍Prometheus Prometheus(普罗米修斯)是一套开源的监控&报警&时间序列数据库的组合,起始是由SoundCloud公司开发的。随着发展,越来越多公司和组织接受采用Prometheus,社会也十分活...

xtof
24分钟前
3
0
EOS RPC API官方文档中文版【1.5版】

EOS RPC API是应用访问EOS区块链上智能合约的必备开发接口,根据所实现插件的不同,EOS RPC API被归入不同的分组: CHAIN:由chain_api_plugin实现,主要提供区块链数据的访问功能 HISTORY:...

汇智网教程
26分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部