Path Cache
Path Cache其实就是用于对zk节点的监听。不论是子节点的新增、更新或者移除的时候,Path Cache都能对子节点集合的状态和数据变化做出响应。
1. 关键 API
org.apache.curator.framework.recipes.cache.PathChildrenCache
org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent
org.apache.curator.framework.recipes.cache.PathChildrenCacheListener
org.apache.curator.framework.recipes.cache.ChildData
2. 机制说明
PathChildrenCache内部使用一个命令模式来封装各种操作:
- 操作接口:
org.apache.curator.framework.recipes.cache.Operation
- 刷新操作:
org.apache.curator.framework.recipes.cache.RefreshOperation
- 触发事件操作:
org.apache.curator.framework.recipes.cache.EventOperation
- 获取数据操作:
org.apache.curator.framework.recipes.cache.GetDataOperation
- 刷新操作:
而这些操作对象,都在构造器中接受PathChildrenCache
引用,这样可以在操作中,处理cache(回调):
EventOperation(PathChildrenCache cache, PathChildrenCacheEvent event)
{
this.cache = cache;
this.event = event;
}
GetDataOperation(PathChildrenCache cache, String fullPath)
{
this.cache = cache;
this.fullPath = PathUtils.validatePath(fullPath);
}
RefreshOperation(PathChildrenCache cache, PathChildrenCache.RefreshMode mode)
{
this.cache = cache;
this.mode = mode;
}
而这些操作,还使用了一个单线程的线程池来调用,从而形成了异步调用。
- 使用了一个
private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());
来作为线程池的任务接收队列- 使用set,避免了并发情况下重复操作
- 由于单线程,使得各种操作都是按序执行的
- 所以为了避免curator的监听机制阻塞
- 在
childrenWatcher
以及dataWatcher
中,都使用异步执行命令的方式
- 在
触发操作:
void offerOperation(final Operation operation)
{
if ( operationsQuantizer.add(operation) )
{
submitToExecutor
(
new Runnable()
{
@Override
public void run()
{
try
{
operationsQuantizer.remove(operation);
operation.invoke();
}
catch ( InterruptedException e )
{
//We expect to get interrupted during shutdown,
//so just ignore these events
if ( state.get() != State.CLOSED )
{
handleException(e);
}
Thread.currentThread().interrupt();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleException(e);
}
}
}
);
}
}
private synchronized void submitToExecutor(final Runnable command)
{
if ( state.get() == State.STARTED )
{
executorService.submit(command);
}
}
- 考虑到了各种操作的中断
- 考虑到了状态
- 统一操作的异常处理
- 投递方法
submitToExecutor
使用了synchronized
- 因为可能监听器触发,所以需要对状态进行检查
- 如先关闭,然后再被某个监听器回掉,导致不必要的操作
- 而检查动作不是原子的,所以需要同步锁
- 因为可能监听器触发,所以需要对状态进行检查
3. 用法
3.1 创建
public PathChildrenCache(CuratorFramework client,
String path,
boolean cacheData)
- cacheData
- 如果设置true,是否需要缓存数据
3.2 使用
- Cache必须在使用前调用
start()
方法- 有两个
start()
方法void start()
- 无参
void start(PathChildrenCache.StartMode mode)
- 可以通过参数,选择如何初始化
- StartMode
- NORMAL
- BUILD_INITIAL_CACHE
- POST_INITIALIZED_EVENT
- 有两个
- 使用完成后需要调用
close()
方法 - 任何时候,调用
getCurrentData()
都可以得到状态信息 - 可以添加监听器,当数据发生变动时回调执行
public void addListener(PathChildrenCacheListener listener)
4. 错误处理
PathChildrenCache实例会通过ConnectionStateListener
监听链接状态。 如果链接状态发生变化,缓存会被重置(PathChildrenCacheListener
会受到一个RESET
事件)
5. 源码分析
5.1 类定义
public class PathChildrenCache implements Closeable{}
- 实现了
java.io.Closeable
接口
5.2 成员变量
public class PathChildrenCache implements Closeable
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final CuratorFramework client;
private final String path;
private final CloseableExecutorService executorService;
private final boolean cacheData;
private final boolean dataIsCompressed;
private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
private final ConcurrentMap<String, ChildData> currentData = Maps.newConcurrentMap();
private final AtomicReference<Map<String, ChildData>> initialSet = new AtomicReference<Map<String, ChildData>>();
private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
private final EnsureContainers ensureContainers;
private enum State
{
LATENT,
STARTED,
CLOSED
}
private static final ChildData NULL_CHILD_DATA = new ChildData("/", null, null);
private static final boolean USE_EXISTS = Boolean.getBoolean("curator-path-children-cache-use-exists");
private volatile Watcher childrenWatcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));
}
};
private volatile Watcher dataWatcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
try
{
if ( event.getType() == Event.EventType.NodeDeleted )
{
remove(event.getPath());
}
else if ( event.getType() == Event.EventType.NodeDataChanged )
{
offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleException(e);
}
}
};
@VisibleForTesting
volatile Exchanger<Object> rebuildTestExchanger;
private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
handleStateChange(newState);
}
};
private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache");
}
- log
- client
- path
- 缓存对应的zk节点路径
- executorService
org.apache.curator.utils.CloseableExecutorService
- 线程池
- 用以执行各种操作
- 参见第2章节
- cacheData
- 是否需要缓存数据
- dataIsCompressed
- 数据是否已压缩
- listeners
org.apache.curator.framework.listen.ListenerContainer
- 监听器容器(管理多个监听器)
- 业务监听器
- 可以添加自己的监听器
- currentData
java.util.concurrent.ConcurrentMap
- 当前数据
<String, ChildData>
- 存放着多个
org.apache.curator.framework.recipes.cache.ChildData
- initialSet
AtomicReference
- 初始化集合
- 放置节点,以此来跟踪各个节点是否初始化
- 如果全部节点都初始化完成,则会触发
PathChildrenCacheEvent.Type.INITIALIZED
事件
- 如果全部节点都初始化完成,则会触发
- operationsQuantizer
- 相当于线程池的任务接收队列
- state
- 状态
AtomicReference
- ensureContainers
org.apache.curator.framework.EnsureContainers
- 可以线程安全的创建path节点
- State
- 内部枚举
- LATENT
- STARTED
- CLOSED
- 内部枚举
- NULL_CHILD_DATA
- 私有常量
- 空数据节点
- USE_EXISTS
- 私有常量
- 使用系统配置中
curator-path-children-cache-use-exists
的值
- childrenWatcher
- volatile
- 子节点变动的监听器
- dataWatcher
- volatile
- 数据变动监听器
- rebuildTestExchanger
java.util.concurrent.Exchanger
- 用于并发线程间传值
- 在重建缓存时通过此对象传递一个信号对象
- 用于测试
- connectionStateListener
- 链接状态监听器
- defaultThreadFactory
- 线程工厂
5.3 构造器
public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode)
{
this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
}
public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode, ThreadFactory threadFactory)
{
this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
}
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
{
this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
}
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory)
{
this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
}
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory)
{
this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
}
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)
{
this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService));
}
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)
{
this.client = client;
this.path = PathUtils.validatePath(path);
this.cacheData = cacheData;
this.dataIsCompressed = dataIsCompressed;
this.executorService = executorService;
ensureContainers = new EnsureContainers(client, path);
}
有7个构造器,最终都是调用最后一个。不过从中也可以看出:
- 默认使用
newSingleThreadExecutor
单线程线程池 - 默认不对数据进行压缩处理
5.4 启动
缓存在使用前需要调用start()
public enum StartMode
{
NORMAL,
BUILD_INITIAL_CACHE,
POST_INITIALIZED_EVENT
}
public void start() throws Exception
{
start(StartMode.NORMAL);
}
@Deprecated
public void start(boolean buildInitial) throws Exception
{
start(buildInitial ? StartMode.BUILD_INITIAL_CACHE : StartMode.NORMAL);
}
public void start(StartMode mode) throws Exception
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");
mode = Preconditions.checkNotNull(mode, "mode cannot be null");
client.getConnectionStateListenable().addListener(connectionStateListener);
switch ( mode )
{
case NORMAL:
{
offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
break;
}
case BUILD_INITIAL_CACHE:
{
rebuild();
break;
}
case POST_INITIALIZED_EVENT:
{
initialSet.set(Maps.<String, ChildData>newConcurrentMap());
offerOperation(new RefreshOperation(this, RefreshMode.POST_INITIALIZED));
break;
}
}
}
private void processChildren(List<String> children, RefreshMode mode) throws Exception
{
Set<String> removedNodes = Sets.newHashSet(currentData.keySet());
for ( String child : children ) {
removedNodes.remove(ZKPaths.makePath(path, child));
}
for ( String fullPath : removedNodes )
{
remove(fullPath);
}
for ( String name : children )
{
String fullPath = ZKPaths.makePath(path, name);
if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath) )
{
getDataAndStat(fullPath);
}
updateInitialSet(name, NULL_CHILD_DATA);
}
maybeOfferInitializedEvent(initialSet.get());
}
- 无参的
start()
- 默认使用
StartMode.NORMAL
策略
- 默认使用
- 不建议使用的
start(boolean buildInitial)
- true
- 使用
StartMode.BUILD_INITIAL_CACHE
策略
- 使用
- false
- 使用
StartMode.NORMAL
策略
- 使用
- true
- 启动时添加了链接状态的监听器
可以看到启动过程有三种策略:
NORMAL
模式- 执行刷新命令
org.apache.curator.framework.recipes.cache.RefreshOperation
(命令模式)- 使用
RefreshMode.STANDARD
刷新模式 - 调用
org.apache.curator.framework.recipes.cache.PathChildrenCache#refresh
方法- 调用
org.apache.curator.framework.EnsureContainers#ensure
创建节点 - 在节点上添加
childrenWatcher
监听器 - 回调触发
org.apache.curator.framework.recipes.cache.PathChildrenCache#processChildren
进行刷新- 清理掉已缓存在本地的数据中的其他节点
- 筛选出不是本cache的数据节点
- 从本地初始集合中清理掉
- 如果缓存节点还没用同步到本地,或者指定为
RefreshMode.FORCE_GET_DATA_AND_STAT
模式- 则立即同步节点数据与状态
- 如果不需要缓存数据,则只检查节点是否存在(只缓存节点以及状态,不含数据)
- 否则读取数据(如果需要解压则解压数据)并构建
ChildData
缓存- 新数据放入
currentData
- 根据情况触发事件(唤起监听器)
PathChildrenCacheEvent.Type.CHILD_ADDED
事件PathChildrenCacheEvent.Type.CHILD_UPDATED
事件
- 更新
initialSet
数据(将未同步的NULL_CHILD_DATA
数据替换成读取的数据)
- 新数据放入
- 则立即同步节点数据与状态
- 更新
initialSet
- 如果
initialSet
的Map不为空NORMAL
模式下,这里为空- 可以参见
POST_INITIALIZED_EVENT
模式
- 如果
- 清理掉已缓存在本地的数据中的其他节点
- 调用
- 使用
- 执行刷新命令
BUILD_INITIAL_CACHE
模式- 调用
rebuild
方法(此方法会阻塞执行)- 重新查询所有需要的数据
- 不会触发任何事件
- 安全创建path
- 清空
currentData
缓存 - 重新加载path下子节点,逐个结点重构缓存
- 逐个读取节点数据和状态
- 构建
ChildData
放入currentData
- 通过
rebuildTestExchanger
发送要给信号对象
- 调用
POST_INITIALIZED_EVENT
模式- 初始化
initialSet
- 以
RefreshMode.POST_INITIALIZED
模式刷新缓存- 参见
NORMAL
模式,但不同的是- 更新
initialSet
时- 如果
initialSet
的Map不为空POST_INITIALIZED_EVENT
模式下,这里已经初始化了Map
- 如果
initialSet
中的数据都已经同步完成(都不等于NULL_CHILD_DATA
)- 将
initialSet
制空 - 触发
PathChildrenCacheEvent.Type.INITIALIZED
事件
- 将
- 如果
- 更新
- 参见
- 初始化
5.5 节点发生变化
在启动start()
已经给path
上增加了一个监听器childrenWatcher
private volatile Watcher childrenWatcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));
}
};
- 以
RefreshMode.STANDARD
模式刷新缓存- 会对本地的缓存数据和zk节点做比较
- 只是处理新的缓存数据
- 注意操作的参数
PathChildrenCache.this
this
不同了
5.6 数据发生变化
在每次获取缓存数据时(getDataAndStat
方法),在每个缓存上添加了监听器dataWatcher
:
private volatile Watcher dataWatcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
try
{
if ( event.getType() == Event.EventType.NodeDeleted )
{
remove(event.getPath());
}
else if ( event.getType() == Event.EventType.NodeDataChanged )
{
offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));
}
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleException(e);
}
}
};
- 节点删除时
- 清理缓存
- 触发
PathChildrenCacheEvent.Type.CHILD_REMOVED
事件
- 数据发生变化时
- 执行
GetDataOperation
操作- 也就是再次执行
getDataAndStat
方法
- 也就是再次执行
- 执行
- 注意操作的参数
PathChildrenCache.this
this
不同了
5.7 获取当前数据
public List<ChildData> getCurrentData()
{
return ImmutableList.copyOf(Sets.<ChildData>newTreeSet(currentData.values()));
}
public ChildData getCurrentData(String fullPath)
{
return currentData.get(fullPath);
}
都是从本地数据中获取
5.8 清理
5.8.1 清理缓存
public void clear()
{
currentData.clear();
}
public void clearAndRefresh() throws Exception
{
currentData.clear();
offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
}
清空本地数据
如果需要则使用RefreshMode.STANDARD
模式,刷新
5.8.2 清理缓存数据
public void clearDataBytes(String fullPath)
{
clearDataBytes(fullPath, -1);
}
public boolean clearDataBytes(String fullPath, int ifVersion)
{
ChildData data = currentData.get(fullPath);
if ( data != null )
{
if ( (ifVersion < 0) || (ifVersion == data.getStat().getVersion()) )
{
if ( data.getData() != null )
{
currentData.replace(fullPath, data, new ChildData(data.getPath(), data.getStat(), null));
}
return true;
}
}
return false;
}
保留缓存信息,但是数据部分制空
5.9 链接状态变化
在启动时(start()
)中为链接添加了connectionStateListener
监听器:
private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
handleStateChange(newState);
}
};
private void handleStateChange(ConnectionState newState)
{
switch ( newState )
{
case SUSPENDED:
{
offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED, null)));
break;
}
case LOST:
{
offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_LOST, null)));
break;
}
case CONNECTED:
case RECONNECTED:
{
try
{
offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED, null)));
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleException(e);
}
break;
}
}
}
主要都是根据链接状态,触发不同的操作,以及触发业务监听器来执行。
- 由于数据都是缓存,所以在链接丢失,中断时,仅仅时触发事件,并没有将数据置为不可用
- 当链接建立
CONNECTED
,以及恢复时RECONNECTED
都触发了一次RefreshMode.FORCE_GET_DATA_AND_STAT
模式的刷新操作。
5.10 关闭
在使用完之后,需要调用close()
方法:
public void close() throws IOException
{
if ( state.compareAndSet(State.STARTED, State.CLOSED) )
{
client.getConnectionStateListenable().removeListener(connectionStateListener);
listeners.clear();
executorService.close();
client.clearWatcherReferences(childrenWatcher);
client.clearWatcherReferences(dataWatcher);
// TODO
// This seems to enable even more GC - I'm not sure why yet - it
// has something to do with Guava's cache and circular references
connectionStateListener = null;
childrenWatcher = null;
dataWatcher = null;
}
}
- 原子操作,将状态更新为
CLOSED
- 移除链接状态监听器
- 清空业务监听器
- 关闭线程池
- 清空节点监听器
- 清空数据监听器
6. 小结
PathChildrenCache虽然名字带有Cache。 但其实并不是一个完整的缓存。
应该说,它仅仅是对path下诸多节点进行统一的管理。 当这些节点发生变动,或者数据发生变化时,都可以被PathChildrenCache发现,并同步到本地Map中。以此来达到一个缓存的概念。
从API中也能发现,它只能获取数据。至于放置缓存,则需要另外实现。
- 其实也简单,直接向path下新建节点并写入数据就行
可以通过getListenable().addListener(listener);
添加自定义监听器,从而实现对缓存进行更细致的控制。
7. 示例
这里可以参考官方的示例