实现与设计 TiFlash 表达式,优化查询性能

网友投稿 698 2023-03-30

表达式概要

表达式是承载 SQL 大部分逻辑的一个重要部分。SQL 中的表达式和编程语言中的表达式并没有差异。表达式可以大致分为函数、常量、列引用。如 select a + 1 from table中的 a + 1是一个表达式,其中 +是函数,1是常量,a是列引用。

在 SQL 中,表达式会归属在不同的算子里执行,以 select a+b from tests.t where c > 0为例,大家可以从下图看到不同的表达式归属在哪些算子里。

1.PNG

表达式在 SQL 中如何划分出来,并且归属在哪些算子里,是由一套语法规则决定的。下图是 MySQL 8.0 Parser 的语法规则简图,里面大号粗体的是算子标识符,后面跟着的小号字段是归属这个算子的表达式。

2.PNG

在了解了什么是表达式之后,我们来了解一下表达式在 TiFlash 里执行的情况。

TiDB HTAP 的体系里,TiFlash 的表达式是由 TiDB 下推给 TiFlash 执行的。首先我们来回顾下 TiDB 计算下推 TiFlash 的流程。

3.PNG

TiDB 接收 MySQL Client 发送的 SQL,经由 Parser 和 Optimizer 解析成算子,在之后将算子下推到 TiFlash 里执行。与此同时,算子内部的表达式也会跟随一起下推到 TiFlash 里执行。

如下图所示,如果某个算子带有 TiFlash 不支持的函数,就会导致一连串的算子都无法下推到 TiFlash 里执行。算子内部的表达式都可以下推执行,是算子下推的必要条件。

4.PNG

在算子和表达式下推到 TiFlash 后,TiFlash 会用向量化执行引擎来执行这些算子和表达式。在谈到 TiFlash 的向量化执行引擎之前,我们先来讲一下执行引擎的一个经典模型 Volcano Model。

5.PNG

Volcano Model 源自 1994 年的论文 Volcano-An Extensible and Parallel Query Evaluation System。Volcano Model 将 SQL 分解为若干个独立的 Operator,Operator 串联成一棵 Operator Tree。

如上图所示,从最源头的 Table Scan Operator 开始,一行一行地读取数据,Operator 处理后,传给上游 Operator。最终 Operator Tree 一行一行地输出结果。

下面是对 Operator 接口的一个简单的伪代码描述。

struct Operator{Row next(){
    Row row = child.next();
    ....    return row;
}
}

Volcano Model 提供了一个非常简洁的执行模型,在工程上也非常容易实现。但是 Volcano Model 在现代编译器和硬件下运行得慢。在后续几年了诞生了对 Volcano Model 的两个改进方案,Codegen 和 Vectorized。TiFlash 就是使用的 Vectorized 即向量化执行。

向量化执行与 Volcano Model 基本一致,区别在于 Block by Block。

6.PNG

Block 是若干 Row 组合在一起的数据块,Block 内部按 Column 保存数据。

这样设计的好处有几个:

  • 虚函数调用的开销会被减小为 1 / (Block Size)。Volcano Model 中的 Operator 和表达式通常都是用多态来实现的,在其中就会引入虚函数调用。每次 Operator Tree 和内部的表达式被调用就是一系列的虚函数调用,在数据量大的情况下,虚函数开销甚至会成为一个不可忽视的点。Block by Block 可以让 Operator Tree 和内部的表达式的一次调用处理多行而不是一行数据,从而均摊了虚函数开销。

  • Cache Friendly。把会被连续处理的数据放在一个数据块里,提高在 Cache 上的空间局部性。

TiFlash 表达式体系 ExpressionActions

除了向量化执行外,TiFlash 在表达式执行上还有一套独立的执行体系 ExpressionActions,不同于 TiDB 源自 Volcano Model 的 ExpressionTree

这两个表达式体系在逻辑语意上是一致的,仅仅在执行过程上有差别。

7.PNG

如上图所示,同样的表达式在 TiDB 和 TiFlash 里会分别在不同的表达式体系里执行。

接下来以 (a + b) * (a + b)为例子来讲述一下两个表达式体系执行的差异。

