From eeaee89a4eba7aa17abd630388df2f1c3a0f76ad Mon Sep 17 00:00:00 2001
From: Nick Cameron
Date: Fri, 12 Jul 2019 19:25:04 +1200
Subject: [PATCH] Port #243 to the Prost branch (#259)

Signed-off-by: Nick Cameron
---
 proto/proto/eraftpb.proto                 |   1 +
 proto/src/prost/eraftpb.rs                |   2 +
 proto/src/prost/wrapper_eraftpb.rs        |  12 +
 src/errors.rs                             |   5 +
 src/progress.rs                           |  41 ++-
 src/raft.rs                               |  88 ++++++-
 src/raft_log.rs                           |  12 +-
 src/raw_node.rs                           |   6 +
 src/storage.rs                            |  42 +++-
 tests/integration_cases/test_raft.rs      | 293 +++++++++++++++++++++-
 tests/integration_cases/test_raft_snap.rs |  81 ++++++
 tests/test_util/mod.rs                    |   4 +
 12 files changed, 553 insertions(+), 34 deletions(-)

diff --git a/proto/proto/eraftpb.proto b/proto/proto/eraftpb.proto
index e7c25f50c..858bb8ee6 100644
--- a/proto/proto/eraftpb.proto
+++ b/proto/proto/eraftpb.proto
@@ -71,6 +71,7 @@ message Message {
     repeated Entry entries = 7;
     uint64 commit = 8;
     Snapshot snapshot = 9;
+    uint64 request_snapshot = 13;
     bool reject = 10;
     uint64 reject_hint = 11;
     bytes context = 12;
diff --git a/proto/src/prost/eraftpb.rs b/proto/src/prost/eraftpb.rs
index 9e6e7d650..7e82059d5 100644
--- a/proto/src/prost/eraftpb.rs
+++ b/proto/src/prost/eraftpb.rs
@@ -61,6 +61,8 @@ pub struct Message {
     pub commit: u64,
     #[prost(message, optional, tag = "9")]
     pub snapshot: ::std::option::Option<Snapshot>,
+    #[prost(uint64, tag = "13")]
+    pub request_snapshot: u64,
     #[prost(bool, tag = "10")]
     pub reject: bool,
     #[prost(uint64, tag = "11")]
diff --git a/proto/src/prost/wrapper_eraftpb.rs b/proto/src/prost/wrapper_eraftpb.rs
index c5e82f43d..a8c865152 100644
--- a/proto/src/prost/wrapper_eraftpb.rs
+++ b/proto/src/prost/wrapper_eraftpb.rs
@@ -364,6 +364,18 @@ impl Message {
         self.snapshot.take().unwrap_or_else(Snapshot::default)
     }
     #[inline]
+    pub fn clear_request_snapshot(&mut self) {
+        self.request_snapshot = 0
+    }
+    #[inline]
+    pub fn set_request_snapshot(&mut self, v: u64) {
+        self.request_snapshot = v;
+    }
+    #[inline]
+    pub fn get_request_snapshot(&self) -> u64 {
+        self.request_snapshot
+    }
+    #[inline]
     pub fn clear_reject(&mut self) {
         self.reject = false
     }
diff --git a/src/errors.rs b/src/errors.rs
index f5648ef37..326ae5350 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -69,6 +69,10 @@ quick_error! {
         NotExists(id: u64, set: &'static str) {
             display("The node {} is not in the {} set.", id, set)
         }
+        /// The request snapshot is dropped.
+        RequestSnapshotDropped {
+            description("raft: request snapshot dropped")
+        }
     }
 }
@@ -82,6 +86,7 @@ impl cmp::PartialEq for Error {
             (&Error::Io(ref e1), &Error::Io(ref e2)) => e1.kind() == e2.kind(),
             (&Error::StepLocalMsg, &Error::StepLocalMsg) => true,
             (&Error::ConfigInvalid(ref e1), &Error::ConfigInvalid(ref e2)) => e1 == e2,
+            (&Error::RequestSnapshotDropped, &Error::RequestSnapshotDropped) => true,
             _ => false,
         }
     }
diff --git a/src/progress.rs b/src/progress.rs
index ee9248ec3..ea6dc005c 100644
--- a/src/progress.rs
+++ b/src/progress.rs
@@ -26,6 +26,7 @@
 // limitations under the License.

 use crate::errors::Error;
+use crate::raft::INVALID_INDEX;
 use fxhash::FxHashMap;
 use std::cmp;
 use std::collections::hash_map::HashMap;
@@ -189,6 +190,10 @@ pub struct Progress {
     /// this Progress will be paused. raft will not resend snapshot until the pending one
     /// is reported to be failed.
     pub pending_snapshot: u64,
+    /// This field is used when the follower requests a snapshot.
+    /// If there is a pending request snapshot, this will be set to the
+    /// requested index of the snapshot.
+    pub pending_request_snapshot: u64,

     /// This is true if the progress is recently active. Receiving any messages
     /// from the corresponding follower indicates the progress is active.
@@ -248,8 +253,8 @@ impl Progress {
         self.pending_snapshot = 0;
     }

-    /// Unsets pendingSnapshot if Match is equal or higher than
-    /// the pendingSnapshot
+    /// Unsets pending_snapshot if matched is equal or higher than
+    /// the pending_snapshot and the snapshot is not requested.
     pub fn maybe_snapshot_abort(&self) -> bool {
         self.state == ProgressState::Snapshot && self.matched >= self.pending_snapshot
     }
@@ -278,25 +283,41 @@
     /// Returns false if the given index comes from an out of order message.
     /// Otherwise it decreases the progress next index to min(rejected, last)
     /// and returns true.
-    pub fn maybe_decr_to(&mut self, rejected: u64, last: u64) -> bool {
+    pub fn maybe_decr_to(&mut self, rejected: u64, last: u64, request_snapshot: u64) -> bool {
         if self.state == ProgressState::Replicate {
             // the rejection must be stale if the progress has matched and "rejected"
             // is smaller than "match".
-            if rejected <= self.matched {
+            // Or rejected equals matched and request_snapshot is INVALID_INDEX.
+            if rejected < self.matched
+                || (rejected == self.matched && request_snapshot == INVALID_INDEX)
+            {
                 return false;
             }
-            self.next_idx = self.matched + 1;
+            if request_snapshot == INVALID_INDEX {
+                self.next_idx = self.matched + 1;
+            } else {
+                self.pending_request_snapshot = request_snapshot;
+            }
             return true;
         }

-        // the rejection must be stale if "rejected" does not match next - 1
-        if self.next_idx == 0 || self.next_idx - 1 != rejected {
+        // The rejection must be stale if "rejected" does not match next - 1.
+        // Do not consider it stale if it is a request snapshot message.
+        if (self.next_idx == 0 || self.next_idx - 1 != rejected)
+            && request_snapshot == INVALID_INDEX
+        {
             return false;
         }

-        self.next_idx = cmp::min(rejected, last + 1);
-        if self.next_idx < 1 {
-            self.next_idx = 1;
+        // Do not decrease the next index if the peer is requesting a snapshot.
+        if request_snapshot == INVALID_INDEX {
+            self.next_idx = cmp::min(rejected, last + 1);
+            if self.next_idx < 1 {
+                self.next_idx = 1;
+            }
+        } else if self.pending_request_snapshot == INVALID_INDEX {
+            // Allow requesting a snapshot even if the state is not Replicate.
+            self.pending_request_snapshot = request_snapshot;
         }
         self.resume();
         true
diff --git a/src/raft.rs b/src/raft.rs
index a6e201d2e..97fab9baa 100644
--- a/src/raft.rs
+++ b/src/raft.rs
@@ -106,6 +106,10 @@ pub struct Raft<T: Storage> {
     /// The maximum length (in bytes) of all the entries.
     pub max_msg_size: u64,

+    /// The peer is requesting a snapshot: this is the index the follower
+    /// needs the snapshot to include.
+    pub pending_request_snapshot: u64,
+
     prs: Option<ProgressSet>,

     /// The current role of this node.
@@ -243,6 +247,7 @@ impl<T: Storage> Raft<T> {
             raft_log,
             max_inflight: c.max_inflight_msgs,
             max_msg_size: c.max_size_per_msg,
+            pending_request_snapshot: INVALID_INDEX,
             prs: Some(ProgressSet::new(peers.len(), learners.len())),
             state: StateRole::Follower,
             is_learner: false,
@@ -447,7 +452,7 @@
         }
         m.set_msg_type(MessageType::MsgSnapshot);
-        let snapshot_r = self.raft_log.snapshot();
+        let snapshot_r = self.raft_log.snapshot(pr.pending_request_snapshot);
         if let Err(e) = snapshot_r {
             if e == Error::Store(StorageError::SnapshotTemporarilyUnavailable) {
                 debug!(
@@ -520,17 +525,24 @@
         if pr.is_paused() {
             return;
         }
-        let term = self.raft_log.term(pr.next_idx - 1);
-        let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
         let mut m = Message::default();
         m.set_to(to);
-        if term.is_err() || ents.is_err() {
-            // send snapshot if we failed to get term or entries
+        if pr.pending_request_snapshot != INVALID_INDEX {
+            // Check the pending request snapshot first to avoid unnecessarily loading entries.
             if !self.prepare_send_snapshot(&mut m, pr, to) {
                 return;
             }
         } else {
-            self.prepare_send_entries(&mut m, pr, term.unwrap(), ents.unwrap());
+            let term = self.raft_log.term(pr.next_idx - 1);
+            let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size);
+            if term.is_err() || ents.is_err() {
+                // Send a snapshot if we failed to get the term or entries.
+                if !self.prepare_send_snapshot(&mut m, pr, to) {
+                    return;
+                }
+            } else {
+                self.prepare_send_entries(&mut m, pr, term.unwrap(), ents.unwrap());
+            }
         }
         self.send(m);
     }
@@ -625,6 +637,7 @@
         self.pending_conf_index = 0;
         self.read_only = ReadOnly::new(self.read_only.option);
+        self.pending_request_snapshot = INVALID_INDEX;

         let (last_index, max_inflight) = (self.raft_log.last_index(), self.max_inflight);
         let self_id = self.id;
@@ -716,9 +729,11 @@
     /// Converts this node to a follower.
     pub fn become_follower(&mut self, term: u64, leader_id: u64) {
+        let pending_request_snapshot = self.pending_request_snapshot;
         self.reset(term);
         self.leader_id = leader_id;
         self.state = StateRole::Follower;
+        self.pending_request_snapshot = pending_request_snapshot;
         info!("{} became follower at term {}", self.tag, self.term);
     }
@@ -1155,7 +1170,7 @@
             m.get_index()
         );

-        if pr.maybe_decr_to(m.get_index(), m.get_reject_hint()) {
+        if pr.maybe_decr_to(m.get_index(), m.get_reject_hint(), m.get_request_snapshot()) {
             debug!(
                 "{} decreased progress of {} to [{:?}]",
                 self.tag,
@@ -1224,7 +1239,8 @@
         if pr.state == ProgressState::Replicate && pr.ins.full() {
             pr.ins.free_first_one();
         }
-        if pr.matched < self.raft_log.last_index() {
+        // Is the peer requesting a snapshot?
+        if pr.matched < self.raft_log.last_index() || pr.pending_request_snapshot != INVALID_INDEX {
             *send_append = true;
         }
@@ -1332,6 +1348,7 @@
         // out the next msgAppend.
         // If snapshot failure, wait for a heartbeat interval before next try
         pr.pause();
+        pr.pending_request_snapshot = INVALID_INDEX;
     }

     /// Check message's progress to decide which action should be taken.
@@ -1706,9 +1723,43 @@
         Ok(())
     }

+    /// Request a snapshot from a leader.
+    pub fn request_snapshot(&mut self, request_index: u64) -> Result<()> {
+        if self.state == StateRole::Leader {
+            info!(
+                "{} can not request snapshot on leader; dropping request snapshot",
+                self.tag
+            );
+        } else if self.leader_id == INVALID_ID {
+            info!(
+                "{} no leader at term {}; dropping request snapshot",
+                self.tag, self.term
+            );
+        } else if self.get_snap().is_some() {
+            info!(
+                "{} there is a pending snapshot; dropping request snapshot",
+                self.tag
+            );
+        } else if self.pending_request_snapshot != INVALID_INDEX {
+            info!(
+                "{} there is a pending request snapshot; dropping request snapshot",
+                self.tag
+            );
+        } else {
+            self.pending_request_snapshot = request_index;
+            self.send_request_snapshot();
+            return Ok(());
+        }
+        Err(Error::RequestSnapshotDropped)
+    }
+
     // TODO: revoke pub when there is a better way to test.
     /// For a given message, append the entries to the log.
     pub fn handle_append_entries(&mut self, m: &Message) {
+        if self.pending_request_snapshot != INVALID_INDEX {
+            self.send_request_snapshot();
+            return;
+        }
         if m.get_index() < self.raft_log.committed {
             let mut to_send = Message::default();
             to_send.set_to(m.get_from());
@@ -1753,6 +1804,10 @@
     /// For a message, commit and send out heartbeat.
     pub fn handle_heartbeat(&mut self, mut m: Message) {
         self.raft_log.commit_to(m.get_commit());
+        if self.pending_request_snapshot != INVALID_INDEX {
+            self.send_request_snapshot();
+            return;
+        }
         let mut to_send = Message::default();
         to_send.set_to(m.get_from());
         to_send.set_msg_type(MessageType::MsgHeartbeatResponse);
@@ -1790,7 +1845,10 @@
     fn restore_raft(&mut self, snap: &Snapshot) -> Option<bool> {
         let meta = snap.get_metadata();
-        if self.raft_log.match_term(meta.get_index(), meta.get_term()) {
+        // Do not fast-forward the commit if we are requesting a snapshot.
+        if self.pending_request_snapshot == INVALID_INDEX
+            && self.raft_log.match_term(meta.get_index(), meta.get_term())
+        {
             info!(
                 "{} [commit: {}, lastindex: {}, lastterm: {}] fast-forwarded commit to \
                  snapshot [index: {}, term: {}]",
@@ -1833,6 +1891,7 @@
             meta.get_term()
         );

+        self.pending_request_snapshot = INVALID_INDEX;
         let nodes = meta.get_conf_state().get_nodes();
         let learners = meta.get_conf_state().get_learners();
         self.prs = Some(ProgressSet::new(nodes.len(), learners.len()));
@@ -2054,4 +2113,15 @@
     pub fn abort_leader_transfer(&mut self) {
         self.lead_transferee = None;
     }
+
+    fn send_request_snapshot(&mut self) {
+        let mut m = Message::default();
+        m.set_msg_type(MessageType::MsgAppendResponse);
+        m.set_index(self.raft_log.committed);
+        m.set_reject(true);
+        m.set_reject_hint(self.raft_log.last_index());
+        m.set_to(self.leader_id);
+        m.set_request_snapshot(self.pending_request_snapshot);
+        self.send(m);
+    }
 }
diff --git a/src/raft_log.rs b/src/raft_log.rs
index a70f7d56d..09b881826 100644
--- a/src/raft_log.rs
+++ b/src/raft_log.rs
@@ -384,11 +384,13 @@ impl<T: Storage> RaftLog<T> {
     }

     /// Returns the current snapshot
-    pub fn snapshot(&self) -> Result<Snapshot> {
-        self.unstable
-            .snapshot
-            .clone()
-            .map_or_else(|| self.store.snapshot(), Ok)
+    pub fn snapshot(&self, request_index: u64) -> Result<Snapshot> {
+        if let Some(snap) = self.unstable.snapshot.as_ref() {
+            if snap.get_metadata().get_index() >= request_index {
+                return Ok(snap.clone());
+            }
+        }
+        self.store.snapshot(request_index)
     }

     fn must_check_outofbounds(&self, low: u64, high: u64) -> Option<Error> {
diff --git a/src/raw_node.rs b/src/raw_node.rs
index ffd13ac95..2b9730145 100644
--- a/src/raw_node.rs
+++ b/src/raw_node.rs
@@ -458,6 +458,12 @@ impl<T: Storage> RawNode<T> {
         let _ = self.raft.step(m);
     }

+    /// Request a snapshot from a leader.
+    /// The snapshot's index must be greater than or equal to the request_index.
+    pub fn request_snapshot(&mut self, request_index: u64) -> Result<()> {
+        self.raft.request_snapshot(request_index)
+    }
+
     /// TransferLeader tries to transfer leadership to the given transferee.
     pub fn transfer_leader(&mut self, transferee: u64) {
         let mut m = Message::default();
diff --git a/src/storage.rs b/src/storage.rs
index e103c986a..fa0c42dae 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -78,7 +78,8 @@ pub trait Storage {
     /// If snapshot is temporarily unavailable, it should return SnapshotTemporarilyUnavailable,
     /// so raft state machine could know that Storage needs some time to prepare
     /// snapshot and call snapshot later.
-    fn snapshot(&self) -> Result<Snapshot>;
+    /// A snapshot's index must not be less than the `request_index`.
+    fn snapshot(&self, request_index: u64) -> Result<Snapshot>;
 }

 /// The Memory Storage Core instance holds the actual state of the storage struct. To access this
@@ -89,6 +90,9 @@ pub struct MemStorageCore {
     // TODO: maybe vec_deque
     // entries[i] has raft log position i+snapshot.get_metadata().get_index()
     entries: Vec<Entry>,
+    // If it is true, the next call to `snapshot()` will return a
+    // SnapshotTemporarilyUnavailable error.
+    trigger_snap_unavailable: bool,
 }

 impl Default for MemStorageCore {
@@ -98,6 +102,7 @@
             entries: vec![Entry::default()],
             hard_state: HardState::default(),
             snapshot: Snapshot::default(),
+            trigger_snap_unavailable: false,
         }
     }
 }
@@ -223,6 +228,11 @@
         Ok(())
     }
+
+    /// Trigger a SnapshotTemporarilyUnavailable error.
+    pub fn trigger_snap_unavailable(&mut self) {
+        self.trigger_snap_unavailable = true;
+    }
 }

 /// `MemStorage` is a thread-safe implementation of Storage trait.
@@ -312,9 +322,17 @@
     }

     /// Implements the Storage trait.
-    fn snapshot(&self) -> Result<Snapshot> {
-        let core = self.rl();
-        Ok(core.snapshot.clone())
+    fn snapshot(&self, request_index: u64) -> Result<Snapshot> {
+        let mut core = self.wl();
+        if core.trigger_snap_unavailable {
+            core.trigger_snap_unavailable = false;
+            Err(Error::Store(StorageError::SnapshotTemporarilyUnavailable))
+        } else {
+            if core.snapshot.get_metadata().get_index() < request_index {
+                core.snapshot.mut_metadata().set_index(request_index);
+            }
+            Ok(core.snapshot.clone())
+        }
     }
 }
@@ -528,11 +546,16 @@
         cs.set_nodes(nodes.clone());
         let data = b"data".to_vec();

+        let unavailable = Err(RaftError::Store(
+            StorageError::SnapshotTemporarilyUnavailable,
+        ));
         let mut tests = vec![
-            (4, Ok(new_snapshot(4, 4, nodes.clone(), data.clone()))),
-            (5, Ok(new_snapshot(5, 5, nodes.clone(), data.clone()))),
+            (4, Ok(new_snapshot(4, 4, nodes.clone(), data.clone())), 0),
+            (5, Ok(new_snapshot(5, 5, nodes.clone(), data.clone())), 5),
+            (5, Ok(new_snapshot(6, 5, nodes.clone(), data.clone())), 6),
+            (5, unavailable, 6),
         ];
-        for (i, (idx, wresult)) in tests.drain(..).enumerate() {
+        for (i, (idx, wresult, windex)) in tests.drain(..).enumerate() {
             let storage = MemStorage::new();
             storage.wl().entries = ents.clone();
@@ -540,7 +563,10 @@
             .wl()
             .create_snapshot(idx, Some(cs.clone()), data.clone())
             .expect("create snapshot failed");
-        let result = storage.snapshot();
+        if wresult.is_err() {
+            storage.wl().trigger_snap_unavailable();
+        }
+        let result = storage.snapshot(windex);
         if result != wresult {
             panic!("#{}: want {:?}, got {:?}", i, wresult, result);
         }
diff --git a/tests/integration_cases/test_raft.rs b/tests/integration_cases/test_raft.rs
index 1b27259ac..dd6131399 100644
--- a/tests/integration_cases/test_raft.rs
+++ b/tests/integration_cases/test_raft.rs
@@ -32,6 +32,7 @@ use std::panic::{self, AssertUnwindSafe};
 use crate::test_util::*;
 use raft::eraftpb::{
     ConfChange, ConfChangeType, ConfState, Entry, EntryType, HardState, Message, MessageType,
+    Snapshot,
 };
 use raft::storage::MemStorage;
 use raft::*;
@@ -237,7 +238,7 @@ fn test_progress_maybe_decr() {
     ];
     for (i, &(state, m, n, rejected, last, w, wn)) in tests.iter().enumerate() {
         let mut p = new_progress(state, m, n, 0, 0);
-        if p.maybe_decr_to(rejected, last) != w {
+        if p.maybe_decr_to(rejected, last, 0) != w {
             panic!("#{}: maybeDecrTo= {}, want {}", i, !w, w);
         }
         if p.matched != m {
@@ -283,7 +284,7 @@ fn test_progress_resume() {
         paused: true,
         ..Default::default()
     };
-    p.maybe_decr_to(1, 1);
+    p.maybe_decr_to(1, 1, INVALID_INDEX);
     assert!(!p.paused, "paused= true, want false");
     p.paused = true;
     p.maybe_update(2);
@@ -4181,3 +4182,291 @@ fn test_conf_change_check_before_campaign() {
     }
     assert_eq!(nt.peers[&1].state, StateRole::Candidate);
 }
+
+fn prepare_request_snapshot() -> (Network, Snapshot) {
+    fn index_term_11(id: u64, ids: Vec<u64>) -> Interface {
+        let store = MemStorage::new();
+        store
+            .wl()
+            .apply_snapshot(new_snapshot(11, 11, ids.clone()))
+            .unwrap();
+        let mut raft = new_test_raft(id, vec![], 5, 1, store);
+        raft.reset(11);
+        raft
+    }
+    let mut nt = Network::new(vec![
+        Some(index_term_11(1, vec![1, 2, 3])),
+        Some(index_term_11(2, vec![1, 2, 3])),
+        Some(index_term_11(3, vec![1, 2, 3])),
+    ]);
+
+    // elect r1 as leader
+    nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);
+
+    let mut test_entries = Entry::default();
+    test_entries.set_data(b"testdata".to_vec());
+    let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries.clone()]);
+    nt.send(vec![msg.clone(), msg.clone()]);
+    assert_eq!(nt.peers[&1].raft_log.committed, 14);
+    assert_eq!(nt.peers[&2].raft_log.committed, 14);
+
+    let mut cs = ConfState::default();
+    cs.set_nodes(nt.peers[&1].prs().nodes());
+    let ents = nt
+        .peers
+        .get_mut(&1)
+        .unwrap()
+        .raft_log
+        .unstable_entries()
+        .unwrap_or(&[])
+        .to_vec();
+    nt.storage[&1].wl().append(&ents).unwrap();
+    nt.peers.get_mut(&1).unwrap().raft_log.applied = 14;
+    let s = nt.storage[&1]
+        .wl()
+        .create_snapshot(14, Some(cs), vec![7; 7])
+        .unwrap()
+        .to_owned();
+
+    // Commit a new raft log entry.
+    let mut test_entries = Entry::default();
+    test_entries.set_data(b"testdata".to_vec());
+    let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries.clone()]);
+    nt.send(vec![msg.clone()]);
+
+    (nt, s)
+}
+
+// Test if an up-to-date follower can request a snapshot from the leader.
+#[test]
+fn test_follower_request_snapshot() {
+    setup_for_test();
+    let (mut nt, s) = prepare_request_snapshot();
+
+    // Request the latest snapshot.
+    let prev_snapshot_idx = s.get_metadata().get_index();
+    let request_idx = nt.peers[&1].raft_log.committed;
+    assert!(prev_snapshot_idx < request_idx);
+    nt.peers
+        .get_mut(&2)
+        .unwrap()
+        .request_snapshot(request_idx)
+        .unwrap();
+
+    // Send the request snapshot message.
+    let req_snap = nt.peers.get_mut(&2).unwrap().msgs.pop().unwrap();
+    assert!(
+        req_snap.get_msg_type() == MessageType::MsgAppendResponse
+            && req_snap.get_reject()
+            && req_snap.get_request_snapshot() == request_idx,
+        "{:?}",
+        req_snap
+    );
+    nt.peers.get_mut(&1).unwrap().step(req_snap).unwrap();
+
+    // New proposals can not be replicated to peer 2.
+    let mut test_entries = Entry::default();
+    test_entries.set_data(b"testdata".to_vec());
+    let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries.clone()]);
+    nt.send(vec![msg.clone()]);
+    assert_eq!(nt.peers[&1].raft_log.committed, 16);
+    assert_eq!(
+        nt.peers[&1].prs().voters()[&2].state,
+        ProgressState::Snapshot
+    );
+    assert_eq!(nt.peers[&2].raft_log.committed, 15);
+
+    // New entries are blocked until the snapshot succeeds or fails.
+    let report_ok = new_message(2, 1, MessageType::MsgSnapStatus, 0);
+    nt.send(vec![report_ok]);
+    let hb_resp = new_message(2, 1, MessageType::MsgHeartbeatResponse, 0);
+    nt.send(vec![hb_resp]);
+    nt.send(vec![msg]);
+    assert_eq!(nt.peers[&1].raft_log.committed, 17);
+    assert_eq!(nt.peers[&2].raft_log.committed, 17);
+}
+
+// Test if request snapshot can make progress when it encounters SnapshotTemporarilyUnavailable.
+#[test]
+fn test_request_snapshot_unavailable() {
+    setup_for_test();
+    let (mut nt, s) = prepare_request_snapshot();
+
+    // Request the latest snapshot.
+    let prev_snapshot_idx = s.get_metadata().get_index();
+    let request_idx = nt.peers[&1].raft_log.committed;
+    assert!(prev_snapshot_idx < request_idx);
+    nt.peers
+        .get_mut(&2)
+        .unwrap()
+        .request_snapshot(request_idx)
+        .unwrap();
+
+    // Send the request snapshot message.
+    let req_snap = nt.peers.get_mut(&2).unwrap().msgs.pop().unwrap();
+    assert!(
+        req_snap.get_msg_type() == MessageType::MsgAppendResponse
+            && req_snap.get_reject()
+            && req_snap.get_request_snapshot() == request_idx,
+        "{:?}",
+        req_snap
+    );
+
+    // Peer 2 is still in probe state due to SnapshotTemporarilyUnavailable.
+    nt.peers[&1].get_store().wl().trigger_snap_unavailable();
+    nt.peers
+        .get_mut(&1)
+        .unwrap()
+        .step(req_snap.clone())
+        .unwrap();
+    assert_eq!(nt.peers[&1].prs().voters()[&2].state, ProgressState::Probe);
+
+    // Next index is decreased.
+    nt.peers[&1].get_store().wl().trigger_snap_unavailable();
+    nt.peers
+        .get_mut(&1)
+        .unwrap()
+        .step(req_snap.clone())
+        .unwrap();
+    assert_eq!(nt.peers[&1].prs().voters()[&2].state, ProgressState::Probe);
+
+    // The snapshot will be available if it is requested again. This message
+    // must not be considered stale even if `rejected != next - 1`.
+    nt.peers
+        .get_mut(&1)
+        .unwrap()
+        .step(req_snap.clone())
+        .unwrap();
+    assert_eq!(
+        nt.peers[&1].prs().voters()[&2].state,
+        ProgressState::Snapshot,
+    );
+}
+
+// Test if request snapshot can make progress when matched is advanced.
+#[test]
+fn test_request_snapshot_matched_change() {
+    setup_for_test();
+    let (mut nt, _) = prepare_request_snapshot();
+    // Let matched be greater than the committed index.
+    nt.peers.get_mut(&2).unwrap().raft_log.committed -= 1;
+
+    // Request the latest snapshot.
+    let request_idx = nt.peers[&2].raft_log.committed;
+    nt.peers
+        .get_mut(&2)
+        .unwrap()
+        .request_snapshot(request_idx)
+        .unwrap();
+    let req_snap = nt.peers.get_mut(&2).unwrap().msgs.pop().unwrap();
+    // The request snapshot is ignored because it is considered out of order.
+    nt.peers.get_mut(&1).unwrap().step(req_snap).unwrap();
+    assert_eq!(
+        nt.peers[&1].prs().voters()[&2].state,
+        ProgressState::Replicate
+    );
+
+    // The heartbeat is responded to with a request snapshot message.
+    for _ in 0..nt.peers[&1].get_heartbeat_timeout() {
+        nt.peers.get_mut(&1).unwrap().tick();
+    }
+    let msg_hb = nt.peers.get_mut(&1).unwrap().msgs.pop().unwrap();
+    nt.peers.get_mut(&2).unwrap().step(msg_hb).unwrap();
+    let req_snap = nt.peers.get_mut(&2).unwrap().msgs.pop().unwrap();
+    nt.peers
+        .get_mut(&1)
+        .unwrap()
+        .step(req_snap.clone())
+        .unwrap();
+    assert_eq!(
+        nt.peers[&1].prs().voters()[&2].state,
+        ProgressState::Snapshot
+    );
+}
+
+// Test if request snapshot can make progress when the peer is not in Replicate state.
+#[test]
+fn test_request_snapshot_none_replicate() {
+    setup_for_test();
+    let (mut nt, _) = prepare_request_snapshot();
+    nt.peers
+        .get_mut(&1)
+        .unwrap()
+        .mut_prs()
+        .get_mut(2)
+        .unwrap()
+        .state = ProgressState::Probe;
+
+    // Request the latest snapshot.
+    let request_idx = nt.peers[&2].raft_log.committed;
+    nt.peers
+        .get_mut(&2)
+        .unwrap()
+        .request_snapshot(request_idx)
+        .unwrap();
+    let req_snap = nt.peers.get_mut(&2).unwrap().msgs.pop().unwrap();
+    nt.peers.get_mut(&1).unwrap().step(req_snap).unwrap();
+    assert!(nt.peers[&1].prs().voters()[&2].pending_request_snapshot != 0);
+}
+
+// Test if request snapshot can make progress when the leader steps down.
+#[test]
+fn test_request_snapshot_step_down() {
+    setup_for_test();
+    let (mut nt, _) = prepare_request_snapshot();
+
+    // Commit a new entry and let the leader step down while peer 2 is isolated.
+    nt.isolate(2);
+    let mut test_entries = Entry::default();
+    test_entries.set_data(b"testdata".to_vec());
+    let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries.clone()]);
+    nt.send(vec![msg.clone()]);
+    nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]);
+    assert_eq!(nt.peers[&3].state, StateRole::Leader);
+
+    // Recover and request the latest snapshot.
+    nt.recover();
+    let request_idx = nt.peers[&2].raft_log.committed;
+    nt.peers
+        .get_mut(&2)
+        .unwrap()
+        .request_snapshot(request_idx)
+        .unwrap();
+    nt.send(vec![new_message(3, 3, MessageType::MsgBeat, 0)]);
+    assert!(
+        nt.peers[&2].pending_request_snapshot == INVALID_INDEX,
+        "{}",
+        nt.peers[&2].pending_request_snapshot
+    );
+}
+
+// Abort the pending request snapshot if the peer becomes candidate or leader.
+#[test]
+fn test_request_snapshot_on_role_change() {
+    setup_for_test();
+    let (mut nt, _) = prepare_request_snapshot();
+
+    let request_idx = nt.peers[&2].raft_log.committed;
+    nt.peers
+        .get_mut(&2)
+        .unwrap()
+        .request_snapshot(request_idx)
+        .unwrap();
+
+    // Becoming follower does not reset pending_request_snapshot.
+    let (term, id) = (nt.peers[&1].term, nt.peers[&1].id);
+    nt.peers.get_mut(&2).unwrap().become_follower(term, id);
+    assert!(
+        nt.peers[&2].pending_request_snapshot != INVALID_INDEX,
+        "{}",
+        nt.peers[&2].pending_request_snapshot
+    );
+
+    // Becoming candidate resets pending_request_snapshot.
+    nt.peers.get_mut(&2).unwrap().become_candidate();
+    assert!(
+        nt.peers[&2].pending_request_snapshot == INVALID_INDEX,
+        "{}",
+        nt.peers[&2].pending_request_snapshot
+    );
+}
diff --git a/tests/integration_cases/test_raft_snap.rs b/tests/integration_cases/test_raft_snap.rs
index 7a3741704..cf70330c4 100644
--- a/tests/integration_cases/test_raft_snap.rs
+++ b/tests/integration_cases/test_raft_snap.rs
@@ -27,6 +27,7 @@

 use crate::test_util::*;
 use raft::eraftpb::*;
+use raft::{Error, ProgressState, INVALID_INDEX};

 fn testing_snap() -> Snapshot {
     new_snapshot(11, 11, vec![1, 2])
@@ -128,3 +129,83 @@ fn test_snapshot_abort() {
     assert_eq!(sm.prs().voters()[&2].pending_snapshot, 0);
     assert_eq!(sm.prs().voters()[&2].next_idx, 12);
 }
+
+#[test]
+fn test_request_snapshot() {
+    setup_for_test();
+    let mut sm = new_test_raft(1, vec![1, 2], 10, 1, new_storage());
+    sm.restore(testing_snap());
+
+    // Raft can not request a snapshot if there is no leader.
+    assert_eq!(
+        sm.raft
+            .as_mut()
+            .unwrap()
+            .request_snapshot(INVALID_INDEX + 1)
+            .unwrap_err(),
+        Error::RequestSnapshotDropped
+    );
+
+    sm.become_candidate();
+    sm.become_leader();
+
+    // Raft can not request a snapshot if it is itself the leader.
+    assert_eq!(
+        sm.raft
+            .as_mut()
+            .unwrap()
+            .request_snapshot(INVALID_INDEX + 1)
+            .unwrap_err(),
+        Error::RequestSnapshotDropped
+    );
+
+    // Advance matched.
+    let mut m = new_message(2, 1, MessageType::MsgAppendResponse, 0);
+    m.set_index(11);
+    sm.step(m).unwrap();
+    assert_eq!(sm.prs().voters()[&2].state, ProgressState::Replicate);
+
+    let request_snapshot_idx = sm.raft_log.committed;
+    let mut m = new_message(2, 1, MessageType::MsgAppendResponse, 0);
+    m.set_index(11);
+    m.set_reject(true);
+    m.set_reject_hint(INVALID_INDEX);
+    m.set_request_snapshot(request_snapshot_idx);
+
+    // Ignore out of order request snapshot messages.
+    let mut out_of_order = m.clone();
+    out_of_order.set_index(9);
+    sm.step(out_of_order).unwrap();
+    assert_eq!(sm.prs().voters()[&2].state, ProgressState::Replicate);
+
+    // Request snapshot.
+    sm.step(m.clone()).unwrap();
+    assert_eq!(sm.prs().voters()[&2].state, ProgressState::Snapshot);
+    assert_eq!(sm.prs().voters()[&2].pending_snapshot, 11);
+    assert_eq!(sm.prs().voters()[&2].next_idx, 12);
+    assert!(sm.prs().voters()[&2].is_paused());
+    let snap = sm.msgs.pop().unwrap();
+    assert!(
+        snap.get_msg_type() == MessageType::MsgSnapshot
+            && snap.get_snapshot().get_metadata().get_index() == request_snapshot_idx,
+        "{:?}",
+        snap
+    );
+
+    // Append/heartbeat responses do not move the state from Snapshot to Probe.
+    let mut m = new_message(2, 1, MessageType::MsgAppendResponse, 0);
+    m.set_index(11);
+    sm.step(m).unwrap();
+    assert_eq!(sm.prs().voters()[&2].state, ProgressState::Snapshot);
+    assert_eq!(sm.prs().voters()[&2].pending_snapshot, 11);
+    assert_eq!(sm.prs().voters()[&2].next_idx, 12);
+    assert!(sm.prs().voters()[&2].is_paused());
+
+    // However, a snapshot status report does set the state to Probe.
+    let m = new_message(2, 1, MessageType::MsgSnapStatus, 0);
+    sm.step(m).unwrap();
+    assert_eq!(sm.prs().voters()[&2].state, ProgressState::Probe);
+    assert_eq!(sm.prs().voters()[&2].pending_snapshot, 0);
+    assert_eq!(sm.prs().voters()[&2].next_idx, 12);
+    assert!(sm.prs().voters()[&2].is_paused());
+}
diff --git a/tests/test_util/mod.rs b/tests/test_util/mod.rs
index 07d2ae13d..ebe6c6111 100644
--- a/tests/test_util/mod.rs
+++ b/tests/test_util/mod.rs
@@ -59,6 +59,7 @@ pub fn new_test_config(id: u64, peers: Vec<u64>, election: usize, heartbeat: usi
         heartbeat_tick: heartbeat,
         max_size_per_msg: NO_LIMIT,
         max_inflight_msgs: 256,
+        tag: format!("{}", id),
         ..Default::default()
     }
 }
@@ -265,6 +266,9 @@ impl Network {
             }
             Some(mut p) => {
                 p.initial(id, &peer_addrs);
+                if let Some(raft) = p.raft.as_mut() {
+                    nstorage.insert(id, raft.raft_log.store.clone());
+                }
                 npeers.insert(id, p);
             }
         }
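
For context, the intended flow is: the follower asks for a snapshot, the request travels to the leader as a rejected MsgAppendResponse carrying `request_snapshot`, and the leader answers with a MsgSnapshot whose index is at least the requested one. Below is a minimal follower-side sketch, illustrative only and not part of this patch; it assumes a `RawNode` that is already ticking and stepping messages, and the helper name `maybe_request_snapshot` plus the retry-on-a-later-tick policy are assumptions of the sketch:

    use raft::{Error, RawNode, Storage};

    /// Hypothetical helper: ask the leader for a snapshot that covers at
    /// least `request_index` (e.g. the index this peer has applied up to).
    /// Returns true if the request was queued, false if it was dropped.
    fn maybe_request_snapshot<T: Storage>(node: &mut RawNode<T>, request_index: u64) -> bool {
        match node.request_snapshot(request_index) {
            // A rejection-style MsgAppendResponse carrying `request_snapshot`
            // is now queued and will reach the leader via the usual transport.
            Ok(()) => true,
            // Dropped when this peer is the leader, has no leader yet, or a
            // snapshot (or an earlier request) is already pending; callers
            // can simply retry on a later tick.
            Err(Error::RequestSnapshotDropped) => false,
            Err(e) => panic!("unexpected error while requesting snapshot: {:?}", e),
        }
    }

On the leader side, the contract lands in `Storage::snapshot(request_index)`: return a snapshot whose index is not less than `request_index`, or `SnapshotTemporarilyUnavailable` while one is being prepared, as the `MemStorage` changes above do.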