TiKV 源码阅读三部曲(二):读操作流程详解

网友投稿 571 2024-02-03

TiKV 是一个支持事务的分布式 Key-Value 数据库,目前已经是 CNCF 基金会 的顶级项目作为一个新同学,需要一定的前期准备才能够有能力参与 TiKV 社区的代码开发,包括但不限于学习 Rust 语言,理解 TiKV 的原理和在前两者的基础上了解熟悉 TiKV 的源码。

TiKV 源码阅读三部曲(二):读操作流程详解

TiKV 官方源码解析文档 详细地介绍了 TiKV 3.x 版本重要模块的设计要点,主要流程和相应代码片段,是学习 TiKV 源码必读的学习资料当前 TiKV 已经迭代到了 6.x 版本,不仅引入了很多新的功能和优化,而且对源码也进行了多次重构,因而一些官方源码解析文档中的代码片段已经不复存在,这使得读者在阅读源码解析文档时无法对照最新源码加深理解;此外尽管 TiKV 官方源码解析文档系统地介绍了若干重要模块的工作,但并没有将读写流程全链路串起来去介绍经过的模块和对应的代码片段,实际上尽快地熟悉读写流程全链路会更利于新同学从全局角度理解代码。

基于以上存在的问题,笔者将基于 6.1 版本的源码撰写三篇博客,分别介绍以下三个方面:TiKV 源码阅读三部曲(一)重要模块:TiKV 的基本概念,TiKV 读写路径上的三个重要模块(KVService,Storage,RaftStore)和断点调试 TiKV 学习源码的方案

TiKV 源码阅读三部曲(二)读流程:TiKV 中一条读请求的全链路流程TiKV 源码阅读三部曲(三)写流程:TiKV 中一条写请求的全链路流程希望此三篇博客能够帮助对 TiKV 开发感兴趣的新同学尽快了解 TiKV 的 codebase。

本文为第二篇博客,将主要介绍 TiKV 中一条读请求的全链路流程读流程TiKV 源码解析系列文章(十九)read index 和 local read 情景分析 介绍了 TiKV 3.x 版本的 ReadIndex/LeaseRead 实现方案。

本小节将在 TiKV 6.1 版本的源码基础上,以一条读请求为例,介绍当前版本读请求的全链路执行流程前文已经提到,可以从 kvproto 对应的 service Tikv 中了解当前 TiKV 支持的 RPC 接口。

经过简单整理,常用的读接口如下:// Key/value store API for TiKV. service Tikv{ rpc KvGet(kvrpcpb.GetRequest)returns

(kvrpcpb.GetResponse){} rpc KvScan(kvrpcpb.ScanRequest)returns(kvrpcpb.ScanResponse){} rpc KvBatchGet

(kvrpcpb.BatchGetRequest)returns(kvrpcpb.BatchGetResponse){} rpc RawGet(kvrpcpb.RawGetRequest)returns

(kvrpcpb.RawGetResponse){} rpc RawBatchGet(kvrpcpb.RawBatchGetRequest)returns(kvrpcpb.RawBatchGetResponse

){} rpc RawScan(kvrpcpb.RawScanRequest)returns(kvrpcpb.RawScanResponse){} rpc RawBatchScan(kvrpcpb

.RawBatchScanRequest)returns(kvrpcpb.RawBatchScanResponse){}...}Copy以下将以最常用的 KvGet 接口为例介绍读流程,其他的读接口所经过的模块大致相似,之后也可以用断点调试的方案去自行阅读。

KVService在 KVService 中, handle_request 宏将业务逻辑封装到了 future_get 函数中在 future_get 函数中,主要使用了 storage.get(req.take_context(), Key::from_raw(req.get_key()), req.get_version().into())。

函数将请求路由到 Storage 模块去执行为了可观测性,当前 TiKV 在读写关键路径上加了很多全局和 request 级别的 metric,这一定程度上影响了刚开始阅读代码的体验其实刚开始熟悉代码时只需要关注核心逻辑即可,metric 相关的代码可以先不用细究。

implTikvforService{handle_request!

(kv_get, future_get,GetRequest,GetResponse, has_time_detail);}fnfuture_get