首先从 TiDB ExpressionTree讲起。(a + b) * (a + b)会被分解成一棵 Expression Tree,每一个 Expression 都是一个节点。从 Column Expression 和 Literal Expression 开始读取数据,遍历整棵 Expression Tree,最终得出表达式结果。

如下图所示,沿着图中的箭头方向,就可以从 Input 计算得出 (a + b) * (a + b)的结果 Output。

8.PNG

在图中大家可以发现 (a + b)这个子树出现了两次,也就意味着 (a + b)本身执行了两次,那么可不可以复用 (a + b)的计算结果,如下图所示连线?哪怕这样就不是一棵树了。

9.PNG

事实上是可以的,这也是 TiFlash ExpressionActions的设计初衷: 中间计算结果复用。

10.PNG

在 TiFlash ExpressionActions下,中间计算的临时结果会作为 Column 会被写到 Block 里,同时我们会通过 Column Name 获取 Block 中对应的 Column。

如上图所示,沿右图箭头方向遍历 Expression Action,即可得出 (a + b) * (a + b)的计算结果。可以看到,同样是 (a + b) * (a + b),TiFlash ExpressionActions里的 (a + b)只计算了一次。

下面是 ExpressionActions的执行分解图,从左到右。大家可以对照一下分解图,大概了解一下 ExpressionActions 的执行过程。

1.jpg

接下来我们深入一下代码,从代码层面来了解一下 ExpressionActions究竟做了些什么。

class ExpressionActions{public:    void execute(Block & block) const
    {        for (const auto & action : actions)
            action.execute(action);
    }    void add(const ExpressionAction & action);    void finalize(const Names & output_columns);private:
    std::vector<ExpressionAction> actions;
}

以上是 ExpressionActions接口的简化代码。有三个主要方法:

  • execute

    • 对 ExpressionAction::execute的包装。用于执行表达式。

  • add

    • 用于外部组装出一个 ExpressionActions

    • ExpressionActions会维护一个 Sample Block,在 Add Action 的过程中 Sample Block 会不断更新,模拟实际 Block 的变化情况。

    • 在 Add 的过程中,重复的 Action 会被跳过。重复 Action 的判断条件是,该 Action 的执行结果是否已经出现在 Sample Block 里了。

14.png

  • finalize

    • 分析 Block 内的 Column 引用情况,在合适的位置插入 Remove Action 来移除无用的 Column。

    • 在 Column 引用数归 0 的时候,就会插入对应 Column 的 Remove Action。

    • 下图是 (a + b) * (a + b)的引用数分析和 Remove Action 插入情况。

15.png

ExpressionAction是 ExpressionActions内部的执行单元。

struct ExpressionAction
{
    Type type;
    …    void execute(Block & block) const
    {        switch (type)
        {            case: APPLY_FUNCTION:
                …            case: REMOVE_COLUMN:
               …            case: ADD_COLUMN:
               …
        }
    }
}

ExpressionAction有不同的 Type 用于对 Block 进行不同的处理

  • REMOVE_COLUMN

    • 即前文所讲的 Remove Action

  • ADD_COLUMN

    • ADD_COLUMN会在 Block 中插入一个 Column。对于 Literal,插入的会是 ColumnConst,即常量 Column。

    • block.insert({added_column->cloneResized(block.rows()), result_type, result_name});

    • 用于执行 Literal Expression,在 Block 插入一个 Const Column。

16.png

  • APPLY_FUNCTION

    • APPLY_FUNCTION会读取 Block 中的 Argument Columns 传给 IFunction做计算。计算出结果 Column 后,插入到 Block 中。

    • 用于执行 Function Expression,由 ExpressionAction持有的 IFunction执行

17.png

Column Expression 没有对应的 Action Type,直接执行 Block::getPositionByName获取 Column 在 Block 里的下标。但是从 TiDB 获得的 Column Expression 计算得出的并不是 Column 在 Block 中的 Column Name,而是 Column 在 TiDB Schema 中的下标。所以 TiFlash 会维护 TiDB Schema (std::vector<String>) 来桥接 TiDB Column Index 和 TiFlash Column Name,如下图所示。

18.png

标量函数在 TiFlash 中的编译与执行

当 ExpressionAction的 Type 为 APPLY_FUNCTION时,ExpressionAction内部会持有 IFunction。 对 ExpressionAction::execute的调用都会转发给 IFunction执行。IFunction是 TiFlash 向量化标量函数的基类。

