黄东旭解析 TiDB 的核心优势
562
2019-12-17
内容来源:http://mp.weixin.qq.com/s?__biz=MzI3NDIxNTQyOQ==&mid=2247490483&idx=2&sn=e2623b9edce7ef3359a6fa6238a7e414&chksm=eb163cd9dc61b5cf98dc77a2a8b6331fd208c91fc22abbe85f59ca1f6f3153777834d0946ed9#rd
本文转载自 The Last of Us 的知乎专栏,原文链接:https://zhuanlan.zhihu.com/p/89109874
User Defined Function 支持
方案
FunctionCallGeneric:
identifier '(' ExpressionListOpt ')'
{
$$ = &ast.FuncCallExpr{FnName: model.NewCIStr($1), Args: $3.([]ast.ExprNode)}
}
builtin.go
里面的 funcs
就是存储的位置:var funcs = map[string]FunctionClass{
// common functions
ast.Coalesce: &coalesceFunctionClass{BaseFunctionClass{ast.Coalesce, 1, -1}},
ast.IsNull: &isNullFunctionClass{BaseFunctionClass{ast.IsNull, 1, 1}},
ast.Greatest: &greatestFunctionClass{BaseFunctionClass{ast.Greatest, 2, -1}},
ast.Least: &leastFunctionClass{BaseFunctionClass{ast.Least, 2, -1}},
ast.Interval: &intervalFunctionClass{BaseFunctionClass{ast.Interval, 2, -1}},
// ...
}
// functionClass is the interface for a function which may contains multiple functions.
type FunctionClass interface {
// getFunction gets a function signature by the types and the counts of given arguments.
GetFunction(ctx sessionctx.Context, args []Expression) (BuiltinFunc, error)
}
EvalString
, EvalInt
之类的 Function 具体处理方法,从代码来看这些内部函数为了逻辑清楚才多套了这么一层,比如说 trim
处理字符串的方法在 GetFunction
会被分成一个参数、两个参数、三个参数三种逻辑,然后对应三个 BuiltinFunc 结构体,完全处于逻辑复用考虑。func AddUserDefinedFunction(name string, class BuiltinFunc, minArgs int, maxArgs int) {
funcs[strings.ToLower(name)] = &UserDefinedFunctionClass{BaseFunctionClass{name, minArgs, maxArgs}}
userDefinedFuncs[name] = class
}
var userDefinedFuncs = map[string]BuiltinFunc{}
type UserDefinedFunctionClass struct {
BaseFunctionClass
}
func (c *UserDefinedFunctionClass) GetFunction(ctx sessionctx.Context, args []Expression) (BuiltinFunc, error) {
if err := c.VerifyArgs(args); err != nil {
return nil, err
}
if userDefinedFuncs[c.FuncName] == nil {
return nil, c.VerifyArgs(args)
}
return userDefinedFuncs[c.FuncName].Initial(ctx, args), nil
}
type TrimFunction struct {
BaseBuiltinFunc
}
func (b *TrimFunction) Clone() BuiltinFunc {
newSig := &TrimFunction{}
newSig.CloneFrom(&b.BaseBuiltinFunc)
return newSig
}
func (b *TrimFunction) Initial(ctx sessionctx.Context, args []Expression) BuiltinFunc {
var argTps []types.EvalType
switch len(args) {
case 1:
argTps = []types.EvalType{types.ETString}
case 2:
argTps = []types.EvalType{types.ETString, types.ETString}
case 3:
argTps = []types.EvalType{types.ETString, types.ETString, types.ETInt}
}
bf := NewBaseBuiltinFuncWithTp(ctx, args, types.ETString, argTps...)
argType := args[0].GetType()
bf.Tp.Flen = argType.Flen
SetBinFlagOrBinStr(argType, bf.Tp)
b.BaseBuiltinFunc = bf
return b
}
func (b *TrimFunction) EvalString(row chunk.Row) (d string, isNull bool, err error) {
// 具体的 Trim 逻辑
}
err = plugin.ForeachPlugin(plugin.UDF, func(p *plugin.Plugin) error {
udfPlugin := plugin.DeclareUDFManifest(p.Manifest)
meta := udfPlugin.GetUserDefinedFuncClass()
expression.AddUserDefinedFunction(meta.FuncName, meta.Func, meta.MinArgs, meta.MaxArgs)
logutil.BgLogger().Info("insert udf", zap.String("function name", meta.FuncName))
return nil
})
优缺点
Ti-Alloy Engine 替换
方案
在 UDF 的基础功能完善之上的实现就是对标 MySQL Engine、Prometheus remote Read/Write Storage、*** Foreign Data Wrappers 之类的可替换 Storage Engine 的实现方案。说起来也不是很稀奇,这种替换 Storage Engine 的方案其实也可以等同为一个 Proxy,之前开发过 ES、Prometheus 相关的 Proxy 方案本质上比较类似:
buildTableReader
, buildTableScan
等相关处理 Scan、Select 的部分加了钩子,根据 Engine 去选择 Plugin Engine 插入对应的流程:type PluginScanExecutor struct {
baseExecutor
Table *model.TableInfo
Columns []*model.ColumnInfo
Plugin *plugin.Plugin
pm *plugin.EngineManifest
meta *plugin.ExecutorMeta
}
type PluginInsertExec struct {
// ...
}
type PluginSelectionExec struct {
// ...
}
// 插入的处理方法
PluginScanXXX
相关的部分会连接一个对应的 Plugin 实现完善对应的接口:package main
import (
"bufio"
"context"
"fmt"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/util/chunk"
"io"
"os"
"strconv"
"strings"
)
type ReadExecutor struct {
pos int
}
var Files = make(map[string]*bufio.Reader)
func OnReaderOpen(ctx context.Context, meta *plugin.ExecutorMeta) error {
path := fmt.Sprintf("/tmp/%s.log", meta.Table.Name.L)
file, err := os.Open(path)
if err != nil {
return err
}
Files[meta.Table.Name.L] = bufio.NewReader(file)
return nil
}
func OnReaderNext(ctx context.Context, chk *chunk.Chunk, meta *plugin.ExecutorMeta) error {
chk.Reset()
reader := Files[meta.Table.Name.L]
line, _, err := reader.ReadLine()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
es := strings.Split(string(line), ",")
i, err := strconv.Atoi(es[0])
if err !=nil {
chk.AppendNull(0)
} else {
chk.AppendInt64(0, int64(i))
}
chk.AppendString(1, es[1])
return nil
}
var InsertFiles = make(map[string]*os.File)
func OnInsertOpen(ctx context.Context, meta *plugin.ExecutorMeta) error {
path := fmt.Sprintf("/tmp/%s.log", meta.Table.Name.L)
f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
InsertFiles[meta.Table.Name.L] = f
return err
}
func OnInsertNext(ctx context.Context, rows [][]expression.Expression, meta *plugin.ExecutorMeta) error {
for _, row := range rows {
b := strings.Builder{}
for _, e := range row {
b.WriteString(e.String() + ",")
}
b.WriteString("\n")
_, err := InsertFiles[meta.Table.Name.L].WriteString(b.String())
if err != nil {
return err
}
}
return nil
}
func OnInsertClose(meta *plugin.ExecutorMeta) error {
return InsertFiles[meta.Table.Name.L].Close()
}
func OnCreateTable(tblInfo *model.TableInfo) error {
path := fmt.Sprintf("/tmp/%s.log", tblInfo.Name.L)
f, err :=os.Create(path)
if err != nil {
return err
}
return f.Close()
}
func OnDropTable(tblInfo *model.TableInfo) error {
path := fmt.Sprintf("/tmp/%s.log", tblInfo.Name.L)
return os.Remove(path)
}
比如上述的代码,可以理解为一个将存储信息存为 CSV 文件的一个最小实现,在最后的实现上还实现了一个简单的 elasticsearch proxy(Mock Skip)用来做演示 demo,实现的几个 UDF、Engine Plugin 也都做了相应的 unit-test,其实这里能做的事情也很多,各种下推支持啊什么的,可优化空间还很大。
Show Case
这里的 Show Case 主要就是演示 UDF 支持和普通的 Table 和 ES Engine Table 的相关 join、select 功能,场景是首先假定了一张替换 Engine 的 ES 表,存的是 logs 日志数据,包含 ip、status code、以及相关的连接 message,另一张表就是普通的表,存的是 blacklist 的具体黑名单的数据,包含 ip、危险等级 level、以及相关的风险信息。
然后通过取 join 找出 logs 里面的被 block 的信息,展示 ES demo 的 Selection 下推支持、UDF 实现的 ip2city 的支持以及 Store 的 Engine 支持:
之前也提到了 TableInfo 的插入字段,这部分的实现方案处理也比较精致,在打印状态里能清晰的看出某个 table 属于哪个计算引擎:
优缺点
彩蛋:Where DSL 支持
方案
上面的实现都是比较功能性的支持,但是闲的没事也想做点看起来炫酷能讲的点,这里也简单的实现了一个针对 Where DSL 的支持,这里对标的是 zombodb 其中实现的部分 Query DSL:https://github.com/zombodb/zombodb/blob/master/QUERY-DSL.md
Select <fields> from <tables> where <table> ===> 'field1: xxxx AND field2: [1 TO 1000] OR ...'
WhereDsl:
"WHERE" TableName "==>" stringLit
{
$$ = &ast.WhereDslStmt{Parser: parser,Table: $2.(*ast.TableName), DslString: $4}
}
WhereClauseOptional:
{
$$ = nil
}
| WhereClause
{
$$ = $1
}
WhereClauseDslOptional:
{
$$ = nil
}
| WhereDsl
{
$$ = $1
}
| WhereClause
{
$$ = $1
}
这里需要增加一个 WhereDSL 所需要的 AST Node:
type ParseInterface interface {
ParseSpecExpr(exprDsl string) (ExprNode, error)
}
type WhereDslStmt struct {
stmtNode
Table *TableName
Parser ParseInterface
DslString string
Where ExprNode
}
func (n *WhereDslStmt) Restore(ctx *RestoreCtx) error {
// TODO : convert dsl string to where syntax and write where
return nil
}
func (n *WhereDslStmt) Accept(v Visitor) (Node, bool) {
newNode, skipChildren := v.Enter(n)
if skipChildren {
return v.Leave(newNode)
}
n = newNode.(*WhereDslStmt)
return v.Leave(n)
}
替换到对应的 Yacc Selection 部分:
SelectStmtBasic FromDual WhereClauseDslOptional
{
st := $1.(*ast.SelectStmt)
lastField := st.Fields.Fields[len(st.Fields.Fields)-1]
if lastField.Expr != nil && lastField.AsName.O == "" {
lastEnd := yyS[yypt-1].offset-1
lastField.SetText(parser.src[lastField.Offset:lastEnd])
}
if $3 != nil {
switch where := $3.(type) {
case ast.ExprNode:
st.Where = where
case ast.StmtNode:
st.WhereDsl = where
}
}
}
SelectStmtFromTable:
SelectStmtBasic "FROM"
TableRefsClause WhereClauseDslOptional SelectStmtGroup HavingClause WindowClauseOptional
{
st := $1.(*ast.SelectStmt)
st.From = $3.(*ast.TableRefsClause)
lastField := st.Fields.Fields[len(st.Fields.Fields)-1]
if lastField.Expr != nil && lastField.AsName.O == "" {
lastEnd := parser.endOffset(&yyS[yypt-5])
lastField.SetText(parser.src[lastField.Offset:lastEnd])
}
if $4 != nil {
switch where := $4.(type) {
case ast.ExprNode:
st.Where = where
case ast.StmtNode:
st.WhereDsl = where
}
}
if $5 != nil {
st.GroupBy = $5.(*ast.GroupByClause)
}
if $6 != nil {
st.Having = $6.(*ast.HavingClause)
}
if $7 != nil {
st.WindowSpecs = ($7.([]ast.WindowSpec))
}
$$ = st
}
Where <tableName> ===> <Query DSL>
的功能,但是本身的功能还是没有。Where DSL => (Field Query List <With AND, OR>) => Where Phase Expr => Origin Plan Generator
var (
_string = StringLit(`'`)
_number = NumberLit()
_between = Seq("[", _number, "TO", _number, "]")
_label = Regex("[a-zA-Z][a-zA-Z0-9]*")
_result = Any(_string, _number, _between)
_term = Seq(_label, ":", _result)
_op = Any(Bind("and", nil), Bind("or", nil))
_andOp = Seq(&_term, Some(Seq(_op, &_term)))
DslPhaseParser = Maybe(_andOp)
)
func (n *WhereDslStmt) createWherePhase() (ExprNode, error) {
if n.Where != nil {
return n.Where, nil
}
result, _ := dsl.ExprParser(n.DslString)
exprNode, _ := n.Parser.ParseSpecExpr(result)
n.Where = exprNode
return exprNode, nil
}
ParseSpecExpr
这个方法:func (p *Parser) ParseSpecExpr(exprDsl string) (ast.ExprNode, error) {
exprParser := New()
sourceSQL := fmt.Sprintf("select %s", exprDsl)
extractNodeFunc := func(node ast.Node) ast.ExprNode {
return node.(*ast.SelectStmt).Fields.Fields[0].Expr
}
stmt, err := exprParser.ParseOneStmt(sourceSQL, "", "")
if err != nil {
return nil, err
}
return extractNodeFunc(stmt), nil
}
type SelectStmt struct {
dmlNode
resultSetNode
// SelectStmtOpts wraps around select hints and switches.
*SelectStmtOpts
// Distinct represents whether the select has distinct option.
Distinct bool
// From is the from clause of the query.
From *TableRefsClause
// Where is the where clause in select statement.
Where ExprNode
WhereDsl StmtNode
// ... other fields
}
// Restore implements Node interface.
func (n *SelectStmt) Restore(ctx *RestoreCtx) error {
// ...
if n.Where == nil && n.WhereDsl != nil {
dsl := n.WhereDsl.(*WhereDslStmt)
n.Where, _ = dsl.createWherePhase() // 重新生成了 Where Stmt 嵌入流程
}
if n.Where != nil {
ctx.WriteKeyWord(" WHERE ")
if err := n.Where.Restore(ctx); err != nil {
return errors.Annotate(err, "An error occurred while restore SelectStmt.Where")
}
}
// ...
}
// Accept implements Node Accept interface.
func (n *SelectStmt) Accept(v Visitor) (Node, bool) {
// ...
if n.Where == nil && n.WhereDsl != nil {
dslNode := n.WhereDsl.(*WhereDslStmt)
n.Where, _ = dslNode.createWherePhase()
}
if n.Where != nil {
node, ok := n.Where.Accept(v)
if !ok {
return n, false
}
n.Where = node.(ExprNode)
}
// ...
}
因此在系统流程通过 Visitor 对 Select Stmt 进行分析的 Restore 和 Accept 流程之中,可以直接通过 DSL 语法糖重新生成 Where Stmt 的方式实现了这种支持。最终实现了我们的目标,不动 DB 只在 Parser 层完成任务。
Show Case
Flags
总结
这次参加 PingCAP 举办的 TiDB Hackathon,对整个系统了解程度一般没什么预先准备,key-point 也不是特别出彩的东西,因此也难免 Hackathon 陪跑、Prize Skip (
在国内面试很多面试官都会对某些流行框架代码的实现方案有所要求,但是其实这方面固然重要但是只要你去看很少能有你看不懂的东西。从个人而言,可能会更看重快速了解一份陌生代码、陌生系统的设计逻辑,找出症结、获得经验方面的能力。所以可能推荐大家可以多摸摸 Hackathon 方面的鱼,还是有很多增长的。
虽然和某些大佬相比这次的 key-point 选题不够有亮点,也成功实现了 Hackathon 陪跑、Prize Skip。不过和朋友参加 Hackathon 本身的经验也比较有趣,提出想法,快速的实现解决问题,对我而言快速结合对 DB、Parser 的已有知识对大型系统的实现方式进行分析和修改实现更是弥足珍贵的锻炼。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。