>( storage:&Storage,mut req:GetRequest,)->implFuture{...

let v = storage.get( req.take_context(),Key::from_raw(req.get_key()), req.get_version

().into(),);asyncmove{let v = v.await;...Ok(resp)}}CopyStorage在 Storage 模块的 get 函数中,所有的 task 都会被 spawn 到 readPool 中执行,具体执行的任务主要包含以下两个工作:

使用 Self::with_tls_engine(|engine| Self::snapshot(engine, snap_ctx)).await? 获取 snapshot使用 snap_store.get(&key, &mut statistics)

基于获取到的 snapshot 获取符合对应事务语义的数据第二个工作比较简单,本小节不再赘述,以下主要介绍第一个工作的具体代码流程/// Get value of the given key from a snapshot.。

////// Only writes that are committed before `start_ts` are visible.pubfnget(&self,mut ctx:Context, key

:Key, start_ts:TimeStamp,)->implFuture

=self.read_pool.spawn_handle(asyncmove{...let snap_ctx =prepare_snap_ctx(&ctx,iter::once(&key), start_ts

,&bypass_locks,&concurrency_manager,CMD,)?;let snapshot =Self::with_tls_engine(|engine|Self::snapshot

(engine, snap_ctx)).await?;{let begin_instant =Instant::now();let stage_snap_recv_ts = begin_instant;

let buckets = snapshot.ext().get_buckets();letmut statistics =Statistics::default();let result =Self::

with_perf_context(CMD,||{let _guard = sample.observe_cpu();let snap_store =SnapshotStore::new( snapshot

, start_ts, ctx.get_isolation_level(),!ctx.get_not_fill_cache

(), bypass_locks, access_locks,false,); snap_store

.get(&key,&mut statistics)// map storage::txn::Error -> storage::Error.map_err(Error::from).map(|r|{KV_COMMAND_KEYREAD_HISTOGRAM_STATIC

.get(CMD).observe(1_f64); r })});...Ok(( result

?,KvGetStatistics{ stats: statistics, latency_stats,}

,))}}.in_resource_metering_tag(resource_tag), priority,thread_rng().next_u64(),);asyncmove{ res

.map_err(|_|Error::from(ErrorInner::SchedTooBusy)).await?}}Copy对于 Self::snapshot(engine, snap_ctx) 函数,其会经由

storage::snapshot -> kv::snapshot -> raftkv::async_snapshot -> raftkv::exec_snapshot 的调用链来到 ServerRaftStoreRouter::read

函数中/// Get a snapshot of `engine`.fnsnapshot( engine:&E, ctx:SnapContext,)->implstd::future。

::Future{kv::snapshot(engine, ctx).map_err(txn::Error::from).map_err(Error::from

)}/// Get a snapshot of `engine`.pubfnsnapshot( engine:&E, ctx:SnapContext,)->impl

std::future::Future{let begin =Instant::now();let(callback, future)=tikv_util

::future::paired_must_called_future_callback(drop_snapshot_callback::);let val = engine.async_snapshot

(ctx, callback);// make engine not cross yield pointasyncmove{ val?;// propagate errorlet result

= future .map_err(|cancel|Error::from(ErrorInner::Other(box_err!(cancel)))).await?;with_tls_tracker

(|tracker|{ tracker.metrics.get_snapshot_nanos += begin.elapsed().as_nanos()asu64;});fail_point!

(after-snapshot); result }}fnasync_snapshot(&self,mut ctx:SnapContext, cb:Callback

::Snap>)->kv::Result{...self.exec_snapshot( ctx, req,Box::new(move|res|match res

{...}),).map_err(|e|{let status_kind =get_status_kind_from_error(&e);ASYNC_REQUESTS_COUNTER_VEC.snapshot

.get(status_kind).inc(); e.into()})}fnexec_snapshot(&self, ctx:SnapContext, req:Request

, cb:Callback,)->Result{...letmut cmd =RaftCmdRequest::default(); cmd

.set_header(header); cmd.set_requests(vec![req].into());self.router .read( ctx

.read_id, cmd,StoreCallback::read(Box::new(move|resp|{cb(on_read_result(resp).map_err(Error

::into));})),).map_err(From::from)}implLocalReadRouterforServerRaftStoreRouter

