简介
CanalMetaManager的核心在于管理客户端的订阅,以及控制binlog数据的消费进度。
CanalMetaManager的关系如下图所示
基本功能
CanalMetaManager封装了一系列的操作meta数据的方法,主要包括:
- 客户端订阅和取消订阅
- 按批次拉取消费数据
- 按批次确认消费数据
- 回滚消费位点
本文主要介绍他的内存实现版本(包括文件和内存的混合实现版本)
MemoryMetaManager:CanalMetaManager的内存版本实现基础
Map<String, List<ClientIdentity>> destinations; //存储客户端信息
Map<ClientIdentity, MemoryClientIdentityBatch> batches; //存储每一个客户端已经拉取但是还没确认的位点,可能是多个
Map<ClientIdentity, Position> cursors; // 存储每一个客户端已经确认过消费成功的最新位置
上文中列举的三个变量是CanalMetaManager的内存实现版本实现其核心功能的基础
下面本文将通过分析客户端和服务端的交互过程,整体详细地介绍CanalMetaManager的核心功能
1.客户端订阅和取消订阅
包含订阅和取消订阅两个主要逻辑
API
CanalConnector
void subscribe() ;//增加一个 client订阅 , 如果 client已经存在,则不做任何修改
void unsubscribe();//取消client订阅
CanalMetaManager
void subscribe(ClientIdentity clientIdentity) ;//增加一个 client订阅 , 如果 client已经存在,则不做任何修改
boolean hasSubscribe(ClientIdentity clientIdentity);//判断是否订阅
void unsubscribe(ClientIdentity clientIdentity);//取消client订阅
List<ClientIdentity> listAllSubscribeInfo(String destination);//根据指定的destination列出当前所有的clientIdentity信息
2.按批次拉取最新的未消费数据
当客户端需要消费binlog数据时,会通过API从canal服务端拉取一个批次的数据,拉取后并不直接确认,核心流程如下图所示
API
CanalConnector
// 按批次大小拉取消费数据,不回复,阻塞等待直到拉取到足够的数据
Message getWithoutAck(int batchSize) throws CanalServerException;
//按批次大小拉取消费数据,不回复 阻塞等待直到拉取到足够的数据 有超时时间,超时时间到后,有多少返回多少
Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit);
CanalMetaManager
//获取客户端当前游标
Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException;
// 获得该client最新的一个位置
PositionRange getFirstBatch(ClientIdentity clientIdentity) throws CanalMetaManagerException;
//获得该clientId最新的一个位置
PositionRange getLastestBatch(ClientIdentity clientIdentity) throws CanalMetaManagerException;
//为 client 产生一个唯一、递增的id
Long addBatch(ClientIdentity clientIdentity, PositionRange positionRange) throws CanalMetaManagerException;
//指定batchId,插入batch数据
void addBatch(ClientIdentity clientIdentity, PositionRange positionRange, Long batchId)
3.按批次确认
按照batchId确认已经消费成功的数据,需要按照拉取的先后顺序提交确认
注意点
- 在客canal服务端对外提供的API的描述中写到:
确认之后,小于等于此 batchId 的 Message 都会被确认
实际上,通过流程图可以发现,在服务端确认时,会有严格的顺序校验,如果当前需要确认的batchId不是最小的那一个,服务端会拒绝确认,下面是具体的源码
//确认/回滚时,移除batches中位点的判断逻辑
public synchronized PositionRange removePositionRange(Long batchId) {
if (batches.containsKey(batchId)) {
Long minBatchId = Collections.min(batches.keySet());
if (!minBatchId.equals(batchId)) {
// 检查一下提交的ack/rollback,必须按batchId分出去的顺序提交,否则容易出现丢数据
throw new CanalMetaManagerException(String.format("batchId:%d is not the firstly:%d",
batchId,
minBatchId));
}
return batches.remove(batchId);
} else {
return null;
}
}
API
CanalConnector
// 按 batch id 确认
void ack(long batchId);
CanalMetaManager
//获取客户端当前游标
Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException;
//更新客户端游标
void updateCursor(ClientIdentity clientIdentity, Position position);
// 对一个batch的确认
PositionRange removeBatch(ClientIdentity clientIdentity, Long batchId);
4.回滚
当客户端消费异常时,可以对已经拉取的数据进行回滚,支持按批次回滚和全部回滚
注意点 从流程图中可以发现,当前canal在回滚时
- 指定batchId回滚时,只允许从最小的batrchId开始回滚(原因与上文的顺序确认逻辑相同)
- 无论是全部回滚还是指定batchId回滚,存储层都是全部回滚(源码中注释:未提交ack的所有状态信息重新归位,减少出错时数据全部重来的成本)
/**
* 回滚到未进行 {@link #ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link #ack} 的地方开始拿
*/
@Override
public void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException {
//...省略部分代码
synchronized (canalInstance) {
// 清除batch信息
PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity,
batchId);
if (positionRanges == null) { // 说明是重复的ack/rollback
throw new CanalServerException(String.format("rollback error, clientId:%s batchId:%d is not exist , please check",
clientIdentity.getClientId(),
batchId));
}
// TODO 后续rollback到指定的batchId位置,
canalInstance.getEventStore().rollback();// rollback 当前rollback是不指定batchId的
// eventStore中的状态信息
logger.info("rollback successfully, clientId:{} batchId:{} position:{}",
clientIdentity.getClientId(),
batchId,
positionRanges);
}
}
API
CanalConnector
//全部回滚
void rollback() throws CanalServerException;
//按批次回滚
void rollback( Long batchId) throws CanalServerException;
CanalMetaManager
//获取客户端当前游标
Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException;
// 对一个batch的删除
PositionRange removeBatch(ClientIdentity clientIdentity, Long batchId);
// 清除对应客户端的所有的batch信息
void clearAllBatchs(ClientIdentity clientIdentity) ;
常用对象
CanalMetaManager提供了很多对象用于数据的记录,下面将具体介绍这些对象
ClientIdentity
ClientIdentity主要记录了canal实例(instance)和客户端的ID,包含以下几个参数:
字段名称 | 含义 |
---|---|
destination | 指canal需要监控的目标数据库,针对该目标数据库(destination),canal会创建一个实例,伪装成目标数据库的从节点,接收binlog数据,因此也可以简单理解为canal实例 |
clientId | 客户端ID,默认是1001 |
Filter | 过滤规则,在instance.properties配置文件中,有部分针对实例的table的规则,比如可以通过规则,过滤掉不需要监听的表 |
PositionRange
PositionRange是客户端拉取消费数据的一个批次,保存一段binlog日志的开始位置、结束位置和可以被ack的位置
字段名称 | 含义 |
---|---|
start | 开始位置 |
ack | 用于记录一个可被ack的位置,保证每次提交到cursor中的位置是一个完整事务的结束 |
end | 结束位置 |
endSeq | 用于精确记录ringbuffer中的位点 |
MemoryClientIdentityBatch
batches 实际上存储了一个客户端,所有的批次号(batchId)和已拉取但未确认的位点(PositionRange )
字段名称 | 含义 |
---|---|
clientIdentity | 表示一个canal实例的客户端 |
batches | 保存着batchId和PositionRange 的对应关系。 |
atomicMaxBatchId | 记录了最大的batchId |