19.png

首先我们从函数在 TiFlash 中的编译讲起。tipb是 TiDB 与 TiFlash 之间的序列化协议,下图的 tipb::Expr等同是 TiDB 里的 Expression。

对于传入的一个 tipb::Expr,首先分门别类,按照 Column,Literal,Function 分别处理。

如果是 Function,首先处理 Function 的所有参数。参数本身也是 tipb::Expr,所以也会按照对应的 tipb::Expr处理流程处理。在处理完 Function 的所有参数后,就可以去构建 IFunction本身,然后塞入 ExpressionAction,返回处理结果。

20.png

TiDB 对函数的标识是 tipb::ScalarFuncSig,而 TiFlash 使用 Function Name 作为函数的标识。在 TiFlash 里,我们会用映射表的形式将 tipb::ScalarFuncSig映射成 Function Name。再根据 Function Name 找到对应的 TiFlash Function Builder。

21.png

对于窗口函数、聚合函数、distinct 聚合函数、标量函数都有各自的映射表。

const std::unordered_map<tipb::ExprType, String> window_func_map({
    {tipb::ExprType::Rank, "rank"},
        ...
});const std::unordered_map<tipb::ExprType, String> agg_func_map({
    {tipb::ExprType::Count, "count"},
        ...
});const std::unordered_map<tipb::ExprType, String> distinct_agg_func_map({
    {tipb::ExprType::Count, "countDistinct"},
    ...
});const std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map({
    {tipb::ScalarFuncSig::CastIntAsInt, "tidb_cast"},
        ...
});

拿到 Function Name 之后,我们就可以去找到 Function 对应的 Function Builder 去 Build 出 IFunction实现。TiFlash 有两类 Function Builder: Default Function Builder 和 Special Function Builder。前者是用于处理大多数 Function,后者处理某些特殊情况。

22.png

String DAGExpressionAnalyzerHelper::buildFunction(
    DAGExpressionAnalyzer * analyzer,    const tipb::Expr & expr,    const ExpressionActionsPtr & actions){    const String & func_name = getFunctionName(expr);    if (function_builder_map.count(func_name) != 0)
    {        return function_builder_map[func_name](analyzer, expr, actions);
    }    else
    {        return buildDefaultFunction(analyzer, expr, actions);
    }
}

Default Function Builder 的处理本身很简单,就是先处理所有的函数入参,然后调用 applyFunction生成对应的 IFunction实现。

String DAGExpressionAnalyzerHelper::buildDefaultFunction(
    DAGExpressionAnalyzer * analyzer,    const tipb::Expr & expr,    const ExpressionActionsPtr & actions){    const String & func_name = getFunctionName(expr);
    Names argument_names;    for (const auto & child : expr.children())
    {
        String name = analyzer->getActions(child, actions);
        argument_names.push_back(name);
    }    return analyzer->applyFunction(func_name, argument_names, actions, getCollatorFromExpr(expr));
}

对某些函数,有些特殊处理,这时就会使用到 Special Function Builder。

特殊处理的 Function Builder 会放在表里,遇到对应的函数,会转发过来。

下面是一些函数的特殊处理映射。

FunctionBuilderMap DAGExpressionAnalyzerHelper::function_builder_map(
    {...
     {"ifNull", DAGExpressionAnalyzerHelper::buildIfNullFunction},
     {"multiIf", DAGExpressionAnalyzerHelper::buildMultiIfFunction},
     ...
     {"bitAnd", DAGExpressionAnalyzerHelper::buildBitwiseFunction},
     {"bitOr", DAGExpressionAnalyzerHelper::buildBitwiseFunction},
     {"bitXor", DAGExpressionAnalyzerHelper::buildBitwiseFunction},
     {"bitNot", DAGExpressionAnalyzerHelper::buildBitwiseFunction},
     {"bitShiftLeft", DAGExpressionAnalyzerHelper::buildBitwiseFunction},
     {"bitShiftRight", DAGExpressionAnalyzerHelper::buildBitwiseFunction},
     ...
});

