CanalMetaManager核心逻辑解析

原创
2021/12/26 16:38
阅读数 355

简介

CanalMetaManager的核心在于管理客户端的订阅,以及控制binlog数据的消费进度。

CanalMetaManager的关系如下图所示

undefined

基本功能

CanalMetaManager封装了一系列的操作meta数据的方法,主要包括:

  1. 客户端订阅和取消订阅
  2. 按批次拉取消费数据
  3. 按批次确认消费数据
  4. 回滚消费位点

本文主要介绍他的内存实现版本(包括文件和内存的混合实现版本)

MemoryMetaManager:CanalMetaManager的内存版本实现基础

Map<String, List<ClientIdentity>>    destinations;   //存储客户端信息
Map<ClientIdentity, MemoryClientIdentityBatch> batches; //存储每一个客户端已经拉取但是还没确认的位点,可能是多个
Map<ClientIdentity, Position>            cursors;   // 存储每一个客户端已经确认过消费成功的最新位置

上文中列举的三个变量是CanalMetaManager的内存实现版本实现其核心功能的基础

下面本文将通过分析客户端和服务端的交互过程,整体详细地介绍CanalMetaManager的核心功能

1.客户端订阅和取消订阅

包含订阅和取消订阅两个主要逻辑

undefined

API

CanalConnector

void subscribe() ;//增加一个 client订阅 , 如果 client已经存在,则不做任何修改

void unsubscribe();//取消client订阅



CanalMetaManager

void subscribe(ClientIdentity clientIdentity) ;//增加一个 client订阅 , 如果 client已经存在,则不做任何修改

boolean hasSubscribe(ClientIdentity clientIdentity);//判断是否订阅

void unsubscribe(ClientIdentity clientIdentity);//取消client订阅

List<ClientIdentity> listAllSubscribeInfo(String destination);//根据指定的destination列出当前所有的clientIdentity信息

2.按批次拉取最新的未消费数据

当客户端需要消费binlog数据时,会通过API从canal服务端拉取一个批次的数据,拉取后并不直接确认,核心流程如下图所示

API

CanalConnector

// 按批次大小拉取消费数据,不回复,阻塞等待直到拉取到足够的数据  
Message getWithoutAck(int batchSize) throws CanalServerException;
//按批次大小拉取消费数据,不回复 阻塞等待直到拉取到足够的数据  有超时时间,超时时间到后,有多少返回多少
Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit);

CanalMetaManager

//获取客户端当前游标 
Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException;
// 获得该client最新的一个位置
PositionRange getFirstBatch(ClientIdentity clientIdentity) throws CanalMetaManagerException;
//获得该clientId最新的一个位置
PositionRange getLastestBatch(ClientIdentity clientIdentity) throws CanalMetaManagerException;
//为 client 产生一个唯一、递增的id
Long addBatch(ClientIdentity clientIdentity, PositionRange positionRange) throws CanalMetaManagerException;
//指定batchId,插入batch数据
void addBatch(ClientIdentity clientIdentity, PositionRange positionRange, Long batchId)

3.按批次确认

按照batchId确认已经消费成功的数据,需要按照拉取的先后顺序提交确认

undefined

注意点

  1. 在客canal服务端对外提供的API的描述中写到:

确认之后,小于等于此 batchId 的 Message 都会被确认

​ 实际上,通过流程图可以发现,在服务端确认时,会有严格的顺序校验,如果当前需要确认的batchId不是最小的那一个,服务端会拒绝确认,下面是具体的源码

//确认/回滚时,移除batches中位点的判断逻辑    
public synchronized PositionRange removePositionRange(Long batchId) {
            if (batches.containsKey(batchId)) {
                Long minBatchId = Collections.min(batches.keySet());
                if (!minBatchId.equals(batchId)) {
                    // 检查一下提交的ack/rollback,必须按batchId分出去的顺序提交,否则容易出现丢数据
                    throw new CanalMetaManagerException(String.format("batchId:%d is not the firstly:%d",
                        batchId,
                        minBatchId));
                }
                return batches.remove(batchId);
            } else {
                return null;
            }
        }

