文档章节

TiDB Binlog 源码阅读系列文章(四)Pump server 介绍

TiDB
 TiDB
发布于 08/23 10:40
字数 2266
阅读 19
收藏 0

作者: satoru

上篇文章 中,我们介绍了 TiDB 如何通过 Pump client 将 binlog 发往 Pump,本文将继续介绍 Pump server 的实现,对应的源码主要集中在 TiDB Binlog 仓库的 pump/server.go 文件中。

启动 Pump Server

Server 的启动主要由两个函数实现:NewServer(*Server).Start

NewServer 依照传入的配置项创建 Server 实例,初始化 Server 运行所必需的字段,以下简单说明部分重要字段:

  1. metrics:一个 MetricClient,用于定时向 Prometheus Pushgateway 推送 metrics。

  2. clusterID:每个 TiDB 集群都有一个 ID,连接到同一个 TiDB 集群的服务可以通过这个 ID 识别其他服务是否属于同个集群。

  3. pdCliPD Client,用于注册、发现服务,获取 Timestamp Oracle。

  4. tiStore:用于连接 TiDB storage engine,在这里主要用于查询事务相关的信息(可以通过 TiDB 中的对应 interface 描述 了解它的功能)。

  5. storage:Pump 的存储实现,从 TiDB 发过来的 binlog 就是通过它保存的,下一篇文章将会重点介绍。

Server 初始化以后,就可以用 (*Server).Start 启动服务。为了避免丢失 binlog,在开始对外提供 binlog 写入服务之前,它会将当前 Server 注册到 PD 上,确保所有运行中的 Drainer 都已经观察到新增的 Pump 节点。这一步除了启动对外的服务,还开启了一些 Pump 正常运作所必须的辅助机制,下文会有更详细的介绍。

Pump Server API

Pump Server 通过 gRPC 暴露出一些服务,这些接口定义在 tipb/pump.pb.go,包含两个接口 WriteBinlogPullBinlogs

WriteBinlog

顾名思义,这是用于写入 binlog 的接口,上篇文章中 Pump client 调用的就是这个。客户端传入的请求,是以下的格式:

type WriteBinlogReq struct {
  // The identifier of tidb-cluster, which is given at tidb startup.
  // Must specify the clusterID for each binlog to write.
  ClusterID uint64 `protobuf:"varint,1,opt,name=clusterID,proto3" json:"clusterID,omitempty"`
  // Payload bytes can be decoded back to binlog struct by the protobuf.
  Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
}

其中 Payload 是一个用 Protobuf 序列化的 binlog,WriteBinlog 的 主要流程 就是将请求中的 Payload 解析成 binlog 实例,然后调用 storage.WriteBinlog 保存下来。storage.WriteBinlog 将 binlog 持久化存储,并对 binlog 按 start TS / commit TS 进行排序,详细的实现将在下章展开讨论。

PullBinlogs

PullBinlogs 是为 Drainer 提供的接口,用于按顺序获取 binlog。这是一个 streaming 接口,客户端请求后得到一个 stream,可以从中不断读取 binlog。请求的格式如下:

type PullBinlogReq struct {
  // Specifies which clusterID of binlog to pull.
  ClusterID uint64 `protobuf:"varint,1,opt,name=clusterID,proto3" json:"clusterID,omitempty"`
  // The position from which the binlog will be sent.
  StartFrom Pos `protobuf:"bytes,2,opt,name=startFrom" json:"startFrom"`
}

// Binlogs are stored in a number of sequential files in a directory.
// The Pos describes the position of a binlog.
type Pos struct {
  // The suffix of binlog file, like .000001 .000002
  Suffix uint64 `protobuf:"varint,1,opt,name=suffix,proto3" json:"suffix,omitempty"`
  // The binlog offset in a file.
  Offset int64 `protobuf:"varint,2,opt,name=offset,proto3" json:"offset,omitempty"`
}

从名字可以看出,这个请求指定了 Drainer 要从什么时间点的 binlog 开始同步。虽然 Pos 中有 SuffixOffset 两个字段,目前只有 Offset 字段是有效的,我们把它用作一个 commit TS,表示只拉取这个时间以后的 binlog。

PullBinlogs 的 主要流程,是调用 storage.PullCommitBinlogs 得到一个可以获取序列化 binlog 的 channel,将这些 binlog 通过 stream.Send 接口逐个发送给客户端。

辅助机制

上文提到 Pump 的正常运作需要一些辅助机制,本节将逐一介绍这些机制。

fake binlog

《TiDB-Binlog 架构演进与实现原理》 一文中,对 fake binlog 机制有以下说明:

