文档章节

zookeeper curator 学习及源码解析

 陆大侠
发布于 2015/12/05 22:03
字数 1207
阅读 146
收藏 0

 分两段,先分析InterProcessMutex这个类,再分析CuratorFramework。因为CuratorFramework复杂一点,是整个curator 包实现的基石。

InterProcessMutex 是一个分布式锁的实现。调试代码用的官方的2.9.1的example代码,改了一些参数,这样便于调试。

构造函数为:

public InterProcessMutex(CuratorFramework client, String path)

client是curator的CuratorFramework, path就是lock的路径,这个同一个lock的路径必须相同。

每个InterProcessMutex 有个LockInternals对象,是实现分布式的关键。

LockInternals有个StandardLockInternalsDriver作为辅助。

InterProcessMutex 主要有

acquire(),boolean acquire(long time, TimeUnit unit), release()。

accquire方法:

1.  调用了LockInternals的attemptLock 方法,如果lockPath返回不为null,则表示获取成功。

     String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());

    这个InterProcessMutex是可重入的,方法是在线程本地纪录了一个重入的次数,每次重入加1,每次释放减1.

2.internals.attemptLock中,调用自己的私有方法internalLockLoop,无限循环,知道获取锁,或则异常。

while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
   List<String>        children = getSortedChildren(); //得到所有lock Path的子节点
   String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
   
PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases); //判断自己是否得到锁,就是按名称排序之后,自己的节点的index是否为0
   if ( predicateResults.getsTheLock() ) //如果得到锁就返回
   {
       haveTheLock = true;
   }
   else
   
{    //没有拿到锁,获得前一个节点的名称,并且下面的地方添加watcher
       String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
       synchronized(this)
       {
           try
           
{
               // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
               
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
               if ( millisToWait != null )//如果设置了时间,就等待一段时间后,删除自己的节点,并且返回false,表示没有拿到锁。返回在方法的最后,没有贴出。
               {
                   millisToWait -= (System.currentTimeMillis() - startMillis);
                   startMillis = System.currentTimeMillis();
                   if ( millisToWait <= 0 )
                   {
                       doDelete = true;    // timed out - delete our node
                       
break;
                   }

                   wait(millisToWait);
               }
               else
               
{   //没有设置时间,则一直等待。当前序节点释放的时候,watcher代码会吊用notifyAll方法,让以上的wait方法放回,再次尝试获取锁。
                   wait();
               }
           }
           catch ( KeeperException.NoNodeException e )
           {
               // it has been deleted (i.e. lock released). Try to acquire again
           
}
       }
   }
}

release方法很简单:

每次释放,线程本地纪录的次数减1,等于0的时候,就把自己的节点删除了。


下面分析 CuratorFramework接口以及实现类

这个类比较复杂,涉及重试机制,短线重连。分析这个类,首先需要对zookeeper官方的client包有所了解。

CuratorFrameworkImpl这个类是默认实现,有一下成员

private final CuratorZookeeperClient client; //封装关于zookeeper client的实现
private final ListenerContainer<CuratorListener> listeners; //监听器列表
private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
private final ThreadFactory threadFactory;
private final int maxCloseWaitMs; //关闭时候等待的ms数
private final BlockingQueue<OperationAndData<?>> backgroundOperations;//后台操作以及返回队列
private final NamespaceImpl namespace; //命名空间
private final ConnectionStateManager connectionStateManager; //连接状态管理器
private final List<AuthInfo> authInfos; //认证信息
...以下省略

CuratorFramework接口主要方法有:create(),delete(),checkExists(),inTransaction(),sync(),getChildren()以及start()和close()等等方法,调用这个方法会返回各种Builder接口的实现对象,基本都是实现了Pathable接口的。

Pathable接口有一个 forPath方法,这一连续的办法,实现了CuratorFramework的流式操作。

@Override
public void start()
{    。。。。。省略的代码
   try
   
{    
       connectionStateManager.start(); // ordering dependency - must be called before client.start()

       
final ConnectionStateListener listener = new ConnectionStateListener()
       {
           @Override
           
public void stateChanged(CuratorFramework client, ConnectionState newState)
           {
               if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
               {
                   logAsErrorConnectionErrors.set(true);
               }
           }
       };
       this.getConnectionStateListenable().addListener(listener); //注册一个state的监听器
       client.start(); //CuratorZookeeperClient的start,调用了ConnectState的start方法。开启了一个zookeeper连接
       executorService = Executors.newFixedThreadPool(2, threadFactory);  // 1 for listeners, 1 for background ops 提交到background操作的线程池。
       
executorService.submit(new Callable<Object>()
       {
           @Override
           
public Object call() throws Exception
           {
               backgroundOperationsLoop();
               return null;
           }
       });
   }
   catch ( Exception e )
   {
       handleBackgroundOperationException(null, e);
   }
}

