raft-rs 示例程序源码解析

网友投稿 484 2024-02-24



前言

raft-rs 的 5 节点示例程序稍微比较复杂一些,但是看懂的话,就会对 raft 的使用得心应手。

raft-rs 示例程序源码解析

示例程序

Node 结构体

struct Node {     // None if the raft is not initialized.     raft_group: Option<RawNode<MemStorage>>,     my_mailbox: Receiver<Message>,     mailboxes: HashMap<u64, Sender<Message>>,     // Key-value pairs after applied. `MemStorage` only contains raft logs,     // so we need an additional storage engine.     kv_pairs: HashMap<u16, String>, }

示例程序中,Node 是使用 RAFT 的外部应用,代表 RAFT 的一个节点应用程序,其中 raft_group 就是上一篇文章所说的 RawNode,是 RAFT 对外的接口,也就是 Node 节点内部运行的 RAFT;

my_mailbox 是 Node 接受其他 Node 信息的窗口,mailboxes 是 Node 发送给其他 Node 信息的窗口;

kv_pairs 是 request 最后 apply 的结果。

应用启动

fn main() {     const NUM_NODES: u32 = 5;     // Create 5 mailboxes to send/receive messages. Every node holds a `Receiver` to receive     // messages from others, and uses the respective `Sender` to send messages to others.     let (mut tx_vec, mut rx_vec) = (Vec::new(), Vec::new());     for _ in 0..NUM_NODES {         let (tx, rx) = mpsc::channel();         tx_vec.push(tx);         rx_vec.push(rx);     }     // A global pending proposals queue. New proposals will be pushed back into the queue, and     // after its committed by the raft cluster, it will be poped from the queue.     let proposals = Arc::new(Mutex::new(VecDeque::<Proposal>::new()));     for (i, rx) in rx_vec.into_iter().enumerate() {         // A map[peer_id -> sender]. In the example we create 5 nodes, with ids in [1, 5].         let mailboxes = (1..6u64).zip(tx_vec.iter().cloned()).collect();         let mut node = match i {             // Peer 1 is the leader.             0 => Node::create_raft_leader(1, rx, mailboxes, &logger),             // Other peers are followers.             _ => Node::create_raft_follower(rx, mailboxes),         };   }   // Propose some conf changes so that followers can be initialized.   add_all_followers(proposals.as_ref());      ... }

从上述代码可以看到,示例程序先创建了 5 对 channel,这些 channel 是示例程序模拟真实应用的 transport 接口。

在创建 RAFT 5 个 node 节点的时候,每个 node 节点都会选择 5 对 channel 其中一个接收端作为自己的 my_mailbox,作为接收窗口,接收其他 peer node 节点的 msg。

然后复制全部其他的 5 个 channel 的发送端,作为 Node 节点的发送窗口,每个发送端对应一个 peer node 节点,向这些 channel 发送端发送 message,相应的 peer node 节点的 channel 接收端就会接收到消息。

create_raftfn create_raft_leader(r         id: u64,         my_mailbox: Receiver<Message>,         mailboxes: HashMap<u64, Sender<Message>>,         logger: &slog::Logger,     ) -> Self {         let mut cfg = example_config();         cfg.id = id;                  let mut s = Snapshot::default();         let storage = MemStorage::new();         storage.wl().apply_snapshot(s).unwrap();         let raft_group = Some(RawNode::new(&cfg, storage, &logger).unwrap());         Node {             raft_group,             my_mailbox,             mailboxes,             kv_pairs: Default::default(),         }     }      fn create_raft_follower(         my_mailbox: Receiver<Message>,         mailboxes: HashMap<u64, Sender<Message>>,     ) -> Self {         Node {             raft_group: None,             my_mailbox,             mailboxes,             kv_pairs: Default::default(),         }     }

示例中 create_raft_follower 并没有创建 RAFT,而是等待新消息进入再创建,这个我们后面再说。

现在我们成功创建了 Leader 节点和 4 个 Follower 节点,但是 Follower 节点上面并没有运行 RAFT 程序,也就是说现在 RAFT 集群现在只有 Leader 节点一个,其他 Follower 节点上面没有运行 RAFT 模块。

接下来,我们需要要求 Leader 节点的 RAFT 程序提出配置变更要求,方法就是调用 propose 接口,并且传入 ConfChange-AddNode 的 Msg 参数:

fn add_all_followers(proposals: &Mutex<VecDeque<Proposal>>) {     for i in 2..6u64 {         let mut conf_change = ConfChange::default();         conf_change.node_id = i;         conf_change.set_change_type(ConfChangeType::AddNode);         loop {             let (proposal, rx) = Proposal::conf_change(&conf_change);             proposals.lock().unwrap().push_back(proposal);             if rx.recv().unwrap() {                 break;             }             thread::sleep(Duration::from_millis(100));         }     } }propose_conf_change

Leader 节点将 AddNode 的请求发送给内部 RAFT 程序,调用了 propose_conf_change 接口:

fn main() {     ...     if raft_group.raft.state == StateRole::Leader {       // Handle new proposals.       let mut proposals = proposals.lock().unwrap();       for p in proposals.iter_mut().skip_while(|p| p.proposed > 0) {         propose(raft_group, p);       }     }     ... } fn propose(raft_group: &mut RawNode<MemStorage>, proposal: &mut Proposal) {     let last_index1 = raft_group.raft.raft_log.last_index() + 1;     ...     } else if let Some(ref cc) = proposal.conf_change {         let _ = raft_group.propose_conf_change(vec![], cc.clone());     }     ...     let last_index2 = raft_group.raft.raft_log.last_index() + 1;     if last_index2 == last_index1 {         // Propose failed, dont forget to respond to the client.         proposal.propose_success.send(false).unwrap();     } else {         proposal.proposed = last_index1;     } }ready

Leader 节点调用 propose_conf_change 后,就需要调用 ready 函数等待内部 RAFT 程序处理 Msg 完成。

值得注意的是,一般来说,如何处理 ready 函数返回的 Ready 结构体是示例应用程序的关键:

fn main() {     ...     // Handle readies from the raft.     on_ready(       raft_group,       &mut node.kv_pairs,       &node.mailboxes,       &proposals,       &logger,     );     ... } fn on_ready(     raft_group: &mut RawNode<MemStorage>,     kv_pairs: &mut HashMap<u16, String>,     mailboxes: &HashMap<u64, Sender<Message>>,     proposals: &Mutex<VecDeque<Proposal>>,     logger: &slog::Logger, ) {     if !raft_group.has_ready() {         return;     }     let store = raft_group.raft.raft_log.store.clone();     // Get the `Ready` with `RawNode::ready` interface.     let mut ready = raft_group.ready();     ...     if !ready.messages().is_empty() {         // Send out the messages come from the node.         handle_messages(ready.take_messages());     }     // Apply the snapshot. Its necessary because in `RawNode::advance` we stabilize the snapshot.     if *ready.snapshot() != Snapshot::default() {         let s = ready.snapshot().clone();         if let Err(e) = store.wl().apply_snapshot(s) {             ...         }     }     ...     // Apply all committed entries.     handle_committed_entries(raft_group, ready.take_committed_entries());     // Persistent raft logs. Its necessary because in `RawNode::advance` we stabilize     // raft logs to the latest position.     if let Err(e) = store.wl().append(ready.entries()) {         ...     }     if let Some(hs) = ready.hs() {         // Raft HardState changed, and we need to persist it.         store.wl().set_hardstate(hs.clone());     }     if !ready.persisted_messages().is_empty() {         // Send out the persisted messages come from the node.         handle_messages(ready.take_persisted_messages());     }     // Call `RawNode::advance` interface to update position flags in the raft.     let mut light_rd = raft_group.advance(ready);     // Update commit index.     if let Some(commit) = light_rd.commit_index() {         store.wl().mut_hard_state().set_commit(commit);     }     // Send out the messages.     handle_messages(light_rd.take_messages());     // Apply all committed entries.     handle_committed_entries(raft_group, light_rd.take_committed_entries());     // Advance the apply index.     raft_group.advance_apply(); }

调用 has_ready 函数来判断内部 RAFT 模块是否处理信息完毕

调用 ready 函数获取 Ready 结构体

获取 Ready 结构体内部的 messages 信息,并且调用 handle_message 函数将 Msg 发送到其他 peer node 节点。值得注意的是,按照论文来说此时只有 Leader 才可以并行执行 Msg 发送和日志落盘。因此只有 Leader 节点调用 ready.messages().is_empty() 才是 false,Follower 都是 true。

获取 Ready 结构体的 snapshot,并将其应用到 RAFT 日志里面去,落盘 snapshot

获取 Ready 结构体已经 Committed 的 Log Entries,也就是经过大多数节点确认的消息,此时因为只有 Leader 才运行 RAFT 模块,因此 conf_change 这个 Log Entries 已经是 Committed Entries 的了。这时候,应用需要 apply 这些 log entries

获取 Ready 结构体的普通 Log Entries,落盘处理

如果 hardstate 有变化,那么需要落盘日志

由于日志已经落盘,可以获取 Ready 结构体的 Msg 信息发送到其他 peer node 节点,take_persisted_messages 是对 Follower 的节点起作用的,因为 Leader 已经在落盘前并行发送了 Msg

调用 advance 接口,更新 RAFT 模块的状态

advance 接口会返回新的 commit index,应用需要持久化到日志磁盘

由于调用 ready 和 advance 接口之间不允许调用 step、propose、campaign 等等接口,因此对于 follower 来说,light_rd.take_messages 肯定返回空。但是对于 leader 来说,由于 ready 后又落盘了一些 Entries,如果这些 Entries 已经收到 大多数 peer node 的 msgAppendRespone,那么 commit index 也要推进到这些 Entries 的 last index,并且需要将新的 commit index 信息发送给 followers

advance 接口会返回新的 committed entries,应用需要继续 apply 这些 entries

更新 RAFT 模块的 apply index

其中,handle_messages 逻辑很简单,就是轮询各个 channel 的发送端,将消息发送到相应的 peer node:

let handle_messages = |msgs: Vec<Message>| {   for msg in msgs {     let to = msg.to;     if mailboxes[&to].send(msg).is_err() {       ...     }   } };

其中处理 committed entries 的逻辑也很简单,

如果 entries 的类型是 confChange 的话,就调用 RAFT 的 apply_conf_change 函数,并且落盘到日志磁盘中。因为 raft-rs 的 joint consensus 是需要 conf change entries 在 commit 后才起作用的,必须调用 apply_conf_change 函数才能进行真正的配置变更。

如果 entries 的类型是普通类型的 entries 的话,就存储到 kv_pairs 当中去。

let mut handle_committed_entries =         |rn: &mut RawNode<MemStorage>, committed_entries: Vec<Entry>| {           for entry in committed_entries {             if entry.data.is_empty() {               // From new elected leaders.               continue;             }             if let EntryType::EntryConfChange = entry.get_entry_type() {               // For conf change messages, make them effective.               let mut cc = ConfChange::default();               cc.merge_from_bytes(&entry.data).unwrap();               let cs = rn.apply_conf_change(&cc).unwrap();               store.wl().set_conf_state(cs);             } else {               // For normal proposals, extract the key-value pair and then               // insert them into the kv engine.               let data = str::from_utf8(&entry.data).unwrap();               let reg = Regex::new("put ([0-9]+) (.+)").unwrap();               if let Some(caps) = reg.captures(data) {                 kv_pairs.insert(caps[1].parse().unwrap(), caps[2].to_string());               }             }             if rn.raft.state == StateRole::Leader {               // The leader should response to the clients, tell them if their proposals               // succeeded or not.               let proposal = proposals.lock().unwrap().pop_front().unwrap();               proposal.propose_success.send(true).unwrap();             }           } };step

当 Leader 调用 handle_messages 函数将 msg 发送给 followers 的 channel 发送端后,followers 的 channel 接收端就会收到消息:

fn main() {     ...     let handle = thread::spawn(move || loop {       thread::sleep(Duration::from_millis(10));       loop {         // Step raft messages.         match node.my_mailbox.try_recv() {           Ok(msg) => node.step(msg, &logger),           Err(TryRecvError::Empty) => break,           Err(TryRecvError::Disconnected) => return,         }       }       ... }        fn step(&mut self, msg: Message, logger: &slog::Logger) {   if self.raft_group.is_none() {     if is_initial_msg(&msg) {       self.initialize_raft_from_message(&msg, logger);     } else {       return;     }   }   let raft_group = self.raft_group.as_mut().unwrap();   let _ = raft_group.step(msg); }

由于 follower 在启动的时候并没有创建 RAFT 模块,因此 raft_group 是空的,这时候就会调用 initialize_raft_from_message:

fn initialize_raft_from_message(&mut self, msg: &Message, logger: &slog::Logger) {   if !is_initial_msg(msg) {     return;   }   let mut cfg = example_config();   cfg.id = msg.to;   let logger = logger.new(o!("tag" => format!("peer_{}", msg.to)));   let storage = MemStorage::new();   self.raft_group = Some(RawNode::new(&cfg, storage, &logger).unwrap()); } fn is_initial_msg(msg: &Message) -> bool {     let msg_type = msg.get_msg_type();     msg_type == MessageType::MsgRequestVote         || msg_type == MessageType::MsgRequestPreVote         || (msg_type == MessageType::MsgHeartbeat && msg.commit == 0) }

直到这个时候,Leader 和 follower 才组成 5 节点的 RAFT 集群。

propose

示例程序造出来了 100 个请求,并且让 Leader node 通过 propose 函数发送给内部的 RAFT 模块。

接下来,Leader node 的 on_ready 函数就会接收到 RAFT 模块的 Ready 结构体,解析后发送相关的 msgAppend 给 followers 的 mailboxs

followers node 通过 my_mailbox 接收到请求后,会调用 step 函数传入 follower node 内部的 RAFT 模块。

followers node 通过 on_ready 函数接收到 RAFT 模块的 Ready 结构体,解析后发送 msgAppendRespone 给 leader node 的 mailbox

Leader node 的 my_mailbox 接收到请求后,继续调用 step 将消息传入 leader 的 RAFT,RAFT 解析 msgAppendRespone 中的 index,并且更新其 committed index

leader node 的 on_ready 函数接收到 RAFT 模块的 Ready 结构体,分析出其中的 committed entries,将其存储到 kv_pairs,并且返回给客户端成功。接着还会发送 message 给 followers 的 mailboxs 最新的 commit index

followers 通过 my_maibox 收到消息后,继续调用 step 函数传入 RAFT,RAFT 模块根据 message 更新自身的 commit index

followers 调用 on_ready 函数解析出 committed entries,将其存储到自身的 kv_pairs。

至此,5 个节点的 kv_pairs 都含有用户请求的 data 数据。

fn main() {     let handle = thread::spawn(move || loop {       if raft_group.raft.state == StateRole::Leader {         // Handle new proposals.         let mut proposals = proposals.lock().unwrap();         for p in proposals.iter_mut().skip_while(|p| p.proposed > 0) {           propose(raft_group, p);         }       }     }     ...            (0..100u16)

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:脚本实现mssql快速批量导入SQL脚本(mssql批量导入sql)
下一篇:SQL Server出现错误0:解决方案(sqlserver错误0)
相关文章