API

CanalConnector

// 按 batch id 确认
void ack(long batchId);

CanalMetaManager

//获取客户端当前游标 
Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException;
//更新客户端游标
void updateCursor(ClientIdentity clientIdentity, Position position);
// 对一个batch的确认
PositionRange removeBatch(ClientIdentity clientIdentity, Long batchId);

4.回滚

当客户端消费异常时,可以对已经拉取的数据进行回滚,支持按批次回滚和全部回滚

undefined

注意点 从流程图中可以发现,当前canal在回滚时

  1. 指定batchId回滚时,只允许从最小的batrchId开始回滚(原因与上文的顺序确认逻辑相同)
  2. 无论是全部回滚还是指定batchId回滚,存储层都是全部回滚(源码中注释:未提交ack的所有状态信息重新归位,减少出错时数据全部重来的成本)
  /**
     * 回滚到未进行 {@link #ack} 的地方,下次fetch的时候,可以从最后一个没有 {@link #ack} 的地方开始拿
     */
    @Override
    public void rollback(ClientIdentity clientIdentity, Long batchId) throws CanalServerException {
      //...省略部分代码
        synchronized (canalInstance) {
            // 清除batch信息
            PositionRange<LogPosition> positionRanges = canalInstance.getMetaManager().removeBatch(clientIdentity,
                batchId);
            if (positionRanges == null) { // 说明是重复的ack/rollback
                throw new CanalServerException(String.format("rollback error, clientId:%s batchId:%d is not exist , please check",
                    clientIdentity.getClientId(),
                    batchId));
            }

            // TODO 后续rollback到指定的batchId位置,
            canalInstance.getEventStore().rollback();// rollback 当前rollback是不指定batchId的
                                                     // eventStore中的状态信息
            logger.info("rollback successfully, clientId:{} batchId:{} position:{}",
                clientIdentity.getClientId(),
                batchId,
                positionRanges);
        }
    }

API

CanalConnector

//全部回滚
void rollback() throws CanalServerException;
//按批次回滚
void rollback( Long batchId) throws CanalServerException;

CanalMetaManager

//获取客户端当前游标 
Position getCursor(ClientIdentity clientIdentity) throws CanalMetaManagerException;
// 对一个batch的删除
PositionRange removeBatch(ClientIdentity clientIdentity, Long batchId);
// 清除对应客户端的所有的batch信息
void clearAllBatchs(ClientIdentity clientIdentity) ;

常用对象

CanalMetaManager提供了很多对象用于数据的记录,下面将具体介绍这些对象

ClientIdentity

ClientIdentity主要记录了canal实例(instance)和客户端的ID,包含以下几个参数:

字段名称 含义
destination 指canal需要监控的目标数据库,针对该目标数据库(destination),canal会创建一个实例,伪装成目标数据库的从节点,接收binlog数据,因此也可以简单理解为canal实例
clientId 客户端ID,默认是1001
Filter 过滤规则,在instance.properties配置文件中,有部分针对实例的table的规则,比如可以通过规则,过滤掉不需要监听的表

PositionRange

PositionRange是客户端拉取消费数据的一个批次,保存一段binlog日志的开始位置、结束位置和可以被ack的位置

字段名称 含义
start 开始位置
ack 用于记录一个可被ack的位置,保证每次提交到cursor中的位置是一个完整事务的结束
end 结束位置
endSeq 用于精确记录ringbuffer中的位点

MemoryClientIdentityBatch

batches 实际上存储了一个客户端,所有的批次号(batchId)和已拉取但未确认的位点(PositionRange )

字段名称 含义
clientIdentity 表示一个canal实例的客户端
batches 保存着batchId和PositionRange的对应关系。
atomicMaxBatchId 记录了最大的batchId
展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
返回顶部
顶部