{fnread(&self, read_id:Option, req:RaftCmdRequest, cb:Callback

,)->RaftStoreResult{letmut local_reader =self.local_reader.borrow_mut(); local_reader

.read(read_id, req, cb);Ok(())}}Copy在 ServerRaftStoreRouter::read 函数中,其会调用 local_reader 的 read 函数,并进而路由到

LocalReader::propose_raft_command 函数在该函数中,会使用 LocalReader::pre_propose_raft_command 函数来判断是否能够 ReadLocal,如果可以则直接获取本地引擎的 snapshot 并执行 callback 返回即可,否则便调用 。

redirect 函数连带 callback 路由到 RaftBatchSystem 的对应 normal 状态机中去执行 ReadIndex 读,之后本线程不再处理该任务#[inline]pubfnread

(&mutself, read_id:Option, req:RaftCmdRequest, cb:Callback,){self

.propose_raft_command(read_id, req, cb);maybe_tls_local_read_metrics_flush();}pubfnpropose_raft_command

(&mutself,mut read_id:Option, req:RaftCmdRequest, cb:Callback,){match

self.pre_propose_raft_command(&req){Ok(Some((mut delegate, policy)))=>{let delegate_ext:LocalReadContext

;letmut response =match policy {// Leader can read local if and only if it is in lease.RequestPolicy

::ReadLocal=>{...let region =Arc::clone(&delegate.region);let response = delegate

.execute(&req,®ion,None, read_id,Some(delegate_ext));// Try renew lease in advance delegate

.maybe_renew_lease_advance(&self.router, snapshot_ts); response }

// Replica can serve stale read if and only if its `safe_ts` >= `read_ts`RequestPolicy::StaleRead=>{...

let region =Arc::clone(&delegate.region);// Getting the snapshotlet response = delegate

.execute(&req,®ion,None, read_id,Some(delegate_ext));...} _ =>unreachable!(),};...

cb.invoke_read(response);}// Forward to raftstore.Ok(None)=>self.redirect(RaftCommand::new

(req, cb)),Err(e)=>{letmut response =cmd_resp::new_error(e);ifletSome(delegate)=self.delegates.get(&req

.get_header().get_region_id()){cmd_resp::bind_term(&mut response, delegate.term);} cb.invoke_read

(ReadResponse{ response, snapshot:None, txn_extra_op:

TxnExtraOp::Noop,});}}}Copy需要注意的是,在此处能否 ReadLocal 的判断是可以并行的,也就是乐观情况下并行的读请求可以并行获取底层引擎的 snapshot,不需要经过 RaftBatchSystem 。

那么到底什么时候可以直接读取 snapshot 而不需要经过 RaftStore 走一轮 ReadIndex 来处理呢?原理就是 Lease 机制,可以先简单阅读一下 TiKV Lease Read 的功能介绍

接着让我们回到 LocalReader::pre_propose_raft_command 函数,其会进行一系列的检查(此处已略去),如果皆通过则会进一步调用 inspector.inspect(req)。

函数,在其内部,其会进行一系列的判断并返回是否可以 ReadLocalreq.get_header().get_read_quorum():如果该请求明确要求需要用 read index 方式处理,所以返回 ReadIndex。

self.has_applied_to_current_term():如果该 leader 尚未 apply 到它自己的 term,则使用 ReadIndex 处理,这是 Raft 有关线性一致性读的一个 corner case。

self.inspect_lease():如果该 leader 的 lease 已经过期或者不确定,说明可能出现了一些问题,比如网络不稳定,心跳没成功等,此时使用 ReadIndex 处理,否则便可以使用 ReadLocal 处理。

pubfnpre_propose_raft_command(&mutself, req:&RaftCmdRequest,)->Result{

...match inspector.inspect(req){Ok(RequestPolicy::ReadLocal)=>Ok(Some((delegate,RequestPolicy::ReadLocal

))),Ok(RequestPolicy::StaleRead)=>Ok(Some((delegate,RequestPolicy::StaleRead))),// It can not handle other policies.

Ok(_)=>Ok(None),Err(e)=>Err(e),}}fninspect(&mutself, req:&RaftCmdR

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:TiKV 源码阅读三部曲(三):写操作流程解析
下一篇:TiKV 高性能追踪的实现原理与分析
相关文章