一个特殊的处理的情况是复用函数实现。某些函数可以由另一个函数来代理执行,比如 leftUTF8(str,len) = substrUTF8(str,1,len)。如果让 substrUTF8来代理执行 leftUTF8,那么就可以省掉 leftUTF8本身的开发实现工作。下面是 leftUTF8(str,len) = substrUTF8(str,1,len)的代理实现代码。为 substrUTF8生成第二个参数 1后,将 leftUTF8的两个参数传入 substrUTF8substrUTF8就可以代理 leftUTF8执行。

String DAGExpressionAnalyzerHelper::buildLeftUTF8Function(
    DAGExpressionAnalyzer * analyzer,    const tipb::Expr & expr,    const ExpressionActionsPtr & actions){    const String & func_name = "substringUTF8";
    Names argument_names;    // the first parameter: str
    String str = analyzer->getActions(expr.children()[0], actions, false);
    argument_names.push_back(str);    // the second parameter: const(1)
    auto const_one = constructInt64LiteralTiExpr(1);    auto col_const_one = analyzer->getActions(const_one, actions, false);
    argument_names.push_back(col_const_one);    // the third parameter: len
    String name = analyzer->getActions(expr.children()[1], actions, false);
    argument_names.push_back(name);    return analyzer->applyFunction(func_name, argument_names, actions, getCollatorFromExpr(expr));
}

接下来我们来看一下 IFunction本身的接口。

class IFunction{public:    virtual String getName() const = 0;    virtual DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const;    virtual void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) const;
};

IFunction有三个主要方法

  • getName: 返回 Function 的 Name,Name 是作为 TiFlash 向量化函数的唯一标识来使用。

  • getReturnTypeImpl: 负责做向量化函数的类型推导,因为输入参数数据类型的变化可能会导致输出数据类型变化。

  • executeImpl: 负责向量化函数的执行逻辑,这也是一个向量化函数的主体部分。一个 TiFlash 向量化函数够不够"向量化",够不够快也就看这里了。

接下来以 jsonLength(string)为例子,讲一下向量化函数的执行

  1. 从 Block 中获取 Json Column

  2. 创建同等大小的 Json Length Column

  3. Foreach Json Column,获取每一个行的 Json

  4. 调用 GetJsonLength(Json)获取 Json Length,将结果插入 Json Length Column 中的对应位置。

  5. 将 Json Length Column 插入到 Block 中,完成单次计算

23.png

void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) const override{    // 1. 获取 json column,json column 本身是 String 类型,所以 json column 用的是 ColumnString 这个 column 实现
    const ColumnPtr column = block.getByPosition(arguments[0]).column;    if (const auto * col = checkAndGetColumn<ColumnString>(column.get()))
    {        // 2.创建 json len column, json len 本身是 UInt64 类型,用的是 ColumnUInt64 这个 column 实现
        auto col_res = ColumnUInt64::create();        typename ColumnUInt64::Container & vec_col_res = col_res->getData();
        {            // 3. 遍历 json column,ColumnString 提供了一些裸操作内部 string 的方法,可以提高效率
            const auto & data = col->getChars();            const auto & offsets = col->getOffsets();            const size_t size = offsets.size();
            vec_col_res.resize(size);

            ColumnString::Offset prev_offset = 0;            for (size_t i = 0; i < size; ++i)
            {                // 4. 调用 GetJsonLength,计算出 json 的 length,插入到 json len column 中。
                std::string_view sv(reinterpret_cast<const char *>(&data[prev_offset]), offsets[i] - prev_offset - 1);
                vec_col_res[i] = GetJsonLength(sv);
                prev_offset = offsets[i];
            }
        }        // 5. 将 json len column 插入到 Block 中,完成单次计算。
        block.getByPosition(result).column = std::move(col_res);
    }    else
        throw Exception(fmt::format("Illegal column {} of argument of function {}", column->getName(), getName()), ErrorCodes::ILLEGAL_COLUMN);
}

前段时间 TiFlash 有个社区活动,号召大家来参与 TiFlash 函数下推的工作。

聚合函数在 TiFlash 中的编译与执行

聚合函数不同于标量函数的点在于输入 m 行数据输出 n 行数据,且有 m >= n。

24.png 25.png

所以聚合函数不会使用 ExpressionActions和 IFunction,而是有独立的执行体系。

聚合函数本身由 Aggregate算子来执行,Aggregate算子负责管理聚合函数的执行步骤,执行并发度等等。