“Pump 会定时(默认三秒)向本地存储中写入一条数据为空的 binlog,在生成该 binlog 前,会向 PD 中获取一个 tso,作为该 binlog 的 start_tscommit_ts,这种 binlog 我们叫作 fake binlog。

……Drainer 通过如上所示的方式对 binlog 进行归并排序,并推进同步的位置。那么可能会存在这种情况:某个 Pump 由于一些特殊的原因一直没有收到 binlog 数据,那么 Drainer 中的归并排序就无法继续下去,正如我们用两条腿走路,其中一只腿不动就不能继续前进。我们使用 Pump 一节中提到的 fake binlog 的机制来避免这种问题,Pump 每隔指定的时间就生成一条 fake binlog,即使某些 Pump 一直没有数据写入,也可以保证归并排序正常向前推进。”

genForwardBinlog 实现了这个机制,它里面是一个定时循环,每隔一段时间(默认 3 秒,可通过 gen-binlog-interval 选项配置)检查一下是否有新的 binlog 写入,如果没有,就调用 writeFakeBinlog 写一条假的 binlog。

判断是否有新的 binlog 写入,是通过 lastWriteBinlogUnixNano 这个变量,每次有新的写入都会 将这个变量设置为当前时间

垃圾回收

由于存储容量限制,显然 Pump 不能无限制地存储收到的 binlog,因此需要有一个 GC (Garbage Collection) 机制来清理没用的 binlog 释放空间,gcBinlogFile 就负责 GC 的调度。有两个值会影响 GC 的调度:

  1. gcInterval:控制 GC 检查的周期,目前写死在代码里的设置是 1 小时

  2. gcDuration:binlog 的保存时长,每次 GC 检查就是 通过当前时间和 gcDuration 计算出 GC 时间点,在这个时间点之前的 binlog 将被 GC 在 gcBinlogFile 的循环中,用 select 监控着 3 种情况:

select {
case <-s.ctx.Done():
  log.Info("gcBinlogFile exit")
  return
case <-s.triggerGC:
  log.Info("trigger gc now")
case <-time.After(gcInterval):
}

3 个 case 分别对应:server 退出,外部触发 GC,定时检查这三种情况。其中 server 退出的情况我们直接退出循环。另外两种情况都会继续,计算 GC 时间点,交由 storage.GC 执行。

Heartbeat

心跳机制用于定时(默认两秒)向 PD 发送 Server 最新状态,由 (*pumpNode).HeartBeat 实现。状态是由 JSON 编码的 Status 实例,主要记录 NodeIDMaxCommitTS 之类的信息。

HTTP API 实现

Pump Server 通过 HTTP 方式暴露出一些 API,主要提供运维相关的接口。

路径 Handler 说明
GET /status Status 返回所有 Pump 节点的状态。
PUT /state/{nodeID}/{action} ApplyAction 支持 pause 和 close 两种 action,可以暂停和关闭 server。接到请求的 server 会确保用户指定的 nodeID 跟自己的 nodeID 相匹配,以防误操作。
GET /drainers AllDrainers 返回通过当前 PD 服务可以发现的所有 Drainer 的状态,一般用于调试时确定 Pump 是否能如预期地发现 Drainer。
GET /debug/binlog/{ts} BinlogByTS 通过指定的 timestamp 查询 binlog,如果查询结果是一条 Prewrite binlog,还会额外输出 MVCC 相关的信息。
POST /debug/gc/trigger TriggerGC 手动触发一次 GC,如果 GC 已经在运行中,请求将被忽略。

下线 Pump Server

下线一个 Pump server 的流程通常由 binlogctl 命令发起,例如:

bin/binlogctl -pd-urls=localhost:2379 -cmd offline-pump -node-id=My-Host:8240

binlogctl 先通过 nodeID 在 PD 发现的 Pump 节点中找到指定的节点,然后调用上一小节中提到的接口 PUT /state/{nodeID}/close

在 Server 端,ApplyAction 收到 close 后会将节点状态置为 Closing(Heartbeat 进程会定时将这类状态更新到 PD),然后另起一个 goroutine 调用 CloseClose 首先调用 cancel,通过 context 将关停信号发往协作的 goroutine,这些 goroutine 主要就是上文提到的辅助机制运行的 goroutine,例如在 genForwardBinlog 中设计了在 context 被 cancel 时退出:

