TiKV源码阅读三部曲三,写流程深入分析

网友投稿 591 2024-01-23

背景TiKV 是一个支持事务的分布式 Key-Value 数据库,目前已经是 CNCF 基金会 的顶级项目作为一个新同学,需要一定的前期准备才能够有能力参与 TiKV 社区的代码开发,包括但不限于学习 Rust 语言,理解 TiKV 的原理和在前两者的基础上了解熟悉 TiKV 的源码。

TiKV源码阅读三部曲三,写流程深入分析

笔者将结合TiKV 官方源码解析文档 系列文章,基于 6.1 版本的源码撰写三篇博客,分别介绍以下三个方面:TiKV 源码阅读三部曲(一)重要模块:TiKV 的基本概念,TiKV 读写路径上的三个重要模块(KVService,Storage,RaftStore)和断点调试 TiKV 学习源码的方案

TiKV 源码阅读三部曲(二)读流程:TiKV 中一条读请求的全链路流程TiKV 源码阅读三部曲(三)写流程:TiKV 中一条写请求的全链路流程希望此三篇博客能够帮助对 TiKV 开发感兴趣的新同学尽快了解 TiKV 的 codebase。

本文为第三篇博客,将主要介绍 TiKV 中一条写请求的全链路流程写流程以下四篇博客由上到下分别介绍了 TiKV 3.x 版本 KVService,Storage 和 RaftStore 模块对于分布式事务请求的执行流程。

TiKV 源码解析系列文章(九)Service 层处理流程解析TiKV 源码解析系列文章(十一)Storage - 事务控制层TiKV 源码解析系列文章(十二)分布式事务TiKV 源码解析系列文章(十八)Raft Propose 的 Commit 和 Apply 情景分析

本小节将在 TiKV 6.1 版本的基础上,以一条 PreWrite 请求为例,介绍当前版本的写请求全链路执行流程KVService在 KVService 层,通过 handle_request 和 txn_command_future 宏,PreWrite 接口的请求会直接被路由到 。

Storage::sched_txn_command 函数中impl

>TikvforService{handle_request!( kv_prewrite, future_prewrite,PrewriteRequest,PrewriteResponse

, has_time_detail );}txn_command_future!(future_prewrite,PrewriteRequest,PrewriteResponse,(v, resp, tracker

){{ifletOk(v)=&v { resp.set_min_commit_ts(v.min_commit_ts.into_inner()); resp.set_one_pc_commit_ts(v.

one_pc_commit_ts.into_inner());GLOBAL_TRACKERS.with_tracker(tracker,|tracker|{ tracker.write_scan_detail

(resp.mut_exec_details_v2().mut_scan_detail_v2()); tracker.write_write_detail(resp.mut_exec_details_v2

().mut_write_detail());});} resp.set_errors(extract_key_errors(v.map(|v| v.locks)).into());}});CopyStorage

在 Storage 模块,其会将请求路由到 Scheduler::run_cmd 函数中,并进一步路由到 Scheduler::schedule_command 函数中在 schedule_command。

函数中,当前 command 连同 callback 等上下文会被保存到 task_slots 中,如果当前线程申请到了所有 latch 则会调用 execute 函数继续执行该 task,否则如前文所述,当前任务便会被阻塞在某些 latch 上等待其他线程去唤醒进而执行,当前线程会直接返回并执行其他的工作。

// The entry point of the storage scheduler. Not only transaction commands need// to access keys serially.

pubfnsched_txn_command(&self, cmd:TypedCommand, callback:Callback,)->Result

