初识TiDB的增量数据同步工具TiCDC

原创
02/06 00:00
阅读数 41

作者: 数据源的TiDB学习之路 原文来源:https://tidb.net/blog/e933b8b8

TiDB提供了多款数据同步或迁移工具,之前介绍的Data Migration主要适用于MySQL系的数据源往TiDB目标库的全量/增量迁移,详见 初识TiDB Data Migration迁移工具及实践 (qq.com)。本篇幅则主要介绍TiDB的另外一款增量数据同步工具TiCDC,它是一款以TiDB为数据源往MySQL、TiDB、Kafka等目标进行增量同步的工具。

一. 什么是CDC

在介绍TiCDC之前,先初步了解一下CDC的概念。CDC的全称是“Change Data Capture”,翻译过来即“变化数据捕获”,是一种获取增量变化数据的技术。CDC一般有两类,一种是侵入式的(指CDC操作会给源系统带来性能影响),另外一种是非侵入式的(指对源系统不具有侵入性)。CDC变化数据捕获一般有以下几种方式,1~3属于侵入式,4属于非侵入式。

  1. **时间戳。**需要源系统有相应的数据列表示最后的数据变化。

  2. **快照。**使用数据库自带的机制实现,如Oracle的物化视图技术。

  3. **触发器。**源表上创建触发器会在执行DML语句时触发,触发器中的逻辑用于捕获数据变化。

  4. **日志。**可使用应用日志或系统日志,对源系统无侵入性,但需要额外的日志解析工作。

二. TiCDC的主要能力

TiCDC,就是TiDB提供的一种CDC技术,它通过拉取上游TiKV的数据变更日志,将数据解析为有序的行级变更数据输出到下游。对照上面的CDC介绍,TiCDC属于是非侵入式CDC。TiCDC主要应用的场景有:实现多个TiDB集群间的高可用容灾方案、将TiDB数据实时同步到异构系统进行服务。TiCDC的核心能力主要有:

  1. 提供TiDB->TiDB间的数据容灾复制,实现秒级RPO和分钟级RTO

  2. 提供TiDB之间的双向复制,实现多写多活的TiDB集群

  3. 提供TiDB->MySQL的低延迟增量数据同步

  4. 提供TiDB->Kafka增量数据同步

  5. 提供TiDB->存储服务(如S3、GCS、NFS)增量数据同步

  6. 支持过滤库、表、DML、DDL能力

  7. 高可用架构;动态添加、删除TiCDC能力

  8. 通过Open API管理TiCDC集群

关于数据同步一致性,TiCDC无法保证下游事务执行顺序和上游完全一致,但可以保证单行的更新与上游更新顺序一致。在使用TiCDC实现TiDB容灾恢复的时候,可以开启redo功能保证数据的最终一致性

三. TiCDC的基本架构

TiCDC是一个高可用的架构,由多个运行TiCDC实例的节点组成。每个TiCDC实例都运行了一个Capture进程,这些Capture中有一个是Owner,负责完成TiCDC集群的负载调度、DDL语句和其它管理任务。

image.png

每个Capture进程包含一个或多个Processor线程,用于同步上游TiDB中的表数据,每个Processor包含多条table pipeline,每条pipeline包含Puller(从TiKV获取DDL和行变更信息)、Sorter(将变更数据在TiCDC内按时间戳排序)、Mounter(将变更数据转换成TiCDC可以处理的格式)和Sink模块(将变更应用到下游系统)。

image.png

在高可用方面,每个TiCDC节点定期向PD汇报自己的状态,并选举其中一个TiCDC为Owner。如果Processor所在节点出现异常,集群会将表调度到其它节点。如果Owner节点异常,其他节点的Capture进程会选举新的Owner。

