黄东旭解析 TiDB 的核心优势
579
2020-02-25
内容来源:http://mp.weixin.qq.com/s?__biz=MzI3NDIxNTQyOQ==&mid=2247490913&idx=1&sn=9b908db3756563db4c0d05930b0dac43&chksm=eb163a0bdc61b31d3bf4a027e398a1a5f2500a048c66b661f1f9aac9fc6c3e18a67ff7eb5262#rd
上篇文章 介绍了用于将 binlog 同步到 MySQL / TiDB 的 Loader package,本文往回退一步,介绍 Drainer 同步到不同下游的机制。
TiDB Binlog(github.com/pingcap/tidb-binlog)用于收集 TiDB 的 binlog,并准实时同步给下游。同步数据这一步重要操作由 Drainer 模块支持,它可以将 binlog 同步到 TiDB / MySQL / Kafka / File (增量备份)等下游组件。
本文将按以下几个小节介绍 Drainer 如何将收到的 binlog 同步到下游:
Drainer Sync 模块
Syncer
// Syncer sync binlog item to downstream
type Syncer interface {
// Sync the binlog item to downstream
Sync(item *Item) error
// will be close if Close normally or meet error, call Error() to check it
Successes() <-chan *Item
// Return not nil if fail to sync data to downstream or nil if closed normally
Error() <-chan error
// Close the Syncer, no more item can be added by `Sync`
Close() error
}
Checkpoint
type CheckPoint interface {
// Load loads checkpoint information.
Load() error
// Save saves checkpoint information.
Save(int64) error
// Pos gets position information.
TS() int64
// Close closes the CheckPoint and release resources, after closed other methods should not be called again.
Close() error
}
Translator
// Txn holds transaction info, an DDL or DML sequences
type Txn struct {
DMLs []*DML
DDL *DDL
// This field is used to hold arbitrary data you wish to include so it
// will be available when receiving on the Successes channel
Metadata interface{}
}
Schema
func (s *Schema) handlePreviousDDLJobIfNeed(version int64) error {
var i int
for i = 0; i < len(s.jobs); i++ {
if s.jobs[i].BinlogInfo.SchemaVersion <= version {
_, _, _, err := s.handleDDL(s.jobs[i])
if err != nil {
return errors.Annotatef(err, "handle ddl job %v failed, the schema info: %s", s.jobs[i], s)
}
} else {
break
}
}
s.jobs = s.jobs[i:]
return nil
}
恢复工具
读取 binlog
// BinlogName creates a binlog file name. The file name format is like binlog-0000000000000001-20181010101010
func BinlogName(index uint64) string {
currentTime := time.Now()
return binlogNameWithDateTime(index, currentTime)
}
// binlogNameWithDateTime creates a binlog file name.
func binlogNameWithDateTime(index uint64, datetime time.Time) string {
return fmt.Sprintf("binlog-%016d-%s", index, datetime.Format(datetimeFormat))
}
// ReadDir reads and returns all file and dir names from directory
func ReadDir(dirpath string) ([]string, error) {
dir, err := os.Open(dirpath)
if err != nil {
return nil, errors.Trace(err)
}
defer dir.Close()
names, err := dir.Readdirnames(-1)
if err != nil {
return nil, errors.Annotatef(err, "dir %s", dirpath)
}
sort.Strings(names)
return names, nil
}
func Decode(r io.Reader) (*pb.Binlog, int64, error) {
payload, length, err := binlogfile.Decode(r)
if err != nil {
return nil, 0, errors.Trace(err)
}
binlog := &pb.Binlog{}
err = binlog.Unmarshal(payload)
if err != nil {
return nil, 0, errors.Trace(err)
}
return binlog, length, nil
}
写入 TiDB
小结
TiDB Binlog(github.com/pingcap/tidb-binlog)组件用于收集 TiDB 的 binlog,并准实时同步给下游,如 TiDB、MySQL 等。该组件在功能上类似于 MySQL 的主从复制,会收集各个 TiDB 实例产生的 binlog,并按事务提交的时间排序,全局有序的将数据同步至下游。利用 TiDB Binlog 可以实现数据准实时同步到其他数据库,以及 TiDB 数据准实时的备份与恢复。我们希望通过《TiDB Binlog 源码阅读系列文章》帮助大家理解和掌握这个项目,也有助于我们和社区共同进行 TiDB Binlog 的设计、开发和测试。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。