免费试用
作者:PingCAP
产品技术解读
2024-08-02

导读

TiKV 是一个支持事务的分布式 Key-Value 数据库,目前已经是 CNCF 基金会的顶级项目。它通过 Raft 协议实现数据的高可用性和强一致性,是 TiDB 分布式数据库系统的重要组成部分。本文作为 TiKV 源码解读系列的增补,详细介绍了 TiKV 8.2.0 版本中 Raft 快照的生成、发送、接收和应用的具体实现。

Snapshot 的发送和接收中,我们详细介绍了 Raft 快照的发送和接收机制。本篇文章作为前文的补充,将概述 TiKV 中 Raft 快照的整体流程,并详细介绍涉及的代码路径。本文所讨论的代码基于最近的 8.2.0 版本。

背景

在 TiKV 中,数据空间被切分成各个连续的范围,称为 Region。每个 Region 由一个单独的 Raft 组管理,基于 Raft 协议保证容错性。每个 Raft 组包含多个 Peer,每个 Peer 在不同的 TiKV 节点上运行。

TiKV 中的 Region 和 Raft 组

图 1. TiKV 中的 Region 和 Raft 组

在 Raft 协议中,Leader 节点负责将最新的日志条目通过 Append RPC 发送给 Followers,以确保所有节点的数据一致性。为防止磁盘空间的无限制增长,Leader 节点会定期清理过时的本地 Raft 日志。然而,如果某个 Follower 落后太多,以至于其请求的日志条目已经被 Leader 清理,传统的 Append RPC 将无法继续同步数据。在这种情况下,Leader 节点将采取替代措施,向落后的 Follower 发送一个 Raft 快照。这个快照包含了 Region 在某一特定时间点的完整状态快照,不仅包括存储于 RocksDB 中的数据,还有 Raft 协议的状态信息,例如任期号(term)和所对应的日志索引(index)。在 TiKV 的实际应用中,快照机制通常在 Region 初始化、发生分裂或进行扩展等关键操作时被触发,以确保数据的一致性和系统的稳定性。

概述

TiKV 中的 Raft 快照过程大致分为四个阶段:

1. 生成: Raft Leader 生成一份快照,记录下 Raft 和 RocksDB 在当前时间点的状态。

2. 发送:Raft Leader 通过网络把快照发送给 Follower。

3. 接收: Raft Follower 接收快照并暂时存储。

4. 应用: Raft Follower 将快照应用到 Raft 状态机和 RocksDB 数据中。

以下图表更详细地描述了快照过程:

TiKV 中的 Raft 快照过程

图 2. TiKV 中的 Raft 快照过程

核心原理

本文将详细介绍 Raft 快照实现的原理和流程,这个部分将介绍核心环节的设计思路,在代码路径详解中我们会逐步介绍细节。

快照元数据和数据的分离