四. TiCDC中几个比较重要的概念

  • Capture。TiCDC节点的运行进程,负责TiKV数据变更同步,包括接收和主动拉取两种方式,并向下游同步数据。

  • Capture Owner。每个TiCDC集群同一时刻最多只有一个Owner角色,负责集群内部调度。

  • Processor。Capture内部逻辑线程,每个Processor负责处理一个或多个table。一个Capture节点可以运行多个Processor。

  • ChangeFeed。由用户启动的从上游TiDB同步到下游的任务,其中包含多个Task,Task分布在不同的Capture节点同步处理

  • ResolvedTS。在TiKV和TiCDC中都存在。ResolvedTS代表这个时间戳之前不存在未提交的事务。在监控界面有一个Changefeed resolved ts lag指标,代表上游TiDB与TiCDC节点之间的数据延迟,lag上升,说明Changefeed无法及时拉取上游产生的数据变更image.png

  • CheckpointTS。只在TiCDC中存在,表示TiCDC认为这个时间戳之前的数据已经同步到下游系统。在监控界面有一个Changefeed checkpoint ts lag指标,代表上游TiDB和下游之间的数据复制延迟image.png

<!---->

  • BarrierTS。发生DDL变更或使用TiCDC的Syncpoint时产生的时间戳。TiCDC保证只有小于BarrierTS的数据被复制到下游。

  • Changefeed。TiCDC中的单个同步任务。将一个TiDB集群中数张表的变更数据输出到指定下游。

五. TiCDC安装部署

如果你有使用TiUP安装部署集群的经验,对TiCDC的安装很容易理解。TiCDC可以当成是TiDB集群的一个组件,可以在部署集群时一起部署,也可以在现有集群的基础上通过扩容的方式安装。这里我们假设集群已经存在,通过扩容的方式安装TiCDC。

首先,我们需要准备TiCDC组件安装的配置文件,下述配置表示将要安装两个节点的TiCDC。

[tidb@tidb53 ~]$ cat addticdc.yaml
cdc_servers:
  - host: 172.20.12.52
    gc-ttl: 86400
    data_dir: "/data1/cdc-data"
  - host: 172.20.12.53
    gc-ttl: 86400
    data_dir: "/data1/cdc-data"

其次使用tiup cluster scale-out扩容命令进行在线安装TiCDC,安装完成后,集群状态的输出中便可以看到cdc组件,如下图所示:

image.png

另外我们可以使用**tiup cdc cli capture list --server=<address>**来查看cdc集群的状态,这里的<address>可以填任意一个cdc的地址,输出结果如下图所示,其中有一个cdc节点为owner角色。

image.png

六. TiCDC迁移实践

TiCDC的一个典型应用场景是实现TiDB主备集群的高可用容灾方案。在提前准备好主备两套集群以及TiCDC的情况下,我们具体应该怎么使用TiCDC来实现迁移呢?这就要用到上述提到的一个关键词Changefeed。我们可以在TiCDC中创建不同的Changefeed,在Changefeed中定义具体要同步的对象和操作。Changefeed的详情可以参考官网文档 Changefeed 概述 | PingCAP 文档中心,这里不作具体描述。

一个最基本的创建changefeed的命令需要指定cdc的地址、cdc的目标库以及changefeed名称,示例如下,

 cdc cli changefeed create --server=http://172.20.12.52:8300 --sink-uri="mysql://root:root@172.20.12.53:8100/" --changefeed-id="simple-replication-task"

以上命令默认将会对源库中的所有具有主键或唯一索引的用户表进行同步,不满足要求的表将会被忽略,因为无主键或唯一索引的表在异常情况下可能会主备库不一致。如果必须要同步,可以在changefeed的配置文件中设置force-replicate

image.png

创建完changfeed后,我们可以使用命令cdc cli changefeed list来查看changfeed任务状态,

image.png

如果想查看任务的具体详情,可以使用cdc cli changefeed query,一个完整的输出示例为:

