黄东旭解析 TiDB 的核心优势
711
2023-04-11
Spark 主要提供以下四种拓展方式
API
局限
版本
Customized function or RDD
无法支持 Spark SQL
任意
DataSource API
API 变动会比较频繁
Before Spark 2.3: v1
Spark 2.3-3.0: v1+v2
After Spark 3.0: v1+新版 v2
Catalyst Extension
无法自定义 rule 的顺序
After Spark 2.2
Catalog plugin
Spark 3 之后才可使用
After Spark 3.0
当使用 RDD 编程时,我们完全可以基于 Customized function or RDD 实现外部数据源的读写,完成数据源的拓展以及各种操作。但当 Spark SQL 出现,这样的方式就失效了。
Example
Create customized function for existing RDD
class CustomFunctions(rdd:RDD[SalesRecord]) { def totalSales = rdd.map(_.itemValue).sum }Create customized RDD
class DiscountRDD(prev:RDD[SalesRecord],discountPercentage:Double) extends RDD[SalesRecord](prev){ // override compute method to calculate the discount override def compute(split: Partition, context: TaskContext): Iterator[SalesRecord] = { firstParent[SalesRecord].iterator(split, context).map(salesRecord => { val discount = salesRecord.itemValue*discountPercentage new SalesRecord(salesRecord.transactionId, salesRecord.customerId,salesRecord.itemId,discount) })} override protected def getPartitions: Array[Partition] = firstParent[SalesRecord].partitions }Define the way read and write from other datasource.
DataSource API 是 Spark 中非常活跃的一个 API,几乎每个 Spark 版本都会对其进行优化。其主要的版本对照如表
Spark version
DataSource API
Before Spark 2.3
v1
Spark 2.3 - Spark 3.0
v2
After Spark 3.0
v2 升级版
优点
It is simple,适用于大多数场景
缺点
Coupled with other APIs (RDD, DataFrame, SQLContext)
和上层 API 耦合过重,就比如 SQLContext 被废弃,那意味着 DataSource API 也得废弃
Hard to push down
下推通过组合 trait 的方式,那么就会有笛卡尔积的 trait。比如要增加 limit 能力, 就得增加 limitscan, limitprunedscan, limitfilterscan, limitprunedfilterscan 多个 trait
可下推的算子也有限,比如不支持下推 Aggregate
Write API too simple
No streaming support
// Read trait RelationProvider { def createRelation(sqlContext: SQLContext,parameters: Map[String, String],schema: StructType): BaseRelation } abstract class BaseRelation{} trait TableScan {} trait PrunedFilteredScan {} //write trait InsertableRelation {}优点
Friendly to Java for DS V2 APIs is Java interface
Does not coupled with other APIs
Easier to push down
每种 pushdown 都有自己的 interface
Streaming support
// Read public interface DataSourceV2 {} public interface ReadSupport extends DataSourceV2 { DataSourceReader createReader(DataSourceOptions options); } public interface DataSourceReader { List<InputPartition<Row>> planInputPartitions(); } public interface SupportsPushDownFulters extends DataSourceReader { Filter[] pushFilters(Filter[] filters) } public interface InputPartition<T> extends Serializable { InputPartitionReader<T> createPartitionReader(); } public interface InputPartitionReader<T> extends Closeable { boolean next() throws IOEXception; T get(); }摘自 Data Source V2 API Improvement design doc
升级版的 v2 优化了以下问题
Scan execution order is not obvious
Splitting and reading data partitions should be independent
Columnar Scan API should not be a mixin trait
Streaming API doesn’t play well with the batch API
Interface name is confusing
Spark SQL 最重要的部分就是 catalyst ,它负责 SQL 的解析分析优化等操作,其主要流程如下。
从 Spark 2.2 之后,Spark 支持拓展 catalyst。拓展点如下表
Stage
Extension
description
Parser
injectParser
负责 SQL 解析
Analyzer
injectResolutionRule
injectPostHocResolutionRule
injectCheckRule
负责逻辑执行计划生成,catalog 绑定,以及进行各种检查
Optimizer
injectOptimizerRule
负责逻辑执行计划的优化
Planner
injectPlannerStrategy
负责物理执行计划的生成
其中 Analyzer 有三个拓展点,分别的用处为
injectResolutionRule:可以在这鉴权,检查元信息
injectPostHocResolutionRule:可以在这检查 insert
injectCheckRule:check 是否还有 unresolved 的
在 Spark 3 之后,又额外提供了一些其他拓展点
e.injectColumnar
e.injectFunction
e.injectQueryStagePrepRule
Catalyst 拓展只能在 catalyst 框架下进行,具体表现为
拓展点的位置被限制
无法修改其他 rule
这其实会有一些问题:比如我们无法修改原有 rule 的行为,原有 rule 会 block 一些我们特殊需求,如特殊的类型转换等。
更好的支持多数据源
在 spark 3 之后,Spark 提供了 Catalog plugin,能够
Provide schema
DDL
Multiple catalog
TiKV 为数据源进行 Read + Write。
API: Customized RDD + Catalyst Extension + Catalog plugin
TiSpark Read 使用 Catalyst Extension ,拓展 injectPlannerStrategy拓展点。在该拓展点中,TiSpark 会截取可下推的 sub-plan 进行下推至 TiKV,并获取数据。无法下推的部分交给 Spark 完成。
TiSpark 2.5 之后,还使用了 Spark 3.0 提供的 catalog plugin ,侵入性更小。在没有 catalog plugin 时,我们需要 hacker catalog 以及更多的执行计划,还需要提供一些额外机制(dbprefix)用于判断属于哪一个 DataSource,非常不方便。
为什么不使用 DataSource API 呢?
DS API 无法精确下推
Spark 3.2 之前不支持下推 Aggration
无法根据数据类型进行是否下推的判断
DataSource API V1 + RDD API
TiSpark Write 的实现基于 DataSource API V1 拓展。TiSpark 会从 Dataframe 出发,利用 RDD API 进行数据的各种处理,最后使用 TiKV-java-client 提供的 2PC 接口保证整体事务的原子性
TiSpark 2.5 主要基于 DS V1 实现,其问题有
Write 拓展的粒度不够细
No streaming support
Can’t use v2 API in user view (如 writeto 无法使用)
Can’t apply new features of spark(很多新特性都是基于 DSV2,如delete)
No Catalog support,比如需要通过 option 传入 db 与 table 信息
No Catalyst optimize,Write 逻辑节点在 catalyst 中不会有任何优化/检查
因此 TiSpark 在 master 分支进行了 DS V1 -> DS V2 的升级,主要做的事情如下
Spark 3 之后,提供了 catalog plugin,使得 mutile catalog 成为可能。
使用 catalog plugin 可以带来以下优点
Less hacker code
原来我们需要自定义混合的 catalog ( TiDB schema + hive schema),逻辑混杂在一起。
Need not dbprefix
为了防止同名库表,原来我们需要 dbprefix 区分不同的 datasoue
Make read simple
由于原来我们需要在 catalyst 额外进行一些拓展,使用混合 catalog 去判断库表的存在性。难以维护开发
在 V1 API 中,Read 需要拓展 catalyst,将逻辑节点替换为 TiDBRelation,然后由 TiDBRelation 提供 schema(而不是 catalog)
在 V2 AP2 中,schema 由 TiDBTable 提供。更重要的是无需拓展 catalyst 并去替换为 TiDBRelation,Spark 会使用 catalog.loadTable 帮我们加载 TiDBTable。
因此,Read 可以进行如下优化
不必在 Analyzer 中改写相关逻辑节点了为 TiDBRelation 了。
不必在 Planner 下推时使用 TiDBRelation 的 schema 了,直接从 Spark 原生逻辑节点 DataSourceV2ScanRelation2(TiDBTable) 中获取即可
对开发者来说,Spark 提供了 v1,v2 API 用于拓展。
对于用户来说,Spark 也向用户暴露了两类API。如 df.write 是早期的 api,df.writeto.append 为新版 api。
Spark 会判断开发者是拓展了 v1 还是 v2
df.writeto.append:开发者只能使用 v2 API 实现。
df.write:需要向前兼容,因此 v1 与 v2 API 都可以用。Spark 主要通过实现的接口判断开发者使用的 API 是 v1 还是 v2。不满足任一以下情况的都会被判断为 v1 API
Source extends SupportsCatalogOption/TableProvider
Table extends supportsWrite
Table has batchwrite capabilities
原来 write 由 DSV1 实现,在转向 V2的过程中我们发现了一些问题
无法处理整体数据
V2 write framework can’t process global Data
和 Catalyst 的优化有冲突,如
Data convert:boolean -> long (unsafe)
Autorandom:mismatch
由于 DSV2 存在一些问题,我们的处理方式是:先进行整体 v2 的切换,在 v2 框架中,write 仍先使 v1 API 实现
DSV2 support 带来的好处
Less hacker code, easier to develop
The benefit of DSV2
Streaming support
New user API support(writeto)
Closer to catalyst
The new feature of Spark
Delete
More pushdown support
Read
随着 Spark 对下推的支持,我们可以逐步使用 DataSource API ,而不再是拓展 catalyst 的方式。
Write
能够使用 DSV2 改写write
Spark SQl
支持更多的 SQL:Insert,delete,update,mergeinto
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。