黄东旭解析 TiDB 的核心优势
516
2024-03-18
我们在前面的概述里面,已经描述了 batchSystem
的重要组件:
FSM
n * normalFsm
controlFsm
MailBox
n * normalMailBox
controlMailBox
scheduler
normalScheduler
controlScheduler
poller
router
本文将会带大家看一下,TIKV
在启动过程中,是如何构建与初始化 BatchSystem
这些组件的。
另外,BatchSystem
还有两个比较重要的异步任务:
一个是 ApplyFsm
,其是和 normalFsm
一一对应的,每当 normalFsm
内部的 raft
模块生成了 committedEntries
,就需要 ApplyFsm
将其进行 Apply
,最后落盘到 RocksDB
一个是 AsyncWriter
,专门用于处理 multiRaft
生成的 Msg
和 raftLog
,将 Msg
发送给其他 TIKV
实例,将 raftLog
通过 raftLogEngine
持久化到 RocksDB
BatchSystem
的代码有两个难点,一个是代码的结构稍微不太集中,分别分布在:
components/batch-system/src/
components/raftstore-v2/src/
components/raftstore/src/
一个是异步任务池和 channel
的大量使用,让 Msg
的流转很难理解。
本文专门研究 BatchSystem
的初始化和启动流程,重点观察各个组件在初始化过程中各个异步任务池的作用,还有各个 channel
对应的 Msg
流转分发逻辑。希望经过本文的解析,读者可以通过代码更好的理解 BatchSystem
、multiRaft
原理、Region
合并与分裂的流程。
BatchSystem
创建入口是 components/server/src/server2.rs::run_impl::init_engines
, 主要创建 BatchSystem
的框架,特别是创建并且初始化 control
相关组件,例如 controlFsm
、controlMailBox
、controlScheduler
等等。
components/server/src/server2.rs::run_impl::init_engines : 创建 node,内部 StoreSystem 实现 multi raft 分布式协议 : 创建 raftLogEngine 用于存储和恢复节点 raft log : 创建 tablet_registry 用于存储节点 KV 数据 -- src/server/raftkv2/node.rs::NodeV2::try_bootstrap_store : 创建 NodeV2.StoreSystem ---- components/raftstore-v2/src/batch/store.rs::create_store_batch_system : 创建并且初始化 controlFsm ------ components/batch-system/src/batch.rs::create_system : 创建 controlMailBox、controlScheduler、normalScheduler、router : 创建 BatchSystem
components/server/src/server2.rs
init_engines
函数主要用于创建 TIKV
各个重要模块的数据结构对象,其中:
node
对象代表当前 TIKV
的节点实例,每一个 TIKV
程序仅对应一个 node
对象
raft_engine
对象实际上是 RaftLogEngine
,Raft
接受其他节点或者自己节点产生的任何 raftlog
都需要通过这个对象写入 rocksdb
,是分布式系统恢复一致性的重要保证
tablet_registry
模块和 rocksdb
强相关, TIKV
Raft
Apply
后需要持久化的任何 KV
数据都需要通过 tablet_registry
来写入 rocksdb
router
对象我们在前一个文章中已经描述过,其内部包含了各个 Raft Region
MailBox
,专门用于路由 RPC
请求到 raftStore
模块
fn init_engines( &mut self, flow_listener: engine_rocks::FlowListener, ) -> Arc<EnginesResourceInfo> { ... // Create raft engine let (raft_engine, ...) = CER::build( &self.core.config, ... ); ... let mut node = NodeV2::new( &self.core.config.server, self.pd_client.clone(), ... ); node.try_bootstrap_store(&self.core.config.raft_store, &raft_engine) .unwrap_or_else(|e| fatal!("failed to bootstrap store: {:?}", e)); let router = node.router().clone(); ... let builder = builder.state_storage(Arc::new(StateStorage::new( raft_engine.clone(), router.clone(), ))); let factory = Box::new(builder.build()); let registry = TabletRegistry::new(factory, self.core.store_path.join("tablets")) .unwrap_or_else(|e| fatal!("failed to create tablet registry {:?}", e)); self.tablet_registry = Some(registry.clone()); ... let router = RaftRouter::new(node.id(), router); ... let mut engine = RaftKv2::new(router.clone(), ...); ... self.engines = Some(TikvEngines { raft_engine, engine, }); self.router = Some(router); self.node = Some(node); ... engines_info }
src/server/raftkv2/node.rs
这个函数主要功能是生成唯一的 nodeId
,然后利用 create_store_batch_system
创建 raftStore
的各个对象。
pub fn try_bootstrap_store( &mut self, cfg: &raftstore_v2::Config, raft_engine: &ER, ) -> Result<()> { let store_id = Bootstrap::new( raft_engine, self.cluster_id, &*self.pd_client, self.logger.clone(), ) .bootstrap_store()?; self.store.set_id(store_id); let (router, system) = raftstore_v2::create_store_batch_system( cfg, store_id, self.logger.clone(), self.resource_ctl.clone(), ); self.system = Some((router, system)); Ok(()) }
raftstore_v2::create_store_batch_system
components/raftstore-v2/src/batch/store.rs
StoreFsm
实际上就是 controlFsm
,不管有多少个 region
,controlFsm
只有一个,这里提前将 controlFsm
创建出来
StoreFsm::new
会返回两个结果,一个是 fsm
,一个是 channel
的发送端 tx
。channel 的接收端 rx
是 fsm
成员变量,tx
后续会绑定到 controlMailBox
内部
create_system
将会继续创建 raftStore
的其他组件对象
pub fn create_store_batch_system<EK, ER>( cfg: &Config, store_id: u64, logger: Logger, resource_ctl: Option<Arc<ResourceController>>, ) -> (StoreRouter<EK, ER>, StoreSystem<EK, ER>) where EK: KvEngine, ER: RaftEngine, { let (store_tx, store_fsm) = StoreFsm::new(cfg, store_id, logger.clone()); let (router, system) = batch_system::create_system(&cfg.store_batch_system, store_tx, store_fsm, resource_ctl); let system = StoreSystem { system, workers: None, schedulers: None, logger: logger.clone(), shutdown: Arc::new(AtomicBool::new(false)), node_start_time: monotonic_raw_now(), }; (StoreRouter { router, logger }, system) }
components/batch-system/src/batch.rs
上一个步骤中,controlFsm
就是这里的 controller
参数,sender
是 controlFsm
channel
的发送端。因此 create_system
首先就利用 controlFsm
和 sender
来创建 controlMailBox
接下来,还需要创建 normal_scheduler
、 control_scheduler
与 poller
,这三个组件共用一套 channel
最后,把 controlMailBox
、normal_scheduler
、control_scheduler
放入 router
中去
使用 router
构建 BatchSystem
pool_state_builder
一般用于动态增加或者减少 poller
数量
pub fn create_system<N: Fsm, C: Fsm>( cfg: &Config, sender: mpsc::LooseBoundedSender<C::Message>, controller: Box<C>, resource_ctl: Option<Arc<ResourceController>>, ) -> (BatchRouter<N, C>, BatchSystem<N, C>) { let control_box = BasicMailbox::new(sender, controller, ...); let (sender, receiver) = unbounded(resource_ctl); let normal_scheduler = NormalScheduler { sender: sender.clone(), ... }; let control_scheduler = ControlScheduler { sender: sender.clone(), }; let pool_state_builder = PoolStateBuilder { ... fsm_receiver: receiver.clone(), fsm_sender: sender, ... }; let router = Router::new(control_box, normal_scheduler, control_scheduler, ...); let system = BatchSystem { ... router: router.clone(), receiver, ... workers: Arc::new(Mutex::new(Vec::new())), ... pool_state_builder: Some(pool_state_builder), }; (router, system)}
目前经过 init_engines
处理后,batchSystm
的当前进度为:
FSM
n * normalFsm
(未创建)
controlFsm
(已创建、已初始化)
MailBox
n * normalMailBox
(未创建)
controlMailBox
(已创建、已初始化)
scheduler
normalScheduler
(已创建、已初始化)
controlScheduler
(已创建、已初始化)
poller
(未创建、未初始化)
router
(已创建部分、未初始化)
经过 init_engines
处理后,batchSystem
大概框架已经建立完毕,而且 control
组件基本初始化完成。接下来的流程中,batchSystem
将会着重创建并初始化剩余的组件。
components/server/src/server2.rs::run_impl::init_servers -- src/server/raftkv2/node.rs::NodeV2::start ---- components/raftstore-v2/src/batch/store.rs::StoreSystem::start : 启动 async_write ,循环等待 multiRaft 生产的 raftlog,然后通过 raftLogEngine 写入 : 构建 StorePollerBuilder,并且通过初始化 StorePollerBuilder 构建 PeerFsm : 使用 BatchSystem::spawn 方法构建 poller、StorePoller,启动多个异步任务 poller.poll : 根据 StorePollerBuilder 构建 PeerFsm 来构建 mailboxes,并将它们注册到 router 成员变量里面去 : 发送控制命令 `StoreMsg::Start,创建 ApplyFsm,启动 Apply 的异步任务,对 committed entries 进行 Apply ------ components/raftstore/src/store/async_io/write.rs::StoreWriters::spawn/increase_to ----------- components/raftstore/src/store/async_io/write.rs::Worker::run --------------- components/raftstore/src/store/async_io/write.rs::Worker::handle_msg/write_to_db : 创建 async_write 并且开启异步协程循环,通过 async_write channel 的 rx 接收 raft log,异步写入 rocksdb : 向外部返回 async_write channel 的 tx ------ components/raftstore-v2/src/batch/store.rs::StorePollerBuilder::new ------ components/raftstore-v2/src/batch/store.rs::StorePollerBuilder::init : 创建各个 region 的 PeerFsm,也就是 normalFsm : `PeerFsm` 内部会继续创建 `Peer`,比较关键的是创建了 `raftnode`,也就是创建了 `raft` 模块 ------ components/batch-system/src/batch.rs::BatchSystem::spawn ---------- components/raftstore-v2/src/batch/store.rs::StorePollerBuilder::build : 构建 poll_ctx,特别地注意 schedulers.write(async_writeChannel的tx) : 构建 Poller、StorePoller ---------- components/batch-system/src/batch.rs::Poller::poll : 接收 fsm,然后交给 StorePoller 的 handle_normal / handle_control 对 fsm 进行处理 : 对于 raft 生成的 raftlog/msg,利用 schedulers.write(async_writeChannel的tx) 发送给 async_write 进行处理 : 对于 raft 生成的 committedEntries, 通过 peer 的 apply_scheduler(也就是 ApplyFsm 的 channel tx) 发送给 ApplyFsm 进行处理 --------------- components/raftstore-v2/src/batch/store.rs::StorePoller::handle_control -------------------- components/raftstore-v2/src/fsm/peer.rs::PeerFsmDelegate::on_start ------------------------- components/raftstore-v2/src/operation/command/mod.rs::Peer::schedule_apply_fsm : 创建 ApplyFsm,异步运行 handle_all_tasks,对 committedEntries 进行 Apply
components/server/src/server2.rs
主要作用调用 NodeV2::start
fn init_servers<F: KvFormat>(&mut self) -> Arc<VersionTrack<ServerConfig>> { ... let engines = self.engines.as_mut().unwrap(); ... self.node .as_mut() .unwrap() .start( engines.raft_engine.clone(), self.tablet_registry.clone().unwrap(), self.router.as_ref().unwrap(), server.transport(), ... ) ... server_config }
src/server/raftkv2/node.rs
主要作用调用 StoreSystem::start
pub fn start<T>( &mut self, raft_engine: ER, registry: TabletRegistry<EK>, router: &RaftRouter<EK, ER>, ... ) -> Result<()> where T: Transport + 'static, { let store_id = self.id(); if let Some(region) = Bootstrap::new( ... ) .bootstrap_first_region(&self.store, store_id)? { ... registry.tablet_factory().open_tablet(ctx, &path).unwrap(); } ... self.start_store( raft_engine, registry, router, ... )?; Ok(()) }fn start_store<T>( &mut self, raft_engine: ER, registry: TabletRegistry<EK>, router: &RaftRouter<EK, ER>, ... ) -> Result<()> where T: Transport + 'static, { ... let system = &mut self.system.as_mut().unwrap().1; system.start( store_id, store_cfg, raft_engine, registry, ... )?; Ok(()) }
components/raftstore-v2/src/batch/store.rs
启动 async_write
,循环等待 multiRaft
生产的 raftlog
,然后通过 raftLogEngine
写入
构建 StorePollerBuilder
,并且通过初始化 StorePollerBuilder
构建 PeerFsm
使用BatchSystem::spawn
方法构建 poller
、StorePoller
,启动多个异步任务 poller.poll
,等待 normalScheduler
/controlScheduler
发来的 fsm
,然后交给 StorePoller
的 handle_normal
/ handle_control
对 fsm
进行处理
根据 StorePollerBuilder
构建 PeerFsm
来构建 mailboxes
,并将它们注册到 router
成员变量里面去
发送控制命令 StoreMsg::Start
,创建 ApplyFsm
,启动 Apply
的异步任务,持久化 committed entries
pub fn start<T, C>( &mut self, store_id: u64, cfg: Arc<VersionTrack<Config>>, raft_engine: ER, tablet_registry: TabletRegistry<EK>, ... ) -> Result<()> where T: Transport + 'static, C: PdClient + 'static, { ... let mut workers = Workers::new(background, pd_worker, ...); workers .async_write .spawn(store_id, raft_engine.clone(), ...)?; ... let tablet_scheduler = workers.tablet.start_with_timer( "tablet-worker", tablet::Runner::new( tablet_registry.clone(), ... ), ); let schedulers = Schedulers { read: read_scheduler, pd: workers.pd.scheduler(), tablet: tablet_scheduler, write: workers.async_write.senders(), ... }; let builder = StorePollerBuilder::new( cfg.clone(), store_id, raft_engine.clone(), tablet_registry, router.clone(), schedulers.clone(), ... store_meta.clone(), ... ); self.schedulers = Some(schedulers); let peers = builder.init()?; self.system.spawn(tag, builder.clone()); let apply_pool = builder.apply_pool.clone(); let refresh_config_runner = refresh_config::Runner::new( ... self.system.build_pool_state(builder), ... ); assert!(workers.refresh_config_worker.start(refresh_config_runner)); self.workers = Some(workers); let mut mailboxes = Vec::with_capacity(peers.len()); let mut address = Vec::with_capacity(peers.len()); { let mut meta = store_meta.as_ref().lock().unwrap(); for (region_id, (tx, mut fsm)) in peers { ... address.push(region_id); mailboxes.push(( region_id, BasicMailbox::new(tx, fsm, router.state_cnt().clone()), )); } } router.register_all(mailboxes); // Make sure Msg::Start is the first message each FSM received. let watch = Arc::new(ReplayWatch::new(self.logger.clone())); for addr in address { router .force_send(addr, PeerMsg::Start(Some(watch.clone()))) .unwrap(); } router.send_control(StoreMsg::Start).unwrap(); Ok(()) }
components/raftstore-v2/src/batch/store.rs
构建 apply_pool
,为后续 ApplyFsm
的运行启动做准备
返回 StorePollerBuilder
pub fn new( cfg: Arc<VersionTrack<Config>>, store_id: u64, engine: ER, tablet_registry: TabletRegistry<EK>, trans: T, router: StoreRouter<EK, ER>, schedulers: Schedulers<EK, ER>, ... ) -> Self { ... let apply_pool = YatpPoolBuilder::new(DefaultTicker::default()) ... .build_future_pool(); let global_stat = GlobalStoreStat::default(); StorePollerBuilder { ... } }
components/raftstore-v2/src/batch/store.rs
创建各个 region
的 PeerFsm
,也就是 normalFsm
PeerFsm
内部会继续创建 Peer
,比较关键的是创建了 raftnode
,也就是创建了 raft
模块
fn init(&self) -> Result<HashMap<u64, SenderFsmPair<EK, ER>>> { let mut regions = HashMap::default(); let cfg = self.cfg.value(); self.engine .for_each_raft_group::<Error, _>(&mut |region_id| { let storage = match Storage::new( region_id, self.store_id, self.engine.clone(), self.schedulers.read.clone(), &self.logger, )? { Some(p) => p, None => return Ok(()), }; ... let (sender, peer_fsm) = PeerFsm::new( &cfg, &self.tablet_registry, self.key_manager.as_deref(), &self.snap_mgr, storage, )?; ... Ok(()) })?; Ok(regions) }impl<EK: KvEngine, ER: RaftEngine> PeerFsm<EK, ER> { pub fn new( cfg: &Config, tablet_registry: &TabletRegistry<EK>, key_manager: Option<&DataKeyManager>, snap_mgr: &TabletSnapManager, storage: Storage<EK, ER>, ) -> Result<SenderFsmPair<EK, ER>> { let peer = Peer::new(cfg, tablet_registry, key_manager, snap_mgr, storage)?; let (tx, rx) = mpsc::loose_bounded(cfg.notify_capacity); let fsm = Box::new(PeerFsm { peer, mailbox: None, receiver: rx, ... }); Ok((tx, fsm)) }} impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> { /// Creates a new peer. /// /// If peer is destroyed, `None` is returned. pub fn new( cfg: &Config, tablet_registry: &TabletRegistry<EK>, key_manager: Option<&DataKeyManager>, snap_mgr: &TabletSnapManager, storage: Storage<EK, ER>, ) -> Result<Self> { ... let raft_group = RawNode::new(&raft_cfg, storage, &logger)?; let region = raft_group.store().region_state().get_region().clone(); let tablet_index = storage.region_state().get_tablet_index(); if tablet_index != 0 { raft_group .store() .recover_tablet(tablet_registry, ...); let mut ctx = TabletContext::new(®ion, Some(tablet_index)); tablet_registry.load(ctx, false)?; } let cached_tablet = tablet_registry.get_or_default(region_id); let mut peer = Peer { tablet: cached_tablet, ... raft_group, ... }; ... Ok(peer) }}
components/batch-system/src/batch.rs
构建 poller
构建 StorePoller
启动多个异步任务 poller.poll
,等待 normalScheduler
/controlScheduler
发来的 fsm
,然后交给 StorePoller
的 handle_normal
/ handle_control
对 fsm
进行处理
pub fn spawn<B>(&mut self, name_prefix: String, mut builder: B) where B: HandlerBuilder<N, C>, B::Handler: Send + 'static, { for i in 0..self.pool_size { self.start_poller( Priority::Normal, &mut builder, ); } ... } fn start_poller<B>(&mut self, name: String, priority: Priority, builder: &mut B) where B: HandlerBuilder<N, C>, B::Handler: Send + 'static, { let handler = builder.build(priority); let receiver = match priority { Priority::Normal => self.receiver.clone(), ... }; let mut poller = Poller { router: self.router.clone(), fsm_receiver: receiver, handler, ... }; let t = thread::Builder::new() .name(name) .spawn_wrapper(move || { ... poller.poll(); }) .unwrap(); self.workers.lock().unwrap().push(t); }
components/raftstore-v2/src/batch/store.rs
构建 StorePoller
,其 handle_normal
/ handle_control
方法会对 fsm
进行解析,获取其 msg
,递交给 raft
模块进行进一步处理
对于 raft
模块生成的 raftlog
,会使用 schedulers.write
通过 async_write/raftLogEngine
将日志存储到 rocksdb
进行持久化
对于 raft
模块生成的 apply entries
,会通过 ApplyFsm
使用 tablet_registry
存储到 rocksdb
fn build(&mut self, _priority: batch_system::Priority) -> Self::Handler { ... let mut poll_ctx = StoreContext { store_id: self.store_id, trans: self.trans.clone(), ... router: self.router.clone(), ... schedulers: self.schedulers.clone(), store_meta: self.store_meta.clone(), ... engine: self.engine.clone(), tablet_registry: self.tablet_registry.clone(), apply_pool: self.apply_pool.clone(), ... }; ... StorePoller::new(poll_ctx, ...) }
poll
函数主要是循环等待 channel
的接收端 fsm_receiver
发送 fsm
,然后使用 handler
,也就是 StorePoller
进行处理。
components/batch-system/src/batch.rs
pub fn poll(&mut self) { ... let mut run = true; while run && self.fetch_fsm(&mut batch) { ... if batch.control.is_some() { let len = self.handler.handle_control(batch.control.as_mut().unwrap()); ... } for (i, p) in batch.normals.iter_mut().enumerate() { ... let res = self.handler.handle_normal(p); ... } let mut fsm_cnt = batch.normals.len(); while batch.normals.len() < max_batch_size { if let Ok(fsm) = self.fsm_receiver.try_recv() { run = batch.push(fsm); } if !run || fsm_cnt >= batch.normals.len() { break; } let p = batch.normals[fsm_cnt].as_mut().unwrap(); let res = self.handler.handle_normal(p); } ... } ... }fn fetch_fsm(&mut self, batch: &mut Batch<N, C>) -> bool { if batch.control.is_some() { return true; } if let Ok(fsm) = self.fsm_receiver.try_recv() { return batch.push(fsm); } if batch.is_empty() { self.handler.pause(); if let Ok(fsm) = self.fsm_receiver.recv() { return batch.push(fsm); } } !batch.is_empty() }
前面 StoreSystem::start
发出的 StoreMsg::Start
消息会触发 StorePoller::handle_control
,进而触发 ApplyFsm
的构建:
components/raftstore-v2/src/batch/store.rs
fn handle_control(&mut self, fsm: &mut StoreFsm) -> Option<usize> { ... let mut delegate = StoreFsmDelegate::new(fsm, &mut self.poll_ctx); delegate.handle_msgs(&mut self.store_msg_buf); ... } components/raftstore-v2/src/fsm/peer.rs: pub fn handle_msgs(&mut self, store_msg_buf: &mut Vec<StoreMsg>) where T: Transport, { for msg in store_msg_buf.drain(..) { match msg { StoreMsg::Start => self.on_start(), StoreMsg::Tick(tick) => self.on_tick(tick), ... } } } fn on_start(&mut self, watch: Option<Arc<ReplayWatch>>) { ... if self.fsm.peer.storage().is_initialized() { self.fsm.peer.schedule_apply_fsm(self.store_ctx); } ... }
构建 ApplyFsm
启动 handle_all_tasks
异步任务来接受 apply_scheduler
发送过来的 apply entries
components/raftstore-v2/src/operation/command/mod.rs
pub fn schedule_apply_fsm<T>(&mut self, store_ctx: &mut StoreContext<EK, ER, T>) { ... let (apply_scheduler, mut apply_fsm) = ApplyFsm::new( &store_ctx.cfg, self.peer().clone(), region_state, mailbox, ... ); store_ctx .apply_pool .spawn(async move { apply_fsm.handle_all_tasks().await }) .unwrap(); self.set_apply_scheduler(apply_scheduler); }
Apply
流程异步处理任务,用于处理 raft
生成的 CommittedEntries
,对其进行 Apply
components/raftstore-v2/src/fsm/apply.rs
impl<EK: KvEngine, R: ApplyResReporter> ApplyFsm<EK, R> { pub async fn handle_all_tasks(&mut self) { loop { ... let res = futures::select! { res = self.receiver.next().fuse() => res, ... }; ... loop { match task { // TODO: flush by buffer size. ApplyTask::CommittedEntries(ce) => self.apply.apply_committed_entries(ce).await, ... } ... } } }}
这个异步任务专门处理 raft
生成的 raft log
与 raft msg
对于 raft log
,使用 raft_engine
写入到 rocksdb
对于 raft msg
,使用 trans
发送给其他 tikv
实例
components/raftstore/src/store/async_io/write.rs
impl<EK, ER> StoreWriters<EK, ER>where EK: KvEngine, ER: RaftEngine, { pub fn senders(&self) -> WriteSenders<EK, ER> { WriteSenders::new(self.writers.clone()) } pub fn spawn<T: Transport + 'static, N: PersistedNotifier>( &mut self, store_id: u64, raft_engine: ER, kv_engine: Option<EK>, notifier: &N, trans: &T, cfg: &Arc<VersionTrack<Config>>, ) -> Result<()> { let pool_size = cfg.value().store_io_pool_size; if pool_size > 0 { self.increase_to( pool_size, StoreWritersContext { store_id, notifier: notifier.clone(), raft_engine, kv_engine, transfer: trans.clone(), cfg: cfg.clone(), }, )?; } Ok(()) } pub fn increase_to<T: Transport + 'static, N: PersistedNotifier>( &mut self, size: usize, writer_meta: StoreWritersContext<EK, ER, T, N>, ) -> Result<()> { ... self.writers .update(move |writers: &mut SharedSenders<EK, ER>| -> Result<()> { let mut cached_senders = writers.get(); for i in current_size..size { let (tx, rx) = bounded( writer_meta.cfg.value().store_io_notify_capacity, ); let mut worker = Worker::new( ... rx, ... ); let t = thread::Builder::new() .name(thd_name!(tag)) .spawn_wrapper(move || { set_io_type(IoType::ForegroundWrite); worker.run(); })?; cached_senders.push(tx); handlers.push(t); } writers.set(cached_senders); Ok(()) })?; Ok(()) }} impl<EK, ER, N, T> Worker<EK, ER, N, T>where EK: KvEngine, ER: RaftEngine, N: PersistedNotifier, T: Transport, { fn run(&mut self) { let mut stopped = false; while !stopped { let handle_begin = match self.receiver.recv() { Ok(msg) => { stopped |= self.handle_msg(msg); } ... }; ... self.write_to_db(true); } } pub fn write_to_db(&mut self, notify: bool) { ... let mut write_raft_time = 0f64; if !self.batch.raft_wbs[0].is_empty() { ... for i in 0..self.batch.raft_wbs.len() { self.raft_engine .consume_and_shrink( &mut self.batch.raft_wbs[i], ... ) ... } self.batch.raft_wbs.truncate(1); ... } for task in &mut self.batch.tasks { for msg in task.messages.drain(..) { ... if let Err(e) = self.trans.send(msg) { ... }... } } ... }}
BatchSystem
的整体代码充斥着各种异步任务,还有各种 channel
的发送和接收。如果没有对其初始化比较了解的话,可能很难看得懂 msg
是如何在整个系统中流转的。
经过本文的流程梳理,相信大家已经对整体 BatchSystem
有了比较熟悉的认知,后续研究 Raft Msg
流转,或者 Region
的 Merge
与 Split
应该比较有信心了。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。