[tidb@tidb53 ~]$ tiup cdc cli changefeed query --server=http://172.20.12.52:8300 --changefeed-id=simple-replication-task
Checking updates for component cdc... Timedout (after 2s)
Starting component cdc: /home/tidb/.tiup/components/cdc/v7.6.0/cdc cli changefeed query --server=http://172.20.12.52:8300 --changefeed-id=simple-replication-task
{
  "upstream_id": 7326866262952730232,
  "namespace": "default",
  "id": "simple-replication-task",
  "sink_uri": "mysql://root:xxxxx@172.20.12.53:8100/",
  "config": {
    "memory_quota": 1073741824,
    "case_sensitive": false,
    "force_replicate": false,
    "ignore_ineligible_table": true,
    "check_gc_safe_point": true,
    "enable_sync_point": false,
    "enable_table_monitor": false,
    "bdr_mode": false,
    "sync_point_interval": 600000000000,
    "sync_point_retention": 86400000000000,
    "filter": {
      "rules": [
        "*.*"
      ]
    },
    "mounter": {
      "worker_num": 16
    },
    "sink": {
      "delete_only_output_handle_key_columns": null,
      "content_compatible": null,
      "advance_timeout": 150,
      "send_bootstrap_interval_in_sec": 120,
      "send_bootstrap_in_msg_count": 10000,
      "debezium_disable_schema": false
    },
    "consistent": {
      "level": "none",
      "max_log_size": 64,
      "flush_interval": 2000,
      "meta_flush_interval": 200,
      "encoding_worker_num": 16,
      "flush_worker_num": 8,
      "use_file_backend": false,
      "memory_usage": {
        "memory_quota_percentage": 50,
        "event_cache_percentage": 0
      }
    },
    "scheduler": {
      "enable_table_across_nodes": false,
      "region_threshold": 100000,
      "write_key_threshold": 0
    },
    "integrity": {
      "integrity_check_level": "none",
      "corruption_handle_level": "warn"
    },
    "changefeed_error_stuck_duration": 1800000000000,
    "sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION",
    "synced_status": {
      "synced_check_interval": 300,
      "checkpoint_interval": 15
    }
  },
  "create_time": "2024-02-06 17:42:19.125",
  "start_ts": 447535523171139595,
  "resolved_ts": 447535589546262529,
  "target_ts": 0,
  "checkpoint_tso": 447535589546262529,
  "checkpoint_time": "2024-02-06 17:46:29.712",
  "state": "normal",
  "creator_version": "v8.0.0-alpha-43-g79d5246",
  "task_status": [
    {
      "capture_id": "091ec87b-5eee-456f-95d8-c3f67c52854e",
      "table_ids": [
        110,
        184,
        204,
        210,
        222,
        106,
        112,
        128,
        136,
        152
      ]
    },
    {
      "capture_id": "bf566d1d-cff3-434b-a6bf-ee8fd8f7379b",
      "table_ids": [
        132,
        134,
        142,
        167,
        202,
        206,
        130,
        138,
        140,
        215
      ]
    }
  ]
}

如果想要停止一个changefeed任务,可以使用cdc cli changefeed pause命令; 如果想要恢复一个停止的任务,可以使用cdc cli changefeed resume命令;删除任务可以使用cdc cli changefeed remove命令;还可以使用cdc cli changefeed update来更新任务。以下截图表示我们先暂停一个changefeed任务,然后再删除这个任务。

image.png

在真实的环境中,很多情况我们不会一股脑儿的同步所有表,也有时候我们不想同步所有的语句类型(比如想把drop、truncate之类的高危操作过滤)。在创建changefeed的命令中可以通过—config来指定一个事先准备好的配置文件,在配置文件中制定相应的过滤规则,实现自定义的数据同步规则。具体的配置参数可参考 TiCDC Changefeed 命令行参数和配置参数 | PingCAP 文档中心 介绍。

本文简单初步介绍TiCDC的相关概念,TiCDC还有很多细节内容由于篇幅原因本文暂不叙述,比如数据正确性的检验、双向复制功能等。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部