Raft 消息 (protobuf definition:https://github.com/pingcap/kvproto/blob/df42997c2c57536219c67253966ede4d61d25757/include/eraftpb.proto#L77) 有一个 snapshot 字段,可以存储快照数据和元数据。一个简单的做法是将快照数据嵌入到 Raft 消息中,通过 Raft Peer 之间的标准通信通道传输(如图顶部的消息队列所示)。但问题在于 Raft 快照比其它消息大得多,将快照数据放入 Raft 消息中可能会阻塞正常的 Raft 消息处理逻辑。

因此,TiKV 中的一个设计选择是,Raft 快照消息(表示为 MsgSnapshot)仅包含快照的元数据。实际的快照数据作为文件保存在磁盘上。快照文件以及快照消息通过专用的 gRPC 流连接由 Snap Worker 发送(如步骤 7 和 8 所示)。使用 gRPC 流可以将数据分成更小的块以进行高效传输。

从 ApplyFsm 调度 RegionTask::Gen

Region Worker 负责生成和应用快照数据。例如,在快照的应用过程中,PeerStorage 调度了一个 RegionTask::Apply 任务给 Region Worker(步骤 11)。但是快照的生成过程略有不同,尽管 PeerStorage 依然是整个过程的发起点,但是 RegionTask::Gen 任务是通过 ApplyFsm 来调度的(步骤 2 和 3)。为什么 PeerStorage 不直接进行 RegionTask::Gen 的调度呢?

这是为了控制快照生成的时间点,让快照尽可能地包含最新的数据。在 Raft 批处理系统中,Raft 信息是分批处理的。同一批信息中可能同时包含快照请求和新的写入请求,而我们希望快照在同批次的写入都完成之后生成。因此,Raft 批处理系统会先把同批次的所有写入任务发给 Apply 批处理系统处理,然后再分派快照任务(ApplyTask::Snapshot)。这样,因为 ApplyFsm 是对任务依次进行处理的,当它处理到快照任务的时候,同批次的写入已经完成。

代码路径详解

我们将逐步介绍不同阶段的代码路径,各步骤与上图一一对应。不过这里包含的代码片段很简化,省略了许多细节,仅用于展示大致的流程。代码基于 TiKV 8.2.0 版本。

快照生成

步骤 1: GenSnapTask

TiKV 中实现 Raft 共识协议的是 raft-rs 模块 ( https://github.com/tikv/raft-rs ),快照过程在该模块中发起。在 raft-rs 中,Leader 对每一个 Follower 维护一个 Progress 对象,其中记录了该 Follower 所需要的下一个日志索引(pr.next_idx)。Raft leader 在 maybe_send_append 中处理某个 Follower 的 Append RPC 的发送,如果它无法获取前置日志(pr.next_idx - 1)的任期(用于 Append 过程的匹配校验),则需要发送快照。此时会调用 prepare_send_snapshot 函数,触发快照过程。

raft-rs: src/raft.rs
impl RaftCore {
  fn maybe_send_append(..., pr: &mut Progress, ) {
      ...
      let term = self.raft_log.term(pr.next_idx - 1);
      match (term, ents) {
          ...
          _ => {
               // send snapshot if we failed to get term or entries.
               if !self.prepare_send_snapshot(&mut m, pr, to) { 
                   return false;
                }
          }
      }
  }
  fn prepare_send_snapshot(&mut self, m: &mut Message, pr: &mut Progress, to: u64) -> bool {
      let snapshot_r = self.raft_log.snapshot(pr.pending_request_snapshot, to);
      // ...
   }
}
raft-rs: src/raft_log.rs
impl RaftLog {
  pub fn snapshot(&self, request_index: u64, to: u64) -> Result<Snapshot> {
      // ...
      self.store.snapshot(request_index, to)
  }
}

如上所示,快照过程经过若干调用后来到 Storage trait 的 snapshot 方法(定义:https://github.com/tikv/raft-rs/blob/2aefbf627f243dd261b7585ef1250d32efd9dfe7/src/storage.rs#L159 )。在 TiKV 中,Storage trait 的实现是 PeerStorage,其 snapshot 实现如下:

components/raftstore/src/store/peer_storage.rs
impl PeerStorage {
  pub fn snapshot(&self, request_index: u64, to: u64) -> raft::Result<Snapshot> {
      // ...
      let task = GenSnapTask::new(...);
      let mut gen_snap_task = self.gen_snap_task.borrow_mut();
      *gen_snap_task = Some(task);
      Err(raft::Error::Store(
          raft::StorageError::SnapshotTemporarilyUnavailable,
      ))
  }
}

PeerStorage::snapshot构建了一个 GenSnapTask 并将其设置在 gen_snap_task 字段,然后它会返回一个 SnapshotTemporarilyUnavailable 错误,这个错误意味着快照正在生成过程中。在之后的过程中,snapshot() 函数会在 Raft 协议的每次心跳时被重新调用。如果快照生成未完成,它会继续返回 SnapshotTemporarilyUnavailable。当快照生成完毕后(步骤 5),snapshot() 的调用就会返回 Ok。

步骤 2: ApplyTask::Snapshot

Peer::handle_raft_ready_append 函数检查PeerStorage 的gen_snap_task 字段,并将任务发送给 ApplyFsm。如前所述,PeerFsm 会先给 ApplyFsm 发送该批次中所有的写入任务(见 handle_raft_committed_entries 函数),再发送快照任务。

components/raftstore/src/store/peer.rs
impl Peer {
  pub fn handle_raft_ready_append(...){
      // ...
      if !ready.committed_entries().is_empty() {
          self.handle_raft_committed_entries(ctx, ready.take_committed_entries());
      }
      if let Some(mut gen_task) = self.mut_store().take_gen_snap_task() {
          ctx.apply_router.schedule_task(self.region_id, ApplyTask::Snapshot(gen_task));
      }
  }
  fn handle_raft_committed_entries<T>() {
      let mut apply = Apply::new(...)
      ctx.apply_router.schedule_task(self.region_id, ApplyTask::apply(apply));
  }
}

步骤 3: RegionTask::Gen

ApplyFsm 在 ApplyFsm::handle_snapshot 函数中处理快照任务。它将快照任务转换为 RegionTask::Gen ,并发送给 Region Worker。

components/raftstore/src/store/fsm/apply.rs
impl ApplyFsm {
  fn handle_tasks() {
      loop {
          match msg {
              Msg::Snapshot(snap_task) => self.handle_snapshot(..., snap_task),
          }
      }
  }
  fn handle_snapshot(..., snap_task: GenSnapTask) {
      snap_task.generate_and_schedule_snapshot()
  }
}
impl GenSnapTask {
  pub fn generate_and_schedule_snapshot(){
      let snapshot = RegionTask::Gen {...}
      region_sched.schedule(snapshot)
  }
}

步骤 4 和 5: do_snapshot() and notify

Region Runner 定义了 Region 任务的处理逻辑。一系列函数调用到达 do_snapshot,由它完成实际的快照生成工作,包括从 RocksDB 扫描 Region 的数据并写入 SST 文件。注意,快照生成工作(ctx.handle_gen)是在一个单独的线程池中,主要的考虑是因为它耗时较长,避免阻塞其它任务。

components/raftstore/src/store/worker/region.rs
impl Runnable for Runner {
  fn run(&mut self, task: Task<EK::Snapshot>) {
      match task {
          Task::Gen {...} => { 
              let ctx = SnapGenContext {...}
              self.pool.spawn(async move {
                  ctx.handle_gen(...)
              }
          }
      }
  }
}
impl SnapGenContext {
  fn generate_snap( ... ) { 
      let snap = box_try!(store::do_snapshot::<EK>(...));
      notifier.try_send(snap)
  }
  fn handle_gen( ... ) {
      self.generate_snap(...)
  }
}

生成完成后,notifier.try_send(snap) 将快照生成结果发送到一个通道,结果将使 PeerStorage::snapshot() 在下一次调用中返回 Ok。

快照发送

步骤 6 和 7: MsgSnapshot and send_snap()

PeerStorage::snapshot() 成功后返回一个 Snapshot 结果。不过这个快照只包含元数据,快照的数据依然以 SST 文件的形式存储在磁盘上。

src/server/raft_client.rs
impl AsyncRaftSender {
  fn fill_msg(&mut self, ctx: &Context<'_>) {
      // ...
      if msg.get_message().has_snapshot() {
          self.send_snapshot_sock(msg);
          continue;
      }
      // ...
  }
  fn send_snapshot_sock(&self, msg: RaftMessage) {
      if let Err(e) = self.snap_scheduler.schedule(SnapTask::Send {...}) {
          // ...
      }
  }    
}

AsyncRaftSender 拦截快照消息并将其转换为快照发送任务。任务被发送到 Snap Scheduler,由 Snap Worker 来处理。

src/server/snap.rs
impl Runnable for Runner {
  fn run(&mut self, task: Task) {
      match task {
          Task::Send { addr, msg, cb } => {
              let send_task = send_snap(...);
              let task = async move {
                  let res = match send_task {
                      Err(e) => Err(e),
                      Ok(f) => f.await,
                  };
                  // ...
              }
              self.pool.spawn(task);
          }
      }
  }
}
pub fn send_snap(...) {
  // ... 
}

Snap Runner 中定义了 Snap Worker 在处理不同任务时的处理逻辑,对于快照发送任务(Task::Send),Snap Runner 生成一个新的异步任务来运行 send_snap 函数。send_snap 通过打开一个新的 gRPC 流连接来传输快照消息及快照数据。

快照接收

步骤 8 和 9: recv_snap() and MsgSnapshot

在接收端,TiKV 实例看到传入的 gRPC 请求,通过调度一个 recv snap 任务来将请求转发给 Snap Worker。

src/server/service/kv.rs
impl Tikv for Service {
  fn snapshot(...) {
      let task = SnapTask::Recv { stream, sink };
      if let Err(e) = self.snap_scheduler.schedule(task) {...}
  }
}

Snap Worker 在 recv_snap 函数中接收快照元数据和内容。

src/server/snap.rs
impl Runnable for Runner {
  fn run(&mut self, task: Task) {
      match task {
          Task::Recv { ... } => {
              let task = async move {
                  let result = recv_snap(...).await;
              }
              self.pool.spawn(task);
          }
      }
  }
}
fn recv_snap(...) {
  let mut context = RecvSnapContext::new(head, &snap_mgr)?;
  while let Some(item) = stream.next().await {
      // ...
  }
  context.finish(raft_router)
}

快照接收后,context.finish(raft_router) 将快照消息发送到 Raftstore 以触发快照的应用。

快照应用

步骤 10 到 12: apply_snapshot()和 apply_snap()

快照在不同层级和不同地方被应用:

  1. 快照信息被 Raftstore 收到之后,会调用 raft_rs 的 step 函数。经过一系列的调用,到达 Raft::handle_snapshot ,在该函数中恢复 Raft 状态机的日志和配置。
raft-rs: src/raft.rs
impl Raft {
  fn handle_snapshot(&mut self, mut m: Message) {
      if self.restore(m.take_snapshot()) {...}
  }
  pub fn restore(&mut self, snap: Snapshot) -> bool {
      ...
      self.raft_log.restore(snap);
  }
}
  1. Raftstore 在对 ready 处理时 Peer::handle_raft_ready_append 会调用 PeerStorage::handle_raft_ready函数,该函数会调用PeerStorage::apply_snapshot 来更新 Peer 的状态。在 Snapshot 被持久化之后,PeerStorage::on_persist_snapshot 函数会被调用,它会进一步调用PeerStorage::persist_snapshot 将快照应用任务发送给 Region Worker。
components/raftstore/src/store/peer_storage.rs
impl PeerStorage {
  pub fn apply_snapshot() {...}
  pub fn persist_snapshot(&mut self, res: &PersistSnapshotResult) {
      self.schedule_applying_snapshot();
  }
  pub fn schedule_applying_snapshot(&mut self) {
      let task = RegionTask::Apply {}
      if let Err(e) = self.region_scheduler.schedule(task) {...}
  }    
}
  1. Region Worker 对快照应用任务进行处理,经过一系列调用会来到 Runner::apply_snap,它会将 Region 中的数据更新为快照中的数据。
components/raftstore/src/store/worker/region.rs
impl Runnable for Runner {
  fn run(&mut self, task: Task<EK::Snapshot>) {
      match task {
          Task::Apply { .. } => {
              self.pending_applies.push_back(task);
              self.handle_pending_applies(false);    
          }
      }
  }
  fn handle_pending_applies(&mut self, is_timeout: bool) {
      while !self.pending_applies.is_empty() {
          if let Some(Task::Apply { region_id, .. }) = self.pending_applies.front() {
              if let Some(Task::Apply {}) = self.pending_applies.pop_front() {
                  self.handle_apply(region_id, peer_id, status);
              }
          }
      }
  }
  fn handle_apply(&mut self, region_id: u64, peer_id: u64, status: Arc<AtomicUsize>) {
      match self.apply_snap(region_id, peer_id, Arc::clone(&status)) {...}
  }
  fn apply_snap(&mut self, region_id: u64, peer_id: u64, abort: Arc<AtomicUsize>) -> Result<()> {
      // ...    
  }
}

总结

以上便是 TiKV 中与快照相关的代码路径概览。希望通过本文的介绍,能帮助读者更深入地理解 TiKV 中的 Raft 快照机制及其实现细节,从而更有效地进行源码阅读和学习。

新经济行业内容专区上线,为新经济企业数据库选型和应用提供深入洞察和可靠参考路径。