完结篇 | TiDB Binlog 源码阅读系列文章 (九)同步数据到下游

网友投稿 579 2020-02-25

内容来源:http://mp.weixin.qq.com/s?__biz=MzI3NDIxNTQyOQ==&mid=2247490913&idx=1&sn=9b908db3756563db4c0d05930b0dac43&chksm=eb163a0bdc61b31d3bf4a027e398a1a5f2500a048c66b661f1f9aac9fc6c3e18a67ff7eb5262#rd



上篇文章 介绍了用于将 binlog 同步到 MySQL / TiDB 的 Loader package,本文往回退一步,介绍 Drainer 同步到不同下游的机制。


TiDB Binlog(github.com/pingcap/tidb-binlog)用于收集 TiDB 的 binlog,并准实时同步给下游。同步数据这一步重要操作由 Drainer 模块支持,它可以将 binlog 同步到 TiDB / MySQL / Kafka / File (增量备份)等下游组件。

  • 对于 TiDB 和 MySQL 两种类型的下游组件,Drainer 会从 binlog 中还原出对应的 SQL 操作在下游直接执行;
  • 对于 Kafka 和 File(增量备份)两种类型的下游组件,输出约定编码格式的 binlog。用户可以定制后续各种处理流程,如更新搜索引擎索引、清除缓存、增量备份等。TiDB Binlog 自带工具 Reparo 实现了将增量备份数据(下游类型为 File(增量备份))同步到 TiDB / MySQL 的功能。

本文将按以下几个小节介绍 Drainer 如何将收到的 binlog 同步到下游:

1. Drainer Sync 模块:Drainer 通过 Sync 模块调度整个同步过程,所有的下游相关的同步逻辑统一封装成了 Syncer 接口。
2. 恢复工具 Reparo(读音:reh-PAH-roh):从下游保存的 File(增量备份)中读取 binlog 同步到 TiDB / MySQL。

Drainer Sync 模块



Syncer

同步机制的核心是 Syncer 接口,定义如下:
// Syncer sync binlog item to downstream
type Syncer interface {
// Sync the binlog item to downstream
Sync(item *Item) error
// will be close if Close normally or meet error, call Error() to check it
Successes() <-chan *Item
// Return not nil if fail to sync data to downstream or nil if closed normally
Error() <-chan error
// Close the Syncer, no more item can be added by `Sync`
Close() error
}
其中 Sync 方法表示异步地向下游同步一个 binlog,对应的参数类型是 *Item,这是一个封装了 binlog 的结构体;Successes 方法返回一个 channel,从中可以读取已经成功同步到下游的 Item;Error 方法返回一个 channel,当 Syncer 同步过程出错中断时,会往这个 channel 发送遇到的错误;Close 用于关掉 Syncer,释放资源。
支持的每个下游类型在 drainer/sync 目录下都有一个对应的 Syncer 实现,例如 MySQL 对应的是 mysql.go 里的 MySQLSyncer,Kafka 对应的是 kafka.go 里的 KafkaSyncerDrainer 启动时,会根据配置文件中指定的下游,找到对应的 Syncer 实现,然后就可以用统一的接口管理整个同步过程了。


Checkpoint

同步进程可能因为各种原因退出,重启后要恢复同步就需要知道上次同步的进度。在 Drainer 里记录同步进度的功能抽象成 Checkpoint 接口,其定义如下:
type CheckPoint interface {
// Load loads checkpoint information.
Load() error

// Save saves checkpoint information.
Save(int64) error

// Pos gets position information.
TS() int64

// Close closes the CheckPoint and release resources, after closed other methods should not be called again.
Close() error
}
从以上定义中可以看到,Save 的参数和 TS 的返回结果都是 int64 类型,因为同步的进度是以 TiDB 中单调递增的 commit timestamp 来记录的,它的类型就是 int64。
Drainer 支持不同类型的 Checkpoint 实现,例如  mysql.go 里的 MySQLCheckpoint,默认将 commit timestamp 写到 tidb_binlog 库下的 checkpoint 表。Drainer 会根据下游类型自动选择不同的 Checkpoint 实现,例如 TiDB / MySQL 的下游就会使用 MySQLCheckPoint,File(增量备份) 则使用 PbCheckpoint
Syncer 小节,我们看到 SyncerSuccesses 方法提供了一个 channel 用来接收已经处理完毕的 binlog,收到 binlog 后,我们用 Checkpoint 的 Save 方法保存 binlog 的 commit timestamp 就可以记下同步进度,细节可查看源码中的 handleSuccess 方法。


