TiKV源码阅读三部曲一,重要模块全面解读

网友投稿 644 2024-01-23

谭新宇:清华大学软件学院研三在读,Apache IoTDB committer,Talent Plan Community mentorTiKV 是一个支持事务的分布式 Key-Value 数据库,目前已经是

TiKV源码阅读三部曲一,重要模块全面解读

CNCF 基金会 的顶级项目作为一个新同学,需要一定的前期准备才能够有能力参与 TiKV 社区的代码开发,包括但不限于学习 Rust 语言,理解 TiKV 的原理和在前两者的基础上了解熟悉 TiKV 的源码。

TiKV 官方源码解析文档 详细地介绍了 TiKV 3.x 版本重要模块的设计要点,主要流程和相应代码片段,是学习 TiKV 源码必读的学习资料当前 TiKV 已经迭代到了 6.x 版本,不仅引入了很多新的功能和优化,而且对源码也进行了多次重构,因而一些官方源码解析文档中的代码片段已经不复存在,这使得读者在阅读源码解析文档时无法对照最新源码加深理解;此外尽管 TiKV 官方源码解析文档系统地介绍了若干重要模块的工作,但并没有将读写流程全链路串起来去介绍经过的模块和对应的代码片段,实际上尽快地熟悉读写流程全链路会更利于新同学从全局角度理解代码。

基于以上存在的问题,笔者将基于 6.1 版本的源码撰写三篇博客,分别介绍以下三个方面:TiKV 源码阅读三部曲(一)重要模块:TiKV 的基本概念,TiKV 读写路径上的三个重要模块(KVService,Storage,RaftStore)和断点调试 TiKV 学习源码的方案

TiKV 源码阅读三部曲(二)读流程:TiKV 中一条读请求的全链路流程TiKV 源码阅读三部曲(三)写流程:TiKV 中一条写请求的全链路流程希望此三篇博客能够帮助对 TiKV 开发感兴趣的新同学尽快了解 TiKV 的 codebase。

本文为第一篇博客,将主要介绍 TiKV 的基本概念,TiKV 读写路径上的三个重要模块(KVService,Storage,RaftStore)和断点调试 TiKV 学习源码的方案基本概念TiKV 的架构简介可以查看 。

官方文档总体来看,TiKV 是一个通过 Multi-Raft 实现的分布式 KV 数据库TiKV 的每个进程拥有一个 store,store 中拥有若干 region每个 region 是一个 raft 组,会存在于副本数个 store 上管理一段 KV 区间的数据。

重要模块KVServiceTiKV 的 Service 层代码位于 src/server 文件夹下,其职责包括提供 RPC 服务、将 store id 解析成地址、TiKV 之间的相互通信等有关 Service 层的概念解析可以查看阅读 。

TiKV 源码解析系列文章(九)Service 层处理流程解析。

TiKV 包含多个 gRPC service其中最重要的一个是 KVService,位于 src/server/service/kv.rs 文件中KVService 定义了 TiKV 的 kv_get,kv_scan,kv_prewrite,kv_commit 等事务操作 API,用于执行 TiDB 下推下来的复杂查询和计算的 coprocessor API,以及 raw_get,raw_put 等 Raw KV API。

batch_commands 接口则是用于将上述的接口 batch 起来,以优化高吞吐量的场景另外,TiKV 的 Raft group 各成员之间通信用到的 raft 和 batch_raft 接口也是在这里提供的。

本小节将简单介绍 KVService 及其启动流程,并顺带介绍 TiKV 若干重要结构的初始化流程cmd/tikv-server/main.rs 是 TiKV 进程启动的入口,其主要做了以下两个工作:解析配置参数

使用 server::server::run_tikv(config) 启动 tikv 进程fnmain(){let build_timestamp =option_env!("TIKV_BUILD_TIME"

);let version_info =tikv::tikv_version_info(build_timestamp);// config parsing// ...// config parsing

server::server::run_tikv(config);}Copy对于 components/server/src/server.rs 的 run-tikv 函数,其会调用 run_impl 函数并根据配置参数来启动对应的 KV 引擎。

在 run_impl 函数中,首先会调用 TikvServer::::init::(config) 函数做若干重要结构的初始化,包含但不限于 batch_system, concurrency_manager, background_worker, quota_limiter 等等,接着在

tikv.init_servers::() 里将 RPC handler 与 KVService 绑定起来,最后在 tikv.run_server(server_config) 中便会使用 TiKV 源码解析系列文章(七)gRPC Server 的初始化和启动流程

中介绍的 grpc server 绑定对应的端口并开始监听连接了/// Run a TiKV server. Returns when the server is shutdown by the user, in which。