{...self.sched.run_cmd(cmd,T::callback(callback));Ok(())}pub(incrate::storage)fnrun_cmd(&self, cmd

:Command, callback:StorageCallback){// write flow controlif cmd.need_flow_control()&&self.inner.too_busy

(cmd.ctx().region_id){SCHED_TOO_BUSY_COUNTER_VEC.get(cmd.tag()).inc(); callback.execute(ProcessResult

::Failed{ err:StorageError::from(StorageErrorInner::SchedTooBusy),});return;}self.schedule_command(cmd

, callback);}fnschedule_command(&self, cmd:Command, callback:StorageCallback){let cid =self.inner.gen_id

();let tracker =get_tls_tracker_token();debug!("received new command";"cid"=> cid,"cmd"=>?cmd,"tracker"

=>?tracker);let tag = cmd.tag();let priority_tag =get_priority_tag(cmd.priority());SCHED_STAGE_COUNTER_VEC

.get(tag).new.inc();SCHED_COMMANDS_PRI_COUNTER_VEC_STATIC.get(priority_tag).inc();letmut task_slot =self

.inner.get_task_slot(cid);let tctx = task_slot.entry(cid).or_insert_with(||{self.inner .new_task_context

(Task::new(cid, tracker, cmd), callback)});ifself.inner.latches.acquire(&mut tctx.lock, cid){fail_point!

("txn_scheduler_acquire_success"); tctx.on_schedule();let task = tctx.task.take().unwrap();drop(task_slot

);self.execute(task);return;}let task = tctx.task.as_ref().unwrap();let deadline = task.cmd.deadline(

);let cmd_ctx = task.cmd.ctx().clone();self.fail_fast_or_check_deadline(cid, tag, cmd_ctx, deadline);

fail_point!("txn_scheduler_acquire_fail");}Copy在 execute 函数中,当前线程会生成一个异步任务 spawn 到另一个 worker 线程池中去,该任务主要包含以下两个步骤:

使用 Self::with_tls_engine(|engine| Self::snapshot(engine, snap_ctx)).await 获取 snapshot此步骤与上文读流程中获取 snapshot 的步骤相同,可能通过 ReadLocal 也可能通过 ReadIndex 来获取引擎的 snapshot,此小节不在赘述。

使用 sched.process(snapshot, task).await 基于获取到的 snapshot 和对应 task 去调用 scheduler::process 函数,进而被路由到 scheduler::process_write

函数中/// Executes the task in the sched pool.fnexecute(&self,mut task:Task){set_tls_tracker_token(task

.tracker);let sched =self.clone();self.get_sched_pool(task.cmd.priority()).pool .spawn(asyncmove{...// The program is currently in scheduler worker threads.

// Safety: `self.inner.worker_pool` should ensure that a TLS engine exists.matchunsafe{with_tls_engine

(|engine:&E|kv::snapshot(engine, snap_ctx))}.await{Ok(snapshot)=>{... sched.process(snapshot, task).await

;}Err(err)=>{...}}}).unwrap();}/// Process the task in the current thread.asyncfnprocess(self, snapshot

:E::Snap, task:Task){ifself.check_task_deadline_exceeded(&task){return;}let resource_tag =self.inner.

resource_tag_factory.new_tag(task.cmd.ctx());async{...if task.cmd.readonly(){self.process_read(snapshot

, task,&mut statistics);}else{self.process_write(snapshot, task,&mut statistics).await;};...}.in_resource_metering_tag

(resource_tag).await;}Copyscheduler::process_write 函数是事务处理的关键函数,目前已经有近四百行,里面夹杂了很多新特性和新优化的复杂逻辑,其中最重要的逻辑有两个:

使用 task.cmd.process_write(snapshot, context).map_err(StorageError::from) 根据 snapshot 和 task 执行事务对应的语义:可以从

Command::process_write 函数看到不同的请求都有不同的实现,每种请求都可能根据 snapshot 去底层获取一些数据并尝试写入一些数据有关 PreWrite 和其他请求的具体操作可以参照 。

TiKV 源码解析系列文章(十二)分布式事务,此处不再赘述需要注意的是,此时的写入仅仅缓存在了 WriteData 中,并没有对底层引擎进行实际修改使用 engine.async_write_ext(&ctx, to_be_write, engine_cb, proposed_cb, committed_cb)。

将缓存的 WriteData 实际写入到 engine 层,对于 RaftKV 来说则是表示一次 propose,想要对这一批 WriteData commit 且 applyasyncfnprocess_write