Translator

Syncer 在收到 binlog 后需要将里面记录的变更转换成适合下游 Syncer 类型的格式,这部分实现在 drainer/translator 包。
以下游是 MySQL / TiDB 的情况为例。MySQLSyncer.Sync 会先调用 TiBinlogToTxn
将 binlog 转换成 loader.Txn 以便接入下层的 loader 模块 (loader 接收一个个 loader.Txn 结构并还原成对应的 SQL 批量写入 MySQL / TiDB)。
loader.Txn 定义如下:
// Txn holds transaction info, an DDL or DML sequences
type Txn struct {
DMLs []*DML
DDL *DDL

// This field is used to hold arbitrary data you wish to include so it
// will be available when receiving on the Successes channel
Metadata interface{}
}
Txn 主要有两类:DDL 和 DML。Metadata 目前放的就是传给 Sync 的 *Item 对象。DDL 的情况比较简单,因为 binlog 中已经直接包含了我们要用到的 DDL Query。DML 则需要遍历 binlog 中的一个个行变更,根据它的类型 insert / update / delete 还原成相应的 loader.DML


Schema

上个小节中,我们提到了对行变更数据的解析,在 binlog 中编码的行变更是没有列信息的,我们需要查到对应版本的列信息才能还原出 SQL 语义。Schema 就是解决这个问题的模块。
在 Drainer 启动时,会调用 loadHistoryDDLJobs 从 TiKV 处查询截至当前时间所有已完成的 DDL Job 记录,按 SchemaVersion 升序排序(可以粗略认为这是一个单调递增地赋给每个 DDL 任务的版本号)。这些记录在 Syncer 中会用于创建一个 Schema 对象。在运行过程中,Drainer 每遇到一条 DDL 也会 添加到 Schema 中
binlog 中带有一个 SchemaVersion 信息,记录这条 binlog 生成的时刻 Schema 版本。在同步 Binlog 前,我们会先用这个 SchemaVersion 信息调用 Schema 的一个方法 handlePreviousDDLJobIfNeed上一段中我们看到 Schema 从何处收集到有序的 DDL Job 记录,这个方法则是按顺序应用 SchemaVersion 小于等于指定版本的 DDL Job,在 Schema 中维护每个表对应版本的最新结构信息,去掉一些错误代码后实现大致如下:
func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {
var i int
for i = 0; i < len(s.jobs); i++ {
if s.jobs[i].BinlogInfo.SchemaVersion <= version {
_, _, _, err := s.handleDDL(s.jobs[i])
if err != nil {
return errors.Annotatef(err, "handle ddl job %v failed, the schema info: %s", s.jobs[i], s)
}
} else {
break
}
}

s.jobs = s.jobs[i:]

return nil
}
对于每个符合条件的 Job,由 handleDDL 方法将其表结构 TableInfo 等信息更新到 Schema 中,其他模块就可以查询到表格当前最新的信息。

恢复工具


我们知道 Drainer 除了可以将 binlog 直接还原到下游数据库以外,还支持同步到其他外部存储系统块,所以我们也提供了相应的工具来处理存储下来的文件,Reparo 是其中之一,用于读取存储在文件系统中的 binlog 文件,写入 TiDB 中。本节简单介绍下 Reparo 的用途与实现,读者可以作为示例了解如何处理同步到文件系统的 binlog 增量备份。
Reparo 可以读取同步到文件系统上的 binlog 增量备份并同步到 TiDB。


读取 binlog

当下游设置成 File(增量备份) 时,Drainer 会将 Protobuf 编码的 binlog 保存到指定目录,每写满 512 MB 新建一个文件。每个文件有个编号,从 0 开始依次类推。文件名格式定义如下:
// BinlogName creates a binlog file name. The file name format is like binlog-0000000000000001-20181010101010
func BinlogName(index uint64) string {
currentTime := time.Now()
return binlogNameWithDateTime(index, currentTime)
}

