TiSpark 如何扩展 Spark

网友投稿 743 2023-04-11

Spark API

Spark 主要提供以下四种拓展方式

TiSpark 如何扩展 Spark

API

局限

版本

Customized function or RDD

无法支持 Spark SQL

任意

DataSource API

API 变动会比较频繁

Before Spark 2.3: v1

Spark 2.3-3.0: v1+v2

After Spark 3.0: v1+新版 v2

Catalyst Extension

无法自定义 rule 的顺序

After Spark 2.2

Catalog plugin

Spark 3 之后才可使用

After Spark 3.0

Customized function or RDD

当使用 RDD 编程时,我们完全可以基于 Customized function or RDD 实现外部数据源的读写,完成数据源的拓展以及各种操作。但当 Spark SQL 出现,这样的方式就失效了。

Example

Create customized function for existing RDD

class CustomFunctions(rdd:RDD[SalesRecord]) { def totalSales = rdd.map(_.itemValue).sum }

Create customized RDD

class DiscountRDD(prev:RDD[SalesRecord],discountPercentage:Double) extends RDD[SalesRecord](prev){ // override compute method to calculate the discount override def compute(split: Partition, context: TaskContext): Iterator[SalesRecord] = { firstParent[SalesRecord].iterator(split, context).map(salesRecord => { val discount = salesRecord.itemValue*discountPercentage new SalesRecord(salesRecord.transactionId, salesRecord.customerId,salesRecord.itemId,discount) })} override protected def getPartitions: Array[Partition] = firstParent[SalesRecord].partitions }

DataSource API

Define the way read and write from other datasource.

DataSource API 是 Spark 中非常活跃的一个 API,几乎每个 Spark 版本都会对其进行优化。其主要的版本对照如表

Spark version

DataSource API

Before Spark 2.3

v1

Spark 2.3 - Spark 3.0

v2

After Spark 3.0

v2 升级版

Before Spark 2.3:DataSource API v1

优点

It is simple,适用于大多数场景

缺点

Coupled with other APIs (RDD, DataFrame, SQLContext)

和上层 API 耦合过重,就比如 SQLContext 被废弃,那意味着 DataSource API 也得废弃

Hard to push down

下推通过组合 trait 的方式,那么就会有笛卡尔积的 trait。比如要增加 limit 能力, 就得增加 limitscan, limitprunedscan, limitfilterscan, limitprunedfilterscan 多个 trait

可下推的算子也有限,比如不支持下推 Aggregate

Write API too simple

No streaming support

// Read trait RelationProvider { def createRelation(sqlContext: SQLContext,parameters: Map[String, String],schema: StructType): BaseRelation } abstract class BaseRelation{} trait TableScan {} trait PrunedFilteredScan {} //write trait InsertableRelation {}

Spark 2.3 - Spark 3.0: DataSource API v2

优点

Friendly to Java for DS V2 APIs is Java interface

Does not coupled with other APIs

Easier to push down

每种 pushdown 都有自己的 interface

Streaming support

// Read public interface DataSourceV2 {} public interface ReadSupport extends DataSourceV2 { DataSourceReader createReader(DataSourceOptions options); } public interface DataSourceReader { List<InputPartition<Row>> planInputPartitions(); } public interface SupportsPushDownFulters extends DataSourceReader { Filter[] pushFilters(Filter[] filters) } public interface InputPartition<T> extends Serializable { InputPartitionReader<T> createPartitionReader(); } public interface InputPartitionReader<T> extends Closeable { boolean next() throws IOEXception; T get(); }

After Spark 3.0 datasource api v2

摘自 Data Source V2 API Improvement design doc

升级版的 v2 优化了以下问题

Scan execution order is not obvious

Splitting and reading data partitions should be independent

Columnar Scan API should not be a mixin trait

Streaming API doesn’t play well with the batch API

Interface name is confusing

Catalyst extension

Spark SQL 最重要的部分就是 catalyst ,它负责 SQL 的解析分析优化等操作,其主要流程如下。

从 Spark 2.2 之后,Spark 支持拓展 catalyst。拓展点如下表

Stage

Extension

description

Parser

injectParser

负责 SQL 解析

Analyzer

injectResolutionRule

injectPostHocResolutionRule

injectCheckRule

负责逻辑执行计划生成,catalog 绑定,以及进行各种检查

Optimizer

injectOptimizerRule

负责逻辑执行计划的优化

Planner

injectPlannerStrategy

负责物理执行计划的生成

其中 Analyzer 有三个拓展点,分别的用处为

injectResolutionRule:可以在这鉴权,检查元信息

injectPostHocResolutionRule:可以在这检查 insert

injectCheckRule:check 是否还有 unresolved 的

在 Spark 3 之后,又额外提供了一些其他拓展点

e.injectColumnar

e.injectFunction

e.injectQueryStagePrepRule

Catalyst 拓展只能在 catalyst 框架下进行,具体表现为

拓展点的位置被限制

无法修改其他 rule

这其实会有一些问题:比如我们无法修改原有 rule 的行为,原有 rule 会 block 一些我们特殊需求,如特殊的类型转换等。

Catalog plugin

更好的支持多数据源

在 spark 3 之后,Spark 提供了 Catalog plugin,能够

Provide schema

DDL

Multiple catalog

TiSpark 2.5

TiKV 为数据源进行 Read + Write。

TiSpark Read

API: Customized RDD + Catalyst Extension + Catalog plugin

TiSpark Read 使用 Catalyst Extension ,拓展 injectPlannerStrategy拓展点。在该拓展点中,TiSpark 会截取可下推的 sub-plan 进行下推至 TiKV,并获取数据。无法下推的部分交给 Spark 完成。

TiSpark 2.5 之后,还使用了 Spark 3.0 提供的 catalog plugin ,侵入性更小。在没有 catalog plugin 时,我们需要 hacker catalog 以及更多的执行计划,还需要提供一些额外机制(dbprefix)用于判断属于哪一个 DataSource,非常不方便。

为什么不使用 DataSource API 呢?

DS API 无法精确下推

Spark 3.2 之前不支持下推 Aggration

无法根据数据类型进行是否下推的判断

TiSpark Write

DataSource API V1 + RDD API

TiSpark Write 的实现基于 DataSource API V1 拓展。TiSpark 会从 Dataframe 出发,利用 RDD API 进行数据的各种处理,最后使用 TiKV-java-client 提供的 2PC 接口保证整体事务的原子性

TiSpark master

TiSpark 2.5 主要基于 DS V1 实现,其问题有

Write 拓展的粒度不够细

No streaming support

Can’t use v2 API in user view (如 writeto 无法使用)

Can’t apply new features of spark(很多新特性都是基于 DSV2,如delete)

No Catalog support,比如需要通过 option 传入 db 与 table 信息

No Catalyst optimize,Write 逻辑节点在 catalyst 中不会有任何优化/检查

因此 TiSpark 在 master 分支进行了 DS V1 -> DS V2 的升级,主要做的事情如下

Support catalog plugin

Spark 3 之后,提供了 catalog plugin,使得 mutile catalog 成为可能。

使用 catalog plugin 可以带来以下优点

Less hacker code

原来我们需要自定义混合的 catalog ( TiDB schema + hive schema),逻辑混杂在一起。

Need not dbprefix

为了防止同名库表,原来我们需要 dbprefix 区分不同的 datasoue

Make read simple

由于原来我们需要在 catalyst 额外进行一些拓展,使用混合 catalog 去判断库表的存在性。难以维护开发

Read on DSV2

在 V1 API 中,Read 需要拓展 catalyst,将逻辑节点替换为 TiDBRelation,然后由 TiDBRelation 提供 schema(而不是 catalog)

在 V2 AP2 中,schema 由 TiDBTable 提供。更重要的是无需拓展 catalyst 并去替换为 TiDBRelation,Spark 会使用 catalog.loadTable 帮我们加载 TiDBTable。

因此,Read 可以进行如下优化

不必在 Analyzer 中改写相关逻辑节点了为 TiDBRelation 了。

不必在 Planner 下推时使用 TiDBRelation 的 schema 了,直接从 Spark 原生逻辑节点 DataSourceV2ScanRelation2(TiDBTable) 中获取即可

Write on DSV2

对开发者来说,Spark 提供了 v1,v2 API 用于拓展。

对于用户来说,Spark 也向用户暴露了两类API。如 df.write 是早期的 api,df.writeto.append 为新版 api。

Spark 会判断开发者是拓展了 v1 还是 v2

df.writeto.append:开发者只能使用 v2 API 实现。

df.write:需要向前兼容,因此 v1 与 v2 API 都可以用。Spark 主要通过实现的接口判断开发者使用的 API 是 v1 还是 v2。不满足任一以下情况的都会被判断为 v1 API

Source extends SupportsCatalogOption/TableProvider

Table extends supportsWrite

Table has batchwrite capabilities

Problems

原来 write 由 DSV1 实现,在转向 V2的过程中我们发现了一些问题

无法处理整体数据

V2 write framework can’t process global Data

和 Catalyst 的优化有冲突,如

Data convert:boolean -> long (unsafe)

Autorandom:mismatch

Temporary solution

由于 DSV2 存在一些问题,我们的处理方式是:先进行整体 v2 的切换,在 v2 框架中,write 仍先使 v1 API 实现

总结

DSV2 support 带来的好处

Less hacker code, easier to develop

The benefit of DSV2

Streaming support

New user API support(writeto)

Closer to catalyst

The new feature of Spark

Delete

More pushdown support

TiSpark prospect

Read

随着 Spark 对下推的支持,我们可以逐步使用 DataSource API ,而不再是拓展 catalyst 的方式。

Write

能够使用 DSV2 改写write

Spark SQl

支持更多的 SQL:Insert,delete,update,mergeinto

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:TiSpark 原理之下推丨TiDB 工具分享
下一篇:使用 Spring Boot 构建 TiDB 应用程序
相关文章