/// case the server will be properly stopped.pubfnrun_tikv(config:TikvConfig){...// Do some prepare works before start.

pre_start();let _m =Monitor::default();dispatch_api_version!(config.storage.api_version(),{if!config.

raft_engine.enable {run_impl::(config)}else{run_impl::(config)}})

}#[inline]fnrun_impl(config:TikvConfig){letmut tikv =TikvServer::

::init::(config);...let server_config = tikv.init_servers::();... tikv.run_server(server_config

);signal_handler::wait_for_signal(Some(tikv.engines.take().unwrap().engines)); tikv.stop();}fnrun_server

(&mutself, server_config:Arc){let server =self.servers.as_mut().unwrap();

server .server .build_and_bind().unwrap_or_else(|e|fatal!("failed to build server: {}", e)); server

.server .start(server_config,self.security_mgr.clone()).unwrap_or_else(|e|fatal!("failed to start server: {}"

, e));}CopyKVService 服务启动后,所有发往监听端口的请求便会路由到 KVService 对应的 handler 上有关 KVService 目前支持的接口,可以直接查看 kvproto。

对应的 service Tikv,目前的 RPC 接口已经接近 60 个,每个接口在代码中都会对应一个 handler// Key/value store API for TiKV. service Tikv { // Commands using a transactional interface. rpc KvGet(kvrpcpb.GetRequest) returns (kvrpcpb.GetResponse) {} rpc KvScan(kvrpcpb.ScanRequest) returns (kvrpcpb.ScanResponse) {} rpc KvPrewrite(kvrpcpb.PrewriteRequest) returns (kvrpcpb.PrewriteResponse) {} rpc KvPessimisticLock(kvrpcpb.PessimisticLockRequest) returns (kvrpcpb.PessimisticLockResponse) {} rpc KVPessimisticRollback(kvrpcpb.PessimisticRollbackRequest) returns (kvrpcpb.PessimisticRollbackResponse) {} ... }。

Copy当 KVService 收到请求之后,会根据请求的类型把这些请求转发到不同的模块进行处理对于从 TiDB 下推的读请求,比如 sum,avg 操作,会转发到 Coprocessor 模块进行处理,对于 KV 请求会直接转发到 Storage 模块进行处理。

KV 操作根据功能可以被划分为 Raw KV 操作以及 Txn KV 操作两大类Raw KV 操作包括 raw put、raw get、raw delete、raw batch get、raw batch put、raw batch delete、raw scan 等普通 KV 操作。

Txn KV 操作是为了实现事务机制而设计的一系列操作,如 prewrite 和 commit 分别对应于 2PC 中的 prepare 和 commit 阶段的操作与 TiKV 源码解析系列文章(七)gRPC Server 的初始化和启动流程

中介绍的 handler example 不同,当前 KVService 对事务 API 例如 kv_prewrite, kv_commit 和 Raw API 例如 raw_get, raw_scan 进行了封装,由于他们都会被路由到 Storage 模块,所以接口无关的逻辑都被封装到了

handle_request 宏中,接口相关的逻辑则被封装到了 future_prewirte, future_commit 等 future_xxx 函数中需要注意的是,对于 coprocessor API,raft API 等相关接口依然采用了原生对接 grpc-rs 的方式。

macro_rules! handle_request {($fn_name: ident,$future_name: ident,$req_ty: ident,$resp_ty: ident)=>{handle_request!

($fn_name,$future_name,$req_ty,$resp_ty, no_time_detail);};($fn_name: ident,$future_name: ident,$req_ty

: ident,$resp_ty: ident,$time_detail: tt)=>{fn$fn_name(&mutself, ctx:RpcContext,mut req:$req_ty, sink

:UnarySink){forward_unary!(self.proxy,$fn_name, ctx, req, sink);let begin_instant =Instant::

now();let source = req.mut_context().take_request_source();let resp =$future_name(&self.storage, req)

;let task =asyncmove{let resp = resp.await?;let elapsed = begin_instant.saturating_elapsed();set_total_time!

(resp, elapsed,$time_detail); sink.success(resp).await?;GRPC_MSG_HISTOGRAM_STATIC.$fn_name.observe(elapsed

.as_secs_f64());record_request_source_metrics(source, elapsed);ServerResult::Ok(())}.map_err(|e|{log_net_error!

(e,"kv rpc failed";"request"=>stringify!($fn_name));GRPC_MSG_FAIL_COUNTER.$fn_name.inc();}).map(|_|()

); ctx.spawn(task);}}}implTikvfor