// binlogNameWithDateTime creates a binlog file name.
func binlogNameWithDateTime(index uint64, datetime time.Time) string {
return fmt.Sprintf("binlog-%016d-%s", index, datetime.Format(datetimeFormat))
}
文件的前缀都是 “binlog-”,后面跟一个 16 位右对齐的编号和一个时间戳。将目录里的文件按字母顺序排序就可以得到按编号排序的 binlog 文件名。从指定目录获取文件列表的实现如下:
// ReadDir reads and returns all file and dir names from directory
func ReadDir(dirpath string) ([]string, error) {
dir, err := os.Open(dirpath)
if err != nil {
return nil, errors.Trace(err)
}
defer dir.Close()

names, err := dir.Readdirnames(-1)
if err != nil {
return nil, errors.Annotatef(err, "dir %s", dirpath)
}

sort.Strings(names)

return names, nil
}
这个函数简单地获取目录里全部文件名,排序后返回。在上层还做了一些过滤来去掉临时文件等。得到文件列表后,Reparo 会用标准库的 bufio.NewReader 逐个打开文件,然后用 Decode 函数读出其中的一条条 binlog:
func Decode(r io.Reader) (*pb.Binlog, int64, error) {
payload, length, err := binlogfile.Decode(r)
if err != nil {
return nil, 0, errors.Trace(err)
}

binlog := &pb.Binlog{}
err = binlog.Unmarshal(payload)
if err != nil {
return nil, 0, errors.Trace(err)
}
return binlog, length, nil
}
这里先调用了 binlogfile.Decode 从文件中解析出对应 Protobuf 编码的一段二进制数据然后解码出 binlog。


写入 TiDB

得到 binlog 后就可以准备写入 TiDB。Reparo 这部分实现像一个简化版的 DrainerSync 模块,同样有一个 Syncer 接口以及几个具体实现(除了 mysqlSyncer 还有用于调试的 printSyncermemSyncer),所以就不再介绍。值得一提的是,这里也跟前面很多 MySQL / TiDB 同步相关的模块一样使用了 loader 模块。

小结


本文介绍了 Drainer 是如何实现数据同步的以及 Reparo 如何从文件系统中恢复增量备份数据到 MySQL / TiDB。在 Drainer 中,Syncer 封装了同步到各个下游模块的具体细节,Checkpoint 记录同步进度,Translator 从 binlog 中还原出具体的变更,Schema 在内存中维护每个表对应的表结构定义。


TiDB Binlog 源码阅读系列在此就全部完结了 相信大家通过本系列文章更全面地理解了 TiDB Binlog 的原理和实现细节。我们将继续打磨优化,欢迎大家给我们反馈使用过程中遇到的问题或建议;如果社区小伙伴们想参与 TiDB Binlog 的设计、开发和测试,也欢迎与我们联系 info@pingcap.com,或者在 Repo 中 提 issue 讨论。


💡 文中划线部分均有跳转,请点击【阅读原文】查看原版

 TiDB Binlog 源码阅读系列文章 
(一)序
(二)初识 TiDB Binlog 源码
(三)Pump client 介绍
(四)Pump server 介绍
(五)Pump Storage 介绍(上)
(六)Pump Storage 介绍(下)

(七)Drainer server 介绍

 (八)Loader Package 介绍

TiDB Binlog(github.com/pingcap/tidb-binlog)组件用于收集 TiDB 的 binlog,并准实时同步给下游,如 TiDB、MySQL 等。该组件在功能上类似于 MySQL 的主从复制,会收集各个 TiDB 实例产生的 binlog,并按事务提交的时间排序,全局有序的将数据同步至下游。利用 TiDB Binlog 可以实现数据准实时同步到其他数据库,以及 TiDB 数据准实时的备份与恢复。我们希望通过《TiDB Binlog 源码阅读系列文章》帮助大家理解和掌握这个项目,也有助于我们和社区共同进行 TiDB Binlog 的设计、开发和测试。



版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:TiDB Architecture Team:挑战数据库的本质难题 | PingCAP 招聘季
下一篇:如何做到 10T 集群数据安全备份、1GB/s 快速恢复?
相关文章