(self, snapshot:E::Snap, task:Task, statistics:&mutStatistics){...let write_result ={let _guard = sample

.observe_cpu();let context =WriteContext{ lock_mgr:&self.inner.lock_mgr, concurrency_manager:self.inner

.concurrency_manager.clone(), extra_op: task.extra_op, statistics, async_apply_prewrite:self.inner.enable_async_apply_prewrite

,};let begin_instant =Instant::now();let res =unsafe{with_perf_context::(tag,||{ task.cmd .process_write

(snapshot, context).map_err(StorageError::from)})};SCHED_PROCESSING_READ_HISTOGRAM_STATIC.get(tag).observe

(begin_instant.saturating_elapsed_secs()); res };...// Safety: `self.sched_pool` ensures a TLS engine exists.

unsafe{with_tls_engine(|engine:&E|{ifletErr(e)= engine.async_write_ext(&ctx, to_be_write, engine_cb, proposed_cb

, committed_cb){SCHED_STAGE_COUNTER_VEC.get(tag).async_write_err.inc();info!("engine async_write failed"

;"cid"=> cid,"err"=>?e); scheduler.finish_with_err(cid, e);}})}}pub(crate)fnprocess_write

L:LockManager>(self, snapshot:S, context:WriteContext,)->Result{matchself{Command::

Prewrite(t)=> t.process_write(snapshot, context),Command::PrewritePessimistic(t)=> t.process_write(snapshot

, context),Command::AcquirePessimisticLock(t)=> t.process_write(snapshot, context),Command::Commit(t)

=> t.process_write(snapshot, context),Command::Cleanup(t)=> t.process_write(snapshot, context),Command

::Rollback(t)=> t.process_write(snapshot, context),Command::PessimisticRollback(t)=> t.process_write(

snapshot, context),Command::ResolveLock(t)=> t.process_write(snapshot, context),Command::ResolveLockLite

(t)=> t.process_write(snapshot, context),Command::TxnHeartBeat(t)=> t.process_write(snapshot, context

),Command::CheckTxnStatus(t)=> t.process_write(snapshot, context),Command::CheckSecondaryLocks(t)=> t

.process_write(snapshot, context),Command::Pause(t)=> t.process_write(snapshot, context),Command::RawCompareAndSwap

(t)=> t.process_write(snapshot, context),Command::RawAtomicStore(t)=> t.process_write(snapshot, context

), _ =>panic!("unsupported write command"),}}fnasync_write_ext(&self, ctx:&Context, batch:WriteData, write_cb

:Callback, proposed_cb:Option, committed_cb:Option,)->kv::Result{fail_point!

("raftkv_async_write");if batch.modifies.is_empty(){returnErr(KvError::from(KvErrorInner::EmptyRequest

));}ASYNC_REQUESTS_COUNTER_VEC.write.all.inc();let begin_instant =Instant::now_coarse();self.exec_write_requests

( ctx, batch,Box::new(move|res|match res {...}), proposed_cb, committed_cb,).map_err(|e|{let status_kind

=get_status_kind_from_error(&e);ASYNC_REQUESTS_COUNTER_VEC.write.get(status_kind).inc(); e.into()})}Copy

进入 raftkv::async_write_ext 函数后,其进而通过 raftkv::exec_write_requests -> RaftStoreRouter::send_command 的调用栈将 task 连带 callback 发送给 RaftBatchSystem 交由 RaftStore 模块处理。

fnexec_write_requests(&self, ctx:&Context, batch:WriteData, write_cb:Callback, proposed_cb

:Option, committed_cb:Option,)->Result{...let cb =StoreCallback::write_ext

(Box::new(move|resp|{write_cb(on_write_result(resp).map_err(Error::into));}), proposed_cb, committed_cb

,);let extra_opts =RaftCmdExtraOpts{ deadline: batch.deadline, disk_full_opt: batch.disk_full_opt,};self

.router.send_command(cmd, cb, extra_opts)?;Ok(())}/// Sends RaftCmdRequest to local store.fnsend_command

(&self, req:RaftCmdRequest, cb:Callback, extra_opts:RaftCmdExtraOpts,)->RaftStoreResult

{send_command_impl::(self, req, cb, extr

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:MSSQL批量创建视图:简单可靠的数据查询方式(mssql批量创建视图)
下一篇:极速传送:从DMP到MSSQL的数据迁移之路(dmp转mssql)
相关文章