CuratorZookeeperClient对象,有个ConnectionState成员变量,后者有个HandleHolder。这个HandleHolder持有了一个内部的interface,真正持有了一个ZooKeeper对象:

private interface Helper
{
   ZooKeeper getZooKeeper() throws Exception;
   String getConnectionString();
}

ConnectionState实现了zookeeper的Watcher接口,并把自己注册到了ZooKeeper连接中。

如果 event.getType() == Watcher.Event.EventType.None,则证明是连接状态发生了变化,如果是Expired,则调用reset方法,重新建立一个连接。旧的引用,没有做处理,可能是个隐患。

其他zookeeper的操作方法,操作会被封装成一个Callable<T> proc,提交到RetryLoop这个类static方法callWithRetry做重试及处理。

callWithRetry方法中,回调用CuratorZookeeperClient的blockUntilConnectedOrTimedOut,这个方法如果发现ConnectionState的状态不是连接上的,会注册一个tempWatcher,阻塞当前线程到连接成功或则超时。

CuratorFramework给我提供出来的功能就是如上述:

1. 连接超时的重连机制。(KeeperState.Unknown;KeeperState.NoSyncConnected,情况下也可能重连

2. 操作的时候的重试机制。(等待连接成功才会操作,失败可以有重试策略)

有个特别的流式操作,就是inTransaction(),它的实现,利用了zookeeper client支持MultiTransactionRecord对象,可以容纳多个操作,作为一个request。

zookeeper的ZAB协议

http://blog.csdn.net/chen77716/article/details/7309915


© 著作权归作者所有

上一篇: quartz mysql 集成
下一篇: JAVA并发探讨
粉丝 2
博文 54
码字总数 18787
作品 0
浦东
私信 提问
使用Curator实现的zookeeper分布式锁出现的Unimplemented for {root.path}

问题描述 Curator使用 ZooKeeper 作为分布式锁,启动时发生该异常。 Curator 客户端版本:curator-recipes-2.10.0 ZooKeeper 服务器版本:3.4.13 异常日志 以及 问题分析 UnimplementedExcep...

loubobooo
02/27
121
0
Apache Curator 3.0.0 发布,ZooKeeper 客户端简化

Apache Curator 3.0.0 发布,该版本带来的对 ZooKeeper 的新的动态配置 APIs 的兼容,更新内容如下: 子任务 [CURATOR-160] - Support Dynamic Reconfig [CURATOR-161] - Support Watcher R...

oschina
2015/10/15
2K
3
Apache Curator 2.6.0 发布

Apache Curator 2.6.0 发布,此版本包括了一些新特性(curator RPC)和 bug 修复: Bug 修复: [CURATOR-110] - LeaderLatch does not complete if it is started without a connection to Z......

oschina
2014/07/13
1K
0
ZooKeeper学习笔记六 ZooKeeper开源客户端Curator

本文学习资源来自《从Paxos到ZooKeeper分布式一致性原理与实践》 Curator Curator是Netflix公司开源的一套ZooKeeper客户端框架,作者是Jordan Zimmerman。 和ZkClient一样,Curator解决了很多...

xundh
2018/04/28
0
0
Apache Curator 2.7.1 发布,zookeeper 客户端简化

Apache Curator 2.7.1 发布了,zookeeper 的客户端调用过于复杂,Apache Curator 就是为了简化zookeeper客户端调用而生,利用它,可以更好的使用zookeeper。 改进记录: Bug [CURATOR-175] ...

oschina
2015/01/17
3.9K
1

没有更多内容

加载失败,请刷新页面

加载更多

关于AsyncTask的onPostExcute方法是否会在Activity重建过程中调用的问题

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/XG1057415595/article/details/86774575 假设下面一种情况...

shzwork
今天
6
0
object 类中有哪些方法?

getClass(): 获取运行时类的对象 equals():判断其他对象是否与此对象相等 hashcode():返回该对象的哈希码值 toString():返回该对象的字符串表示 clone(): 创建并返此对象的一个副本 wait...

happywe
今天
6
0
Docker容器实战(七) - 容器中进程视野下的文件系统

前两文中,讲了Linux容器最基础的两种技术 Namespace 作用是“隔离”,它让应用进程只能看到该Namespace内的“世界” Cgroups 作用是“限制”,它给这个“世界”围上了一圈看不见的墙 这么一...

JavaEdge
今天
8
0
文件访问和共享的方法介绍

在上一篇文章中,你了解到文件有三个不同的权限集。拥有该文件的用户有一个集合,拥有该文件的组的成员有一个集合,然后最终一个集合适用于其他所有人。在长列表(ls -l)中这些权限使用符号...

老孟的Linux私房菜
今天
7
0
面试套路题目

作者:抱紧超越小姐姐 链接:https://www.nowcoder.com/discuss/309292?type=3 来源:牛客网 面试时候的潜台词 抱紧超越小姐姐 编辑于 2019-10-15 16:14:56APP内打开赞 3 | 收藏 4 | 回复24 ...

MtrS
今天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部