Aggregate算子的执行有两部分

  • 前者 ExpressionActions用于执行标量函数的部分。

  • 后者 Aggregator用于执行聚合函数的部分

如下图所示,对于 select max(a+b)ExpressionActions执行 a+bAggregator执行 max

26.png

下面详细来说一说 Aggregator是如何执行聚合函数的。

首先 Aggregator会多线程从 Input 读取 Block,调用 executeOnBlock写入 Thread Local AggregatedDataVariantsAggregatedDataVariants内部会保存当前线程的部分聚合计算结果。

在 executeOnBlock阶段完成后,Aggregator会调用 mergeAndConvertBlocks将多个 AggregatedDataVariants合并成一个,输出最终聚合的结果给 Output 。

27.png

Aggregator::executeOnBlock会对每一个 Key 都会创建一个 Aggregate Data。 输入的 Row 会根据 Key 找到对应的 Aggregate Data, 调用 IAggregateFunction::add,更新 Agg Function 保存在 Aggregate Data 里的聚合结果。 如下图所示,Row1 和 Row4 经由 Aggregate Function 计算后,更新 Key1 保存的 Aggregate Data;Row2 经由 Aggregate Function 计算后,更新 Key2 保存的 Aggregate Data;Row3 和 Row5 经由 Aggregate Function 计算后,更新 Key3 保存的 Aggregate Data。

28.png

在 executeOnBlock完成后,每个线程都会有一个独立的 Aggregate Data。

在 mergeAndConvertBlocks阶段会把其他 Aggregate Data 都合并到 Aggregate Data0 上面

IAggregateFunction::merge用于执行把所有线程计算的部分聚合结果聚合成一个最终聚合结果。

如下图所示,Aggregate Data 在合并的时候,同一个 Key 的数据会合并到一起,与其他 Key 互不干扰。

29.png

IAggregateFunction是聚合函数的实现基类。接下来我们来看一下 IAggregateFunction本身的接口。

class IAggregateFunction{public:    String getName() const = 0;    DataTypePtr getReturnType() const = 0;    void add(AggregateDataPtr __restrict place, const IColumn ** arg_columns, size_t row_num, Arena * arena) const = 0;    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const = 0;    void insertResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena * arena) const = 0;
};
  • add用于计算聚合结果并更新到 place

  • merge用于将 rhs里的聚合结果合并到 place

  • insertResultInto用于将 place里保存的聚合结果输出成 Block

接下来我们以 Sum 这个聚合函数为例子来看一下聚合函数的执行过程。

template <typename T>using AggregateFunctionSumSimple = 
AggregateFunctionSum<T, typename NearestFieldType<T>::Type, AggregateFunctionSumData<typename NearestFieldType<T>::Type>>;template <typename T, typename TResult, typename Data>class AggregateFunctionSum final : public IAggregateFunctionDataHelper<Data, AggregateFunctionSum<T, TResult, Data>>
{public:    void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
    {        this->data(place).add(column.getData()[row_num]);
    }    void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
    {        this->data(place).merge(this->data(rhs));
    }
};template <typename T>struct AggregateFunctionSumData{
    T sum{};    template <typename U>    void add(U value) { AggregateFunctionSumAddImpl<T>::add(sum, value); }    void merge(const AggregateFunctionSumData & rhs) { AggregateFunctionSumAddImpl<T>::add(sum, rhs.sum); }
}template <typename T>struct AggregateFunctionSumAddImpl{    static void NO_SANITIZE_UNDEFINED ALWAYS_INLINE add(T & lhs, const T & rhs) { lhs += rhs; }
};
  • 实际的 Sum 实现带有很多优化,这里选择最简化的实现

  • 对于 Sum,AggregateData 的实现是 AggregateFunctionSumData,内部维护一个 T Sum保存聚合结果

  • 对于 Sum,add和 merge都是执行 += 操作。

下图是对 Sum 执行过程的一个简化描述图。

add阶段,Aggregator会从 Input 读取数据,执行 sum += input,更新 Aggregate Data 里的聚合结果。在 add完成后,会执行 sum0 += sum1和 sum0 += sum2,将聚合结果合并成一个。最终输出 sum0作为最终结果。

30.png


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:如何选择适合自己的数据库:关系型还是非关系型?
下一篇:DM 工具优化 DML 处理,分享 TiDB 实践经验
相关文章