for {
  select {
  case <-s.ctx.Done():
     log.Info("genFakeBinlog exit")
     return

ClosewaitGroup 等待这些 goroutine 全部退出。这时 Pump 仍然能正常提供 PullBinlogs 服务,但是写入功能 已经停止Close 下一行调用了 commitStatus,这时节点的状态是 Closing,对应的分支调用了 waitSafeToOffline 来确保到目前为止写入的 binlog 都已经被所有的 Drainer 读到了。waitSafeToOffline 先往 storage 中写入一条 fake binlog,由于此时写入功能已经停止,可以确定这将是这个 Pump 最后的一条 binlog。之后就是在循环中定时检查所有 Drainer 已经读到的 Binlog 时间信息,直到这个时间已经大于 fake binlog 的 CommitTS

waitSafeToOffline 等待结束后,就可以关停 gRPC 服务,释放其他资源。

小结

本文介绍了 Pump server 的启动、gRPC API 实现、辅助机制的设计以及下线服务的流程,希望能帮助大家在阅读源码时有一个更清晰的思路。在上面的介绍中,我们多次提到 storage 这个实体,用来存储和查询 binlog 的逻辑主要封装在这个模块内,这部分内容将在下篇文章为大家作详细介绍。

原文阅读https://pingcap.com/blog-cn/tidb-binlog-source-code-reading-4/

© 著作权归作者所有

TiDB
粉丝 177
博文 237
码字总数 626298
作品 4
海淀
私信 提问
TiDB Binlog 源码阅读系列文章(一)序

作者:黄佳豪 TiDB Binlog 组件用于收集 TiDB 的 binlog,并准实时同步给下游,如 TiDB、MySQL 等。该组件在功能上类似于 MySQL 的主从复制,会收集各个 TiDB 实例产生的 binlog,并按事务提...

TiDB
06/18
41
0
TiDB Binlog 源码阅读系列文章(二)初识 TiDB Binlog 源码

作者:satoru TiDB Binlog 架构简介 TiDB Binlog 主要由 Pump 和 Drainer 两部分组成,其中 Pump 负责存储 TiDB 产生的 binlog 并向 Drainer 提供按时间戳查询和读取 binlog 的服务,Drainer...

TiDB
07/05
22
0
TiDB 源码阅读系列文章(二十四)TiDB Binlog 源码解析

作者:姚维 TiDB Binlog Overview 这篇文章不是讲 TiDB Binlog 组件的源码,而是讲 TiDB 在执行 DML/DDL 语句过程中,如何将 Binlog 数据 发送给 TiDB Binlog 集群的 Pump 组件。目前 TiDB 在...

TiDB
01/16
49
0
TiDB Executive Summary

一、重要文档 ● 【TiDB 中文文档】 https://pingcap.com/docs-cn ● 【FAQ】 https://pingcap.com/doc-FAQ-zh ● 【OPS】https://www.tidb.cc 二、TiDB 的技术原理 ● 【TiDB 技术内幕】 ○...

易野
2018/10/27
75
0
TiDB Ecosystem Tools 原理解读(一):TiDB-Binlog 架构演进与实现原理

作者:王相 简介 TiDB-Binlog 组件用于收集 TiDB 的 binlog,并提供实时备份和同步功能。该组件在功能上类似于 MySQL 的主从复制,MySQL 的主从复制依赖于记录的 binlog 文件,TiDB-Binlog 组...

TiDB
2018/12/10
28
0

没有更多内容

加载失败,请刷新页面

加载更多

小知识:讲述Linux命令别名与资源文件的区别

别名 别名是命令的快捷方式。为那些需要经常执行,但需要很长时间输入的长命令创建快捷方式很有用。语法是: alias ppp='ping www.baidu.com' 它们并不总是用来缩短长命令。重要的是,你将它...

老孟的Linux私房菜
53分钟前
4
0
《JAVA核心知识》学习笔记(6. Spring 原理)-5

它是一个全面的、企业应用开发一站式的解决方案,贯穿表现层、业务层、持久层。但是 Spring 仍然可以和其他的框架无缝整合。 6.1.1. Spring 特点 6.1.1.1. 轻量级 6.1.1.2. 控制反转 6.1.1....

Shingfi
55分钟前
5
0
Excel导入数据库数据+Excel导入网页数据【实时追踪】

1.Excel导入数据库数据:数据选项卡------>导入数据 2.Excel导入网页数据【实时追踪】:

东方墨天
今天
5
1
正则表达式如何匹配一个单词存在一次或零次并且不占捕获组位置

正则表达式如何匹配一个单词存在一次或零次并且不占捕获组位置 今天要用正则表达式实现匹配一个词出现一次或者不出现的情况,但是又不仅仅是这么简单的需求。先详细说下我这种情况吧,也许有...

Airship
今天
6
0
第八讲:asp.net C# web 读取文件

本讲主要讲解如何在asp.net页面上传文件。 首先,前台页面: 其次,后台页面: 结果: 1、前台效果: 2、后台结果:

刘日辉
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部