Service{handle_request!(kv_get, future_get,GetRequest,GetResponse, has_time_detail);handle_request!

(kv_scan, future_scan,ScanRequest,ScanResponse);handle_request!( kv_prewrite, future_prewrite,PrewriteRequest

,PrewriteResponse, has_time_detail );...handle_request!(raw_get, future_raw_get,RawGetRequest,RawGetResponse

);handle_request!( raw_batch_get, future_raw_batch_get,RawBatchGetRequest,RawBatchGetResponse);handle_request!

(raw_scan, future_raw_scan,RawScanRequest,RawScanResponse);...fncoprocessor(&mutself, ctx:RpcContext<

_>,mut req:Request, sink:UnarySink){forward_unary!(self.proxy, coprocessor, ctx, req, sink)

;let source = req.mut_context().take_request_source();let begin_instant =Instant::now();let future =future_copr

(&self.copr,Some(ctx.peer()), req);let task =asyncmove{let resp = future.await?.consume(); sink.success

(resp).await?;let elapsed = begin_instant.saturating_elapsed();GRPC_MSG_HISTOGRAM_STATIC.coprocessor

.observe(elapsed.as_secs_f64());record_request_source_metrics(source, elapsed);ServerResult::Ok(())}.

map_err(|e|{log_net_error!(e,"kv rpc failed";"request"=>"coprocessor");GRPC_MSG_FAIL_COUNTER.coprocessor

.inc();}).map(|_|()); ctx.spawn(task);}...}Copy在事务相关 API 的 future_xxx 函数实现中,对于带有写语义的 future_prewrite, future_commit 等函数,由于它们会被统一调度到 Storage 模块的 sched_txn_command 函数中,当前又抽象出了

txn_command_future 宏来减少冗余代码;对于带有读语义的 future_get, future_scan 等函数,由于他们会分别调用 Storage 模块的 get/scan 等函数,因而目前并没有进行进一步抽象。

macro_rules! txn_command_future {($fn_name: ident,$req_ty: ident,$resp_ty: ident,($req: ident)$prelude

: stmt;($v: ident,$resp: ident,$tracker: ident){$else_branch: expr })=>{fn$fn_name

,F:KvFormat>( storage:&Storage,$req:$req_ty,)->implFuture{$prelude

let$tracker=GLOBAL_TRACKERS.insert(Tracker::new(RequestInfo::new($req.get_context(),RequestType::Unknown

,0,)));set_tls_tracker_token($tracker);let(cb, f)=paired_future_callback();let res = storage.sched_txn_command

($req.into(), cb);asyncmove{defer!{{GLOBAL_TRACKERS.remove($tracker);}};let$v=match res {Err(e)=>Err(

e),Ok(_)=> f.await?,};letmut$resp=$resp_ty::default();ifletSome(err)=extract_region_error(&$v){$resp.

set_region_error(err);}else{$else_branch;}Ok($resp)}}};($fn_name: ident,$req_ty: ident,$resp_ty: ident

,($v: ident,$resp: ident,$tracker: ident){$else_branch: expr })=>{txn_command_future!($fn_name,$req_ty

,$resp_ty,(req){};($v,$resp,$tracker){$else_branch});};($fn_name: ident,$req_ty: ident,$resp_ty: ident

,($v: ident,$resp: ident){$else_branch: expr })=>{txn_command_future!($fn_name,$req_ty,$resp_ty,(req)

{};($v,$resp, tracker){$else_branch});};}txn_command_future!(future_prewrite,PrewriteRequest,PrewriteResponse

,(v, resp, tracker){{ifletOk(v)=&v { resp.set_min_commit_ts(v.min_commit_ts.into_inner()); resp.set_one_pc_commit_ts

(v.one_pc_commit_ts.into_inner());GLOBAL_TRACKERS.with_tracker(tracker,|tracker|{ tracker.write_scan_detail

(resp.mut_exec_details_v2().mut_scan_detail_v2()); tracker.write_write_detail(resp.mut_exec_details_v2

().mut_write_detail());});} resp.set_errors(extract_key_errors(v.map(|v| v.locks)).into());}});fnfuture_get

( storage:&Storage,mut req:GetRequest,)->implFuture

ServerResult>{let tracker =GLOBAL_TRACKERS.insert(Tracker::new(RequestInfo::new( req.get_context

(),RequestType::KvGet, req.get_version(),)));set_tls_tracker_token(tracker);let start =Instant::now()

;let v = storage.get( req.take_context(),Key::from_raw(req.get_key()), req.get_version().into(),);async

move{let v = v.await;let duration_ms =duration_to_ms(start.saturating_elapsed());letmut resp =GetResponse

::default();ifletSome(err)=extract_region_error(&v){ resp.set_region_error(err);}else{match v {Ok(

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:MSSQL容量优化:精准统计分析(mssql 容量统计)
下一篇:MSSQL批量创建视图:简单可靠的数据查询方式(mssql批量创建视图)
相关文章