From 813fc4e54e22ed2185edce026f27dc679897de62 Mon Sep 17 00:00:00 2001
From: Jay
Date: Mon, 29 Apr 2019 16:05:24 +0800
Subject: [PATCH 1/8] Check pending conf change before campaign (#225)

Fix #221.

From f112f301bfbe5fca3f51a188a2412e5db04be3f6 Mon Sep 17 00:00:00 2001
From: Jay
Date: Mon, 29 Apr 2019 19:00:40 +0800
Subject: [PATCH 2/8] Add more convenient lightweight interfaces (#227)

This PR introduces two simple and lightweight interfaces:
- ping to trigger heartbeats without ticking,
- status_ref to borrow the progress set instead of cloning.
---
 src/lib.rs      |  4 ++--
 src/raft.rs     |  4 +---
 src/raw_node.rs |  9 ++++++++-
 src/status.rs   | 32 ++++++++++++++++++++++++++++++++
 4 files changed, 43 insertions(+), 6 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index 30ebd493b..668ca54c8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -406,7 +406,7 @@ pub use self::raft::{vote_resp_msg_type, Raft, SoftState, StateRole, INVALID_ID,
 pub use self::raft_log::{RaftLog, NO_LIMIT};
 pub use self::raw_node::{is_empty_snap, Peer, RawNode, Ready, SnapshotStatus};
 pub use self::read_only::{ReadOnlyOption, ReadState};
-pub use self::status::Status;
+pub use self::status::{Status, StatusRef};
 pub use self::storage::{RaftState, Storage};
 pub use raft_proto::eraftpb;
 use slog::{Drain, Logger};
@@ -435,7 +435,7 @@ pub mod prelude {
 
     pub use crate::progress::Progress;
 
-    pub use crate::status::Status;
+    pub use crate::status::{Status, StatusRef};
 
     pub use crate::read_only::{ReadOnlyOption, ReadState};
 }

diff --git a/src/raft.rs b/src/raft.rs
index dac0c7974..8f70d946b 100644
--- a/src/raft.rs
+++ b/src/raft.rs
@@ -681,9 +681,7 @@ impl Raft {
         self.set_prs(prs);
     }
 
-    /// Broadcast heartbeats to all the followers.
-    ///
-    /// If it's not leader, nothing will happen.
+    /// Broadcasts heartbeats to all the followers if it's leader.
     pub fn ping(&mut self) {
         if self.state == StateRole::Leader {
             self.bcast_heartbeat();

diff --git a/src/raw_node.rs b/src/raw_node.rs
index e2f7a4c4f..28c2b0d97 100644
--- a/src/raw_node.rs
+++ b/src/raw_node.rs
@@ -42,7 +42,7 @@ use crate::eraftpb::{
 };
 use crate::errors::{Error, Result};
 use crate::read_only::ReadState;
-use crate::{Raft, SoftState, Status, Storage, INVALID_ID};
+use crate::{Raft, SoftState, Status, StatusRef, Storage, INVALID_ID};
 use slog::Logger;
 
 /// Represents a Peer node in the cluster.
@@ -456,6 +456,13 @@ impl RawNode {
         Status::new(&self.raft)
     }
 
+    /// Returns the current status of the given group.
+    ///
+    /// It borrows the internal progress set instead of copying it.
+    pub fn status_ref(&self) -> StatusRef {
+        StatusRef::new(&self.raft)
+    }
+
     /// ReportUnreachable reports the given node is not reachable for the last send.
     pub fn report_unreachable(&mut self, id: u64) {
         let mut m = Message::default();

diff --git a/src/status.rs b/src/status.rs
index 776810b52..64091c517 100644
--- a/src/status.rs
+++ b/src/status.rs
@@ -62,3 +62,35 @@ impl<'a> Status<'a> {
         s
     }
 }
+
+/// Represents the current status of the raft.
+#[derive(Default)]
+pub struct StatusRef<'a> {
+    /// The ID of the current node.
+    pub id: u64,
+    /// The hardstate of the raft, representing voted state.
+    pub hs: HardState,
+    /// The softstate of the raft, representing proposed state.
+    pub ss: SoftState,
+    /// The index of the last entry to have been applied.
+    pub applied: u64,
+    /// The progress towards catching up and applying logs.
+    pub progress: Option<&'a ProgressSet>,
+}
+
+impl<'a> StatusRef<'a> {
+    /// Gets the current raft status.
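+    ///
+    /// A minimal usage sketch; the `raft` handle here is assumed to be an
+    /// already-initialized `Raft` instance:
+    ///
+    /// ```ignore
+    /// let status = StatusRef::new(&raft);
+    /// // `progress` is only populated while this node is the leader.
+    /// if let Some(prs) = status.progress {
+    ///     println!("voters: {:?}", prs.voter_ids());
+    /// }
+    /// ```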
+ pub fn new(raft: &'a Raft) -> StatusRef<'a> { + let mut s = StatusRef { + id: raft.id, + ..Default::default() + }; + s.hs = raft.hard_state(); + s.ss = raft.soft_state(); + s.applied = raft.raft_log.get_applied(); + if s.ss.raft_state == StateRole::Leader { + s.progress = Some(raft.prs()); + } + s + } +} From c4d07102a440f4db3b5e0e731332610e705d5a8d Mon Sep 17 00:00:00 2001 From: Jay Date: Mon, 29 Apr 2019 19:16:41 +0800 Subject: [PATCH 3/8] *: bump to 0.4.2 (#228) From 0f59e7fdc64cd5c066d94e6c84a234e70af87c32 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Wed, 8 May 2019 16:27:18 +0800 Subject: [PATCH 4/8] Bump to v0.4.3 (#231) * raft: leader respond to learner read index message (#220) Signed-off-by: nolouch * Bump to v0.4.3 Signed-off-by: Neil Shen --- src/lib.rs | 1 + tests/integration_cases/test_raft.rs | 73 ++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 668ca54c8..12bf8148f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -366,6 +366,7 @@ before taking old, removed peers offline. */ +#![cfg_attr(not(feature = "cargo-clippy"), allow(unknown_lints))] #![deny(clippy::all)] #![deny(missing_docs)] #![recursion_limit = "128"] diff --git a/tests/integration_cases/test_raft.rs b/tests/integration_cases/test_raft.rs index e8deea943..b00ca174d 100644 --- a/tests/integration_cases/test_raft.rs +++ b/tests/integration_cases/test_raft.rs @@ -2265,6 +2265,79 @@ fn test_read_only_with_learner() { } } +#[test] +fn test_read_only_with_learner() { + setup_for_test(); + let a = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage()); + let b = new_test_learner_raft(2, vec![1], vec![2], 10, 1, new_storage()); + + let mut nt = Network::new(vec![Some(a), Some(b)]); + + // we can not let system choose the value of randomizedElectionTimeout + // otherwise it will introduce some uncertainty into this test case + // we need to ensure randomizedElectionTimeout > electionTimeout here + let b_election_timeout = nt.peers[&2].get_election_timeout(); + nt.peers + .get_mut(&2) + .unwrap() + .set_randomized_election_timeout(b_election_timeout + 1); + + for _ in 0..b_election_timeout { + nt.peers.get_mut(&2).unwrap().tick(); + } + nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); + + assert_eq!(nt.peers[&1].state, StateRole::Leader); + assert_eq!(nt.peers[&2].state, StateRole::Follower); + + let mut tests = vec![ + (1, 10, 11, "ctx1"), + (2, 10, 21, "ctx2"), + (1, 10, 31, "ctx3"), + (2, 10, 41, "ctx4"), + ]; + + for (i, (id, proposals, wri, wctx)) in tests.drain(..).enumerate() { + for _ in 0..proposals { + nt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]); + } + + let e = new_entry(0, 0, Some(wctx)); + nt.send(vec![new_message_with_entries( + id, + id, + MessageType::MsgReadIndex, + vec![e], + )]); + + let read_states: Vec = nt + .peers + .get_mut(&id) + .unwrap() + .read_states + .drain(..) 
+ .collect(); + assert_eq!( + read_states.is_empty(), + false, + "#{}: read_states is empty, want non-empty", + i + ); + let rs = &read_states[0]; + assert_eq!( + rs.index, wri, + "#{}: read_index = {}, want {}", + i, rs.index, wri + ); + let vec_wctx = wctx.as_bytes().to_vec(); + assert_eq!( + rs.request_ctx, vec_wctx, + "#{}: request_ctx = {:?}, want {:?}", + i, rs.request_ctx, vec_wctx + ); + } +} + #[test] fn test_read_only_option_lease() { let l = testing_logger().new(o!("test" => "read_only_option_lease")); From e6d98a4ce826b2a4ab27ffb12aa4bdaf09805dd6 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Mon, 1 Jul 2019 22:00:08 +0800 Subject: [PATCH 5/8] Request snapshot (#243) Signed-off-by: Neil Shen --- proto/proto/eraftpb.proto | 1 + proto/src/prost/eraftpb.rs | 2 + proto/src/prost/wrapper_eraftpb.rs | 4 + src/errors.rs | 5 + src/lib.rs | 2 +- src/progress/mod.rs | 41 +- src/raft.rs | 109 ++++-- src/raft_log.rs | 12 +- src/raw_node.rs | 6 + src/storage.rs | 41 +- .../test_membership_changes.rs | 4 +- tests/integration_cases/test_raft.rs | 358 ++++++++++++++---- tests/integration_cases/test_raft_snap.rs | 81 ++++ 13 files changed, 543 insertions(+), 123 deletions(-) diff --git a/proto/proto/eraftpb.proto b/proto/proto/eraftpb.proto index 664a4fea7..d09a93af8 100644 --- a/proto/proto/eraftpb.proto +++ b/proto/proto/eraftpb.proto @@ -73,6 +73,7 @@ message Message { repeated Entry entries = 7; uint64 commit = 8; Snapshot snapshot = 9; + uint64 request_snapshot = 13; bool reject = 10; uint64 reject_hint = 11; bytes context = 12; diff --git a/proto/src/prost/eraftpb.rs b/proto/src/prost/eraftpb.rs index f8c9174e7..ce651c9de 100644 --- a/proto/src/prost/eraftpb.rs +++ b/proto/src/prost/eraftpb.rs @@ -65,6 +65,8 @@ pub struct Message { pub commit: u64, #[prost(message, optional, tag = "9")] pub snapshot: ::std::option::Option, + #[prost(uint64, tag = "13")] + pub request_snapshot: u64, #[prost(bool, tag = "10")] pub reject: bool, #[prost(uint64, tag = "11")] diff --git a/proto/src/prost/wrapper_eraftpb.rs b/proto/src/prost/wrapper_eraftpb.rs index 0a39f3097..9e0f71609 100644 --- a/proto/src/prost/wrapper_eraftpb.rs +++ b/proto/src/prost/wrapper_eraftpb.rs @@ -466,6 +466,10 @@ impl Message { self.snapshot.take().unwrap_or_else(Snapshot::default) } #[inline] + pub fn clear_request_snapshot(&mut self) { + self.request_snapshot = 0 + } + #[inline] pub fn clear_reject(&mut self) { self.reject = false } diff --git a/src/errors.rs b/src/errors.rs index 84b8a9886..cd6b8bbf2 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -83,6 +83,10 @@ quick_error! { ViolatesContract(contract: String) { display("An argument violate a calling contract: {}", contract) } + /// The request snapshot is dropped. 
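+        ///
+        /// `Raft::request_snapshot` returns this when the request cannot be
+        /// accepted: on a leader, when no leader is known, or while an
+        /// earlier requested snapshot is still pending.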
+ RequestSnapshotDropped { + description("raft: request snapshot dropped") + } } } @@ -96,6 +100,7 @@ impl cmp::PartialEq for Error { (&Error::Io(ref e1), &Error::Io(ref e2)) => e1.kind() == e2.kind(), (&Error::StepLocalMsg, &Error::StepLocalMsg) => true, (&Error::ConfigInvalid(ref e1), &Error::ConfigInvalid(ref e2)) => e1 == e2, + (&Error::RequestSnapshotDropped, &Error::RequestSnapshotDropped) => true, _ => false, } } diff --git a/src/lib.rs b/src/lib.rs index 12bf8148f..6148f9a63 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -385,12 +385,12 @@ extern crate getset; mod config; mod errors; mod log_unstable; -mod progress; #[cfg(test)] pub mod raft; #[cfg(not(test))] mod raft; mod raft_log; +mod progress; pub mod raw_node; mod read_only; mod status; diff --git a/src/progress/mod.rs b/src/progress/mod.rs index 8c3716fb1..e82b4bed2 100644 --- a/src/progress/mod.rs +++ b/src/progress/mod.rs @@ -24,9 +24,10 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +use std::cmp; +use crate::raft::INVALID_INDEX; use self::inflights::Inflights; -use std::cmp; pub mod inflights; pub mod progress_set; @@ -73,6 +74,10 @@ pub struct Progress { /// this Progress will be paused. raft will not resend snapshot until the pending one /// is reported to be failed. pub pending_snapshot: u64, + /// This field is used in request snapshot. + /// If there is a pending request snapshot, this will be set to the request + /// index of the snapshot. + pub pending_request_snapshot: u64, /// This is true if the progress is recently active. Receiving any messages /// from the corresponding follower indicates the progress is active. @@ -98,6 +103,7 @@ impl Progress { state: ProgressState::default(), paused: false, pending_snapshot: 0, + pending_request_snapshot: 0, recent_active: false, ins: Inflights::new(ins_size), } @@ -116,6 +122,7 @@ impl Progress { self.state = ProgressState::default(); self.paused = false; self.pending_snapshot = 0; + self.pending_request_snapshot = INVALID_INDEX; self.recent_active = false; debug_assert!(self.ins.cap() != 0); self.ins.reset(); @@ -188,25 +195,41 @@ impl Progress { /// Returns false if the given index comes from an out of order message. /// Otherwise it decreases the progress next index to min(rejected, last) /// and returns true. - pub fn maybe_decr_to(&mut self, rejected: u64, last: u64) -> bool { + pub fn maybe_decr_to(&mut self, rejected: u64, last: u64, request_snapshot: u64) -> bool { if self.state == ProgressState::Replicate { // the rejection must be stale if the progress has matched and "rejected" // is smaller than "match". - if rejected <= self.matched { + // Or rejected equals to matched and request_snapshot is the INVALID_INDEX. + if rejected < self.matched + || (rejected == self.matched && request_snapshot == INVALID_INDEX) + { return false; } - self.next_idx = self.matched + 1; + if request_snapshot == INVALID_INDEX { + self.next_idx = self.matched + 1; + } else { + self.pending_request_snapshot = request_snapshot; + } return true; } - // the rejection must be stale if "rejected" does not match next - 1 - if self.next_idx == 0 || self.next_idx - 1 != rejected { + // The rejection must be stale if "rejected" does not match next - 1. + // Do not consider it stale if it is a request snapshot message. 
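+        // E.g. with next_idx == 5 only a rejection of index 4 is live, but a
+        // rejection carrying a request_snapshot index is handled regardless.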
+ if (self.next_idx == 0 || self.next_idx - 1 != rejected) + && request_snapshot == INVALID_INDEX + { return false; } - self.next_idx = cmp::min(rejected, last + 1); - if self.next_idx < 1 { - self.next_idx = 1; + // Do not decrease next index if it's requesting snapshot. + if request_snapshot == INVALID_INDEX { + self.next_idx = cmp::min(rejected, last + 1); + if self.next_idx < 1 { + self.next_idx = 1; + } + } else if self.pending_request_snapshot == INVALID_INDEX { + // Allow requesting snapshot even if it's not Replicate. + self.pending_request_snapshot = request_snapshot; } self.resume(); true diff --git a/src/raft.rs b/src/raft.rs index 8f70d946b..5dce3b282 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -112,6 +112,10 @@ pub struct Raft { /// The maximum length (in bytes) of all the entries. pub max_msg_size: u64, + /// The peer is requesting snapshot, it is the index that the follower + /// needs it to be included in a snapshot. + pub pending_request_snapshot: u64, + prs: Option, /// The current role of this node. @@ -253,6 +257,7 @@ impl Raft { max_inflight: c.max_inflight_msgs, max_msg_size: c.max_size_per_msg, prs: Some(ProgressSet::with_capacity(peers.len(), learners.len())), + pending_request_snapshot: INVALID_INDEX, state: StateRole::Follower, is_learner: false, check_quorum: c.check_quorum, @@ -524,7 +529,7 @@ impl Raft { } m.set_msg_type(MessageType::MsgSnapshot); - let snapshot_r = self.raft_log.snapshot(); + let snapshot_r = self.raft_log.snapshot(pr.pending_request_snapshot); if let Err(e) = snapshot_r { if e == Error::Store(StorageError::SnapshotTemporarilyUnavailable) { debug!( @@ -619,34 +624,28 @@ impl Raft { ); return; } - let term = self.raft_log.term(pr.next_idx - 1); - let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size); let mut m = Message::default(); m.to = to; - if term.is_err() || ents.is_err() { - // send snapshot if we failed to get term or entries - trace!( - self.logger, - "Skipping sending to {to}", - to = to; - "index" => pr.next_idx, - "tag" => &self.tag, - "term" => ?term, - "ents" => ?ents, - ); + if pr.pending_request_snapshot != INVALID_INDEX { + // Check pending request snapshot first to avoid unnecessary loading entries. if !self.prepare_send_snapshot(&mut m, pr, to) { return; } } else { - let mut ents = ents.unwrap(); - if self.batch_append { - let batched = self.try_batching(to, pr, &mut ents); - if batched { + let term = self.raft_log.term(pr.next_idx - 1); + let ents = self.raft_log.entries(pr.next_idx, self.max_msg_size); + if term.is_err() || ents.is_err() { + // send snapshot if we failed to get term or entries. + if !self.prepare_send_snapshot(&mut m, pr, to) { return; } + } else { + let mut ents = ents.unwrap(); + if self.batch_append && self.try_batching(to, pr, &mut ents) { + return; + } + self.prepare_send_entries(&mut m, pr, term.unwrap(), ents); } - let term = term.unwrap(); - self.prepare_send_entries(&mut m, pr, term, ents); } self.send(m); } @@ -768,6 +767,7 @@ impl Raft { self.pending_conf_index = 0; self.read_only = ReadOnly::new(self.read_only.option); + self.pending_request_snapshot = INVALID_INDEX; let last_index = self.raft_log.last_index(); let self_id = self.id; @@ -857,9 +857,11 @@ impl Raft { /// Converts this node to a follower. 
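+    ///
+    /// A pending request snapshot, if any, survives this role change; it is
+    /// only cleared by `reset` (e.g. on becoming a candidate) or once a
+    /// snapshot is applied.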
pub fn become_follower(&mut self, term: u64, leader_id: u64) { + let pending_request_snapshot = self.pending_request_snapshot; self.reset(term); self.leader_id = leader_id; self.state = StateRole::Follower; + self.pending_request_snapshot = pending_request_snapshot; info!( self.logger, "became follower at term {term}", @@ -1459,7 +1461,7 @@ impl Raft { "tag" => &self.tag, ); - if pr.maybe_decr_to(m.index, m.reject_hint) { + if pr.maybe_decr_to(m.index, m.reject_hint, m.request_snapshot) { debug!( self.logger, "decreased progress of {}", @@ -1531,7 +1533,8 @@ impl Raft { if pr.state == ProgressState::Replicate && pr.ins.full() { pr.ins.free_first_one(); } - if pr.matched < self.raft_log.last_index() { + // Does it request snapshot? + if pr.matched < self.raft_log.last_index() || pr.pending_request_snapshot != INVALID_INDEX { *send_append = true; } @@ -1657,6 +1660,7 @@ impl Raft { // out the next msgAppend. // If snapshot failure, wait for a heartbeat interval before next try pr.pause(); + pr.pending_request_snapshot = INVALID_INDEX; } /// Check message's progress to decide which action should be taken. @@ -2062,9 +2066,47 @@ impl Raft { Ok(()) } + /// Request a snapshot from a leader. + pub fn request_snapshot(&mut self, request_index: u64) -> Result<()> { + if self.state == StateRole::Leader { + info!( + self.logger, + "can not request snapshot on leader; dropping request snapshot"; + "tag" => &self.tag, + ); + } else if self.leader_id == INVALID_ID { + info!( + self.logger, + "drop request snapshot because of no leader"; + "tag" => &self.tag, "term" => self.term, + ); + } else if self.get_snap().is_some() { + info!( + self.logger, + "there is a pending snapshot; dropping request snapshot"; + "tag" => &self.tag, + ); + } else if self.pending_request_snapshot != INVALID_INDEX { + info!( + self.logger, + "there is a pending snapshot; dropping request snapshot"; + "tag" => &self.tag, + ); + } else { + self.pending_request_snapshot = request_index; + self.send_request_snapshot(); + return Ok(()); + } + Err(Error::RequestSnapshotDropped) + } + // TODO: revoke pub when there is a better way to test. /// For a given message, append the entries to the log. pub fn handle_append_entries(&mut self, m: &Message) { + if self.pending_request_snapshot != INVALID_INDEX { + self.send_request_snapshot(); + return; + } if m.index < self.raft_log.committed { debug!( self.logger, @@ -2072,6 +2114,7 @@ impl Raft { "tag" => &self.tag, ); let mut to_send = Message::default(); + to_send.to = m.from; to_send.set_msg_type(MessageType::MsgAppendResponse); to_send.to = m.from; to_send.index = self.raft_log.committed; @@ -2115,7 +2158,12 @@ impl Raft { /// For a message, commit and send out heartbeat. pub fn handle_heartbeat(&mut self, mut m: Message) { self.raft_log.commit_to(m.commit); + if self.pending_request_snapshot != INVALID_INDEX { + self.send_request_snapshot(); + return; + } let mut to_send = Message::default(); + to_send.to = m.from; to_send.set_msg_type(MessageType::MsgHeartbeatResponse); to_send.to = m.from; to_send.context = m.take_context(); @@ -2160,7 +2208,10 @@ impl Raft { fn restore_raft(&mut self, snap: &Snapshot) -> Option { let meta = snap.get_metadata(); - if self.raft_log.match_term(meta.index, meta.term) { + // Do not fast-forward commit if we are requesting snapshot. 
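+        // A follower requests a snapshot at (or above) its committed index,
+        // so `match_term` below may well succeed; fast-forwarding here would
+        // skip applying the snapshot that was explicitly asked for.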
+ if self.pending_request_snapshot == INVALID_INDEX + && self.raft_log.match_term(meta.index, meta.term) + { info!( self.logger, "[commit: {commit}, lastindex: {last_index}, lastterm: {last_term}] fast-forwarded commit to \ @@ -2220,6 +2271,7 @@ impl Raft { conf_change.start_index = meta.pending_membership_change_index; self.pending_membership_change = Some(conf_change); } + self.pending_request_snapshot = INVALID_INDEX; None } @@ -2503,4 +2555,15 @@ impl Raft { pub fn is_in_membership_change(&self) -> bool { self.prs().is_in_membership_change() } + + fn send_request_snapshot(&mut self) { + let mut m = Message::default(); + m.set_msg_type(MessageType::MsgAppendResponse); + m.index = self.raft_log.committed; + m.reject = true; + m.reject_hint = self.raft_log.last_index(); + m.to = self.leader_id; + m.request_snapshot = self.pending_request_snapshot; + self.send(m); + } } diff --git a/src/raft_log.rs b/src/raft_log.rs index 1768af4df..6e1779ab4 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -424,11 +424,13 @@ impl RaftLog { } /// Returns the current snapshot - pub fn snapshot(&self) -> Result { - self.unstable - .snapshot - .clone() - .map_or_else(|| self.store.snapshot(), Ok) + pub fn snapshot(&self, request_index: u64) -> Result { + if let Some(snap) = self.unstable.snapshot.as_ref() { + if snap.get_metadata().index >= request_index { + return Ok(snap.clone()); + } + } + self.store.snapshot(request_index) } fn must_check_outofbounds(&self, low: u64, high: u64) -> Option { diff --git a/src/raw_node.rs b/src/raw_node.rs index 28c2b0d97..debcca2f2 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -483,6 +483,12 @@ impl RawNode { let _ = self.raft.step(m); } + /// Request a snapshot from a leader. + /// The snapshot's index must be greater or equal to the request_index. + pub fn request_snapshot(&mut self, request_index: u64) -> Result<()> { + self.raft.request_snapshot(request_index) + } + /// TransferLeader tries to transfer leadership to the given transferee. pub fn transfer_leader(&mut self, transferee: u64) { let mut m = Message::default(); diff --git a/src/storage.rs b/src/storage.rs index a46166160..97c36d518 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -124,7 +124,8 @@ pub trait Storage { /// If snapshot is temporarily unavailable, it should return SnapshotTemporarilyUnavailable, /// so raft state machine could know that Storage needs some time to prepare /// snapshot and call snapshot later. - fn snapshot(&self) -> Result; + /// A snapshot's index must not less than the `request_index`. + fn snapshot(&self, request_index: u64) -> Result; } /// The Memory Storage Core instance holds the actual state of the storage struct. To access this @@ -135,6 +136,9 @@ pub struct MemStorageCore { entries: Vec, // Metadata of the last snapshot received. snapshot_metadata: SnapshotMetadata, + // If it is true, the next snapshot will return a + // SnapshotTemporarilyUnavailable error. + trigger_snap_unavailable: bool, } impl Default for MemStorageCore { @@ -144,6 +148,8 @@ impl Default for MemStorageCore { entries: vec![], // Every time a snapshot is applied to the storage, the metadata will be stored here. snapshot_metadata: Default::default(), + // When starting from scratch populate the list with a dummy entry at term zero. + trigger_snap_unavailable: false, } } } @@ -343,6 +349,11 @@ impl MemStorageCore { } Ok(()) } + + /// Trigger a SnapshotTemporarilyUnavailable error. 
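+    ///
+    /// Only the next call to `snapshot` fails; the flag is consumed by it.
+    /// A test sketch (assuming a fresh `MemStorage` named `storage`):
+    ///
+    /// ```ignore
+    /// storage.wl().trigger_snap_unavailable();
+    /// assert!(storage.snapshot(0).is_err()); // temporarily unavailable
+    /// assert!(storage.snapshot(0).is_ok());  // the flag was reset
+    /// ```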
+ pub fn trigger_snap_unavailable(&mut self) { + self.trigger_snap_unavailable = true; + } } /// `MemStorage` is a thread-safe but incomplete implementation of `Storage`, mainly for tests. @@ -473,9 +484,18 @@ impl Storage for MemStorage { } /// Implements the Storage trait. - fn snapshot(&self) -> Result { - let core = self.rl(); - Ok(core.snapshot()) + fn snapshot(&self, request_index: u64) -> Result { + let mut core = self.wl(); + if core.trigger_snap_unavailable { + core.trigger_snap_unavailable = false; + Err(Error::Store(StorageError::SnapshotTemporarilyUnavailable)) + } else { + let mut snap = core.snapshot(); + if snap.get_metadata().index < request_index { + snap.mut_metadata().index = request_index; + } + Ok(snap) + } } } @@ -669,18 +689,23 @@ mod test { let mut conf_state = ConfState::default(); conf_state.nodes = nodes.clone(); + let unavailable = Err(RaftError::Store( + StorageError::SnapshotTemporarilyUnavailable, + )); let mut tests = vec![ - (4, Ok(new_snapshot(4, 4, nodes.clone()))), - (5, Ok(new_snapshot(5, 5, nodes.clone()))), + (4, Ok(new_snapshot(4, 4, nodes.clone())), 0), + (5, Ok(new_snapshot(5, 5, nodes.clone())), 5), + (5, Ok(new_snapshot(6, 5, nodes.clone())), 6), + (5, unavailable, 6), ]; - for (i, (idx, wresult)) in tests.drain(..).enumerate() { + for (i, (idx, wresult, windex)) in tests.drain(..).enumerate() { let storage = MemStorage::new(); storage.wl().entries = ents.clone(); storage.wl().raft_state.hard_state.commit = idx; storage.wl().raft_state.hard_state.term = idx; storage.wl().raft_state.conf_state = conf_state.clone(); - let result = storage.snapshot(); + let result = storage.snapshot(windex); if result != wresult { panic!("#{}: want {:?}, got {:?}", i, wresult, result); } diff --git a/tests/integration_cases/test_membership_changes.rs b/tests/integration_cases/test_membership_changes.rs index 5e6762509..fd7473bee 100644 --- a/tests/integration_cases/test_membership_changes.rs +++ b/tests/integration_cases/test_membership_changes.rs @@ -654,7 +654,7 @@ mod three_peers_replace_voter { ConfState::from(peer.prs().configuration().clone()).into(), peer.pending_membership_change().clone(), )?; - let snapshot = peer.raft_log.snapshot()?; + let snapshot = peer.raft_log.snapshot(0)?; peer.raft_log.store.wl().compact(3)?; snapshot }; @@ -683,7 +683,7 @@ mod three_peers_replace_voter { scenario.peers.get_mut(&4).unwrap().raft_log.unstable.offset ); let new_peer = scenario.peers.get_mut(&4).unwrap(); - let snap = new_peer.raft_log.snapshot().unwrap(); + let snap = new_peer.raft_log.snapshot(0).unwrap(); new_peer.raft_log.store.wl().apply_snapshot(snap).unwrap(); new_peer .raft_log diff --git a/tests/integration_cases/test_raft.rs b/tests/integration_cases/test_raft.rs index b00ca174d..f43404871 100644 --- a/tests/integration_cases/test_raft.rs +++ b/tests/integration_cases/test_raft.rs @@ -251,7 +251,7 @@ fn test_progress_maybe_decr() { ]; for (i, &(state, m, n, rejected, last, w, wn)) in tests.iter().enumerate() { let mut p = new_progress(state, m, n, 0, 0); - if p.maybe_decr_to(rejected, last) != w { + if p.maybe_decr_to(rejected, last, 0) != w { panic!("#{}: maybeDecrTo= {}, want {}", i, !w, w); } if p.matched != m { @@ -288,7 +288,7 @@ fn test_progress_is_paused() { fn test_progress_resume() { let mut p = Progress::new(2, 256); p.paused = true; - p.maybe_decr_to(1, 1); + p.maybe_decr_to(1, 1, INVALID_INDEX); assert!(!p.paused, "paused= true, want false"); p.paused = true; p.maybe_update(2); @@ -2265,79 +2265,6 @@ fn test_read_only_with_learner() { } } 
-#[test] -fn test_read_only_with_learner() { - setup_for_test(); - let a = new_test_learner_raft(1, vec![1], vec![2], 10, 1, new_storage()); - let b = new_test_learner_raft(2, vec![1], vec![2], 10, 1, new_storage()); - - let mut nt = Network::new(vec![Some(a), Some(b)]); - - // we can not let system choose the value of randomizedElectionTimeout - // otherwise it will introduce some uncertainty into this test case - // we need to ensure randomizedElectionTimeout > electionTimeout here - let b_election_timeout = nt.peers[&2].get_election_timeout(); - nt.peers - .get_mut(&2) - .unwrap() - .set_randomized_election_timeout(b_election_timeout + 1); - - for _ in 0..b_election_timeout { - nt.peers.get_mut(&2).unwrap().tick(); - } - nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); - - assert_eq!(nt.peers[&1].state, StateRole::Leader); - assert_eq!(nt.peers[&2].state, StateRole::Follower); - - let mut tests = vec![ - (1, 10, 11, "ctx1"), - (2, 10, 21, "ctx2"), - (1, 10, 31, "ctx3"), - (2, 10, 41, "ctx4"), - ]; - - for (i, (id, proposals, wri, wctx)) in tests.drain(..).enumerate() { - for _ in 0..proposals { - nt.send(vec![new_message(1, 1, MessageType::MsgPropose, 1)]); - } - - let e = new_entry(0, 0, Some(wctx)); - nt.send(vec![new_message_with_entries( - id, - id, - MessageType::MsgReadIndex, - vec![e], - )]); - - let read_states: Vec = nt - .peers - .get_mut(&id) - .unwrap() - .read_states - .drain(..) - .collect(); - assert_eq!( - read_states.is_empty(), - false, - "#{}: read_states is empty, want non-empty", - i - ); - let rs = &read_states[0]; - assert_eq!( - rs.index, wri, - "#{}: read_index = {}, want {}", - i, rs.index, wri - ); - let vec_wctx = wctx.as_bytes().to_vec(); - assert_eq!( - rs.request_ctx, vec_wctx, - "#{}: request_ctx = {:?}, want {:?}", - i, rs.request_ctx, vec_wctx - ); - } -} - #[test] fn test_read_only_option_lease() { let l = testing_logger().new(o!("test" => "read_only_option_lease")); @@ -4364,3 +4291,284 @@ fn test_conf_change_check_before_campaign() { } assert_eq!(nt.peers[&1].state, StateRole::Candidate); } + +fn prepare_request_snapshot() -> (Network, Snapshot) { + /************************ + fn index_term_11(id: u64, ids: Vec) -> Interface { + let store = MemStorage::new(); + store + .wl() + .apply_snapshot(new_snapshot(11, 11, ids.clone())) + .unwrap(); + let mut raft = new_test_raft(id, vec![], 5, 1, store); + raft.reset(11); + raft + } + let l = testing_logger().new(o!("test" => "log_replication")); + let mut nt = Network::new(vec![ + Some(index_term_11(1, vec![1, 2, 3])), + Some(index_term_11(2, vec![1, 2, 3])), + Some(index_term_11(3, vec![1, 2, 3])), + ],&l); + + // elect r1 as leader + nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); + + let mut test_entries = Entry::default(); + test_entries.data = b"testdata".to_vec(); + let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries.clone()]); + nt.send(vec![msg.clone(), msg.clone()]); + assert_eq!(nt.peers[&1].raft_log.committed, 14); + assert_eq!(nt.peers[&2].raft_log.committed, 14); + + let mut cs = ConfState::default(); + cs.nodes = nt.peers[&1].prs().voter_ids().into_iter().collect(); + let ents = nt + .peers + .get_mut(&1) + .unwrap() + .raft_log + .unstable_entries() + .unwrap_or(&[]) + .to_vec(); + nt.storage[&1].wl().append(&ents).unwrap(); + nt.peers.get_mut(&1).unwrap().raft_log.applied = 14; + let s = nt.storage[&1] + .wl() + .create_snapshot(14, Some(cs), vec![7; 7]) + .unwrap() + .to_owned(); + + // Commit a new raft log. 
+    let mut test_entries = Entry::new();
+    test_entries.set_data(b"testdata".to_vec());
+    let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries.clone()]);
+    nt.send(vec![msg.clone()]);
+
+    (nt, s)
+    ************************/
+
+    let l = testing_logger().new(o!("test" => "log_replication"));
+    let nt = Network::new(vec![], &l);
+    let s = Snapshot::default();
+    (nt, s)
+}
+
+// Test if an up-to-date follower can request a snapshot from the leader.
+#[test]
+fn test_follower_request_snapshot() {
+    let (mut nt, s) = prepare_request_snapshot();
+
+    // Request the latest snapshot.
+    let prev_snapshot_idx = s.get_metadata().index;
+    let request_idx = nt.peers[&1].raft_log.committed;
+    assert!(prev_snapshot_idx < request_idx);
+    nt.peers
+        .get_mut(&2)
+        .unwrap()
+        .request_snapshot(request_idx)
+        .unwrap();
+
+    // Send the request snapshot message.
+    let req_snap = nt.peers.get_mut(&2).unwrap().msgs.pop().unwrap();
+    assert!(
+        req_snap.get_msg_type() == MessageType::MsgAppendResponse
+            && req_snap.reject
+            && req_snap.request_snapshot == request_idx,
+        "{:?}",
+        req_snap
+    );
+    nt.peers.get_mut(&1).unwrap().step(req_snap).unwrap();
+
+    // New proposals cannot be replicated to peer 2.
+    let mut test_entries = Entry::default();
+    test_entries.data = b"testdata".to_vec();
+    let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries.clone()]);
+    nt.send(vec![msg.clone()]);
+    assert_eq!(nt.peers[&1].raft_log.committed, 16);
+    assert_eq!(
+        nt.peers[&1].prs().get(2).unwrap().state,
+        ProgressState::Snapshot
+    );
+    assert_eq!(nt.peers[&2].raft_log.committed, 15);
+
+    // Keep stepping until the snapshot either succeeds or fails.
+    let report_ok = new_message(2, 1, MessageType::MsgSnapStatus, 0);
+    nt.send(vec![report_ok]);
+    let hb_resp = new_message(2, 1, MessageType::MsgHeartbeatResponse, 0);
+    nt.send(vec![hb_resp]);
+    nt.send(vec![msg]);
+    assert_eq!(nt.peers[&1].raft_log.committed, 17);
+    assert_eq!(nt.peers[&2].raft_log.committed, 17);
+}
+
+// Test if request snapshot can make progress when it meets SnapshotTemporarilyUnavailable.
+#[test]
+fn test_request_snapshot_unavailable() {
+    let (mut nt, s) = prepare_request_snapshot();
+
+    // Request the latest snapshot.
+    let prev_snapshot_idx = s.get_metadata().index;
+    let request_idx = nt.peers[&1].raft_log.committed;
+    assert!(prev_snapshot_idx < request_idx);
+    nt.peers
+        .get_mut(&2)
+        .unwrap()
+        .request_snapshot(request_idx)
+        .unwrap();
+
+    // Send the request snapshot message.
+    let req_snap = nt.peers.get_mut(&2).unwrap().msgs.pop().unwrap();
+    assert!(
+        req_snap.get_msg_type() == MessageType::MsgAppendResponse
+            && req_snap.reject
+            && req_snap.request_snapshot == request_idx,
+        "{:?}",
+        req_snap
+    );
+
+    // Peer 2 is still in probe state due to SnapshotTemporarilyUnavailable.
+    nt.peers[&1].get_store().wl().trigger_snap_unavailable();
+    nt.peers
+        .get_mut(&1)
+        .unwrap()
+        .step(req_snap.clone())
+        .unwrap();
+    assert_eq!(nt.peers[&1].prs().get(2).unwrap().state, ProgressState::Probe);
+
+    // Next index is decreased.
+    nt.peers[&1].get_store().wl().trigger_snap_unavailable();
+    nt.peers
+        .get_mut(&1)
+        .unwrap()
+        .step(req_snap.clone())
+        .unwrap();
+    assert_eq!(nt.peers[&1].prs().get(2).unwrap().state, ProgressState::Probe);
+
+    // Snapshot will be available if it requests again. This message must not
+    // be considered stale even if `reject != next - 1`
+    nt.peers
+        .get_mut(&1)
+        .unwrap()
+        .step(req_snap.clone())
+        .unwrap();
+    assert_eq!(nt.peers[&1].prs().get(2).unwrap().state, ProgressState::Snapshot);
+}
+
+// Test if request snapshot can make progress when matched is advanced.
+#[test]
+fn test_request_snapshot_matched_change() {
+    let (mut nt, _) = prepare_request_snapshot();
+    // Let matched be greater than the committed index.
+    nt.peers.get_mut(&2).unwrap().raft_log.committed -= 1;
+
+    // Request the latest snapshot.
+    let request_idx = nt.peers[&2].raft_log.committed;
+    nt.peers
+        .get_mut(&2)
+        .unwrap()
+        .request_snapshot(request_idx)
+        .unwrap();
+    let req_snap = nt.peers.get_mut(&2).unwrap().msgs.pop().unwrap();
+    // The request snapshot is ignored because it is considered out of order.
+    nt.peers.get_mut(&1).unwrap().step(req_snap).unwrap();
+    assert_eq!(nt.peers[&1].prs().get(2).unwrap().state, ProgressState::Replicate);
+
+    // The heartbeat is responded to with a request snapshot message.
+    for _ in 0..nt.peers[&1].get_heartbeat_timeout() {
+        nt.peers.get_mut(&1).unwrap().tick();
+    }
+    let msg_hb = nt.peers.get_mut(&1).unwrap().msgs.pop().unwrap();
+    nt.peers.get_mut(&2).unwrap().step(msg_hb).unwrap();
+    let req_snap = nt.peers.get_mut(&2).unwrap().msgs.pop().unwrap();
+    nt.peers
+        .get_mut(&1)
+        .unwrap()
+        .step(req_snap.clone())
+        .unwrap();
+    assert_eq!(nt.peers[&1].prs().get(2).unwrap().state, ProgressState::Snapshot);
+}
+
+// Test if request snapshot can make progress when the peer is not in Replicate state.
+#[test]
+fn test_request_snapshot_none_replicate() {
+    let (mut nt, _) = prepare_request_snapshot();
+    nt.peers
+        .get_mut(&1)
+        .unwrap()
+        .mut_prs()
+        .get_mut(2)
+        .unwrap()
+        .state = ProgressState::Probe;
+
+    // Request the latest snapshot.
+    let request_idx = nt.peers[&2].raft_log.committed;
+    nt.peers
+        .get_mut(&2)
+        .unwrap()
+        .request_snapshot(request_idx)
+        .unwrap();
+    let req_snap = nt.peers.get_mut(&2).unwrap().msgs.pop().unwrap();
+    nt.peers.get_mut(&1).unwrap().step(req_snap).unwrap();
+    assert!(nt.peers[&1].prs().get(2).unwrap().pending_request_snapshot != 0);
+}
+
+// Test if request snapshot can make progress when the leader steps down.
+#[test]
+fn test_request_snapshot_step_down() {
+    let (mut nt, _) = prepare_request_snapshot();
+
+    // Commit a new entry and let the leader step down while peer 2 is isolated.
+    nt.isolate(2);
+    let mut test_entries = Entry::default();
+    test_entries.data = b"testdata".to_vec();
+    let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries.clone()]);
+    nt.send(vec![msg.clone()]);
+    nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]);
+    assert_eq!(nt.peers[&3].state, StateRole::Leader);
+
+    // Recover and request the latest snapshot.
+    nt.recover();
+    let request_idx = nt.peers[&2].raft_log.committed;
+    nt.peers
+        .get_mut(&2)
+        .unwrap()
+        .request_snapshot(request_idx)
+        .unwrap();
+    nt.send(vec![new_message(3, 3, MessageType::MsgBeat, 0)]);
+    assert!(
+        nt.peers[&2].pending_request_snapshot == INVALID_INDEX,
+        "{}",
+        nt.peers[&2].pending_request_snapshot
+    );
+}
+
+// Abort the request snapshot if the peer becomes leader or candidate.
+#[test]
+fn test_request_snapshot_on_role_change() {
+    let (mut nt, _) = prepare_request_snapshot();
+
+    let request_idx = nt.peers[&2].raft_log.committed;
+    nt.peers
+        .get_mut(&2)
+        .unwrap()
+        .request_snapshot(request_idx)
+        .unwrap();
+
+    // Becoming follower does not reset pending_request_snapshot.
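+    // The pending request is kept so the next append/heartbeat exchange can
+    // re-send it to whichever leader emerges (see
+    // `test_request_snapshot_step_down` above).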
+    let (term, id) = (nt.peers[&1].term, nt.peers[&1].id);
+    nt.peers.get_mut(&2).unwrap().become_follower(term, id);
+    assert!(
+        nt.peers[&2].pending_request_snapshot != INVALID_INDEX,
+        "{}",
+        nt.peers[&2].pending_request_snapshot
+    );
+
+    // Becoming candidate resets pending_request_snapshot.
+    nt.peers.get_mut(&2).unwrap().become_candidate();
+    assert!(
+        nt.peers[&2].pending_request_snapshot == INVALID_INDEX,
+        "{}",
+        nt.peers[&2].pending_request_snapshot
+    );
+}

diff --git a/tests/integration_cases/test_raft_snap.rs b/tests/integration_cases/test_raft_snap.rs
index c990eca8e..4bf7e3485 100644
--- a/tests/integration_cases/test_raft_snap.rs
+++ b/tests/integration_cases/test_raft_snap.rs
@@ -29,6 +29,7 @@ use crate::test_util::*;
 use crate::testing_logger;
 use harness::Network;
 use raft::eraftpb::*;
+use raft::{INVALID_INDEX, Error, ProgressState};
 
 fn testing_snap() -> Snapshot {
     new_snapshot(11, 11, vec![1, 2])
@@ -152,3 +153,83 @@ fn test_snapshot_with_min_term() {
     do_test(true);
     do_test(false);
 }
+
+#[test]
+fn test_request_snapshot() {
+    let l = testing_logger().new(o!("test" => "snapshot_with_min_term"));
+    let mut sm = new_test_raft(1, vec![1, 2], 10, 1, new_storage(), &l);
+    sm.restore(testing_snap());
+
+    // Raft cannot accept a request snapshot if there is no leader.
+    assert_eq!(
+        sm.raft
+            .as_mut()
+            .unwrap()
+            .request_snapshot(INVALID_INDEX + 1)
+            .unwrap_err(),
+        Error::RequestSnapshotDropped
+    );
+
+    sm.become_candidate();
+    sm.become_leader();
+
+    // Raft cannot accept a request snapshot if it is itself the leader.
+    assert_eq!(
+        sm.raft
+            .as_mut()
+            .unwrap()
+            .request_snapshot(INVALID_INDEX + 1)
+            .unwrap_err(),
+        Error::RequestSnapshotDropped
+    );
+
+    // Advance matched.
+    let mut m = new_message(2, 1, MessageType::MsgAppendResponse, 0);
+    m.index = 11;
+    sm.step(m).unwrap();
+    assert_eq!(sm.prs().get(2).unwrap().state, ProgressState::Replicate);
+
+    let request_snapshot_idx = sm.raft_log.committed;
+    let mut m = new_message(2, 1, MessageType::MsgAppendResponse, 0);
+    m.index = 11;
+    m.reject = true;
+    m.reject_hint = INVALID_INDEX;
+    m.request_snapshot = request_snapshot_idx;
+
+    // Ignore out-of-order request snapshot messages.
+    let mut out_of_order = m.clone();
+    out_of_order.index = 9;
+    sm.step(out_of_order).unwrap();
+    assert_eq!(sm.prs().get(2).unwrap().state, ProgressState::Replicate);
+
+    // Request snapshot.
+    sm.step(m.clone()).unwrap();
+    assert_eq!(sm.prs().get(2).unwrap().state, ProgressState::Snapshot);
+    assert_eq!(sm.prs().get(2).unwrap().pending_snapshot, 11);
+    assert_eq!(sm.prs().get(2).unwrap().next_idx, 12);
+    assert!(sm.prs().get(2).unwrap().is_paused());
+    let snap = sm.msgs.pop().unwrap();
+    assert!(
+        snap.get_msg_type() == MessageType::MsgSnapshot
+            && snap.get_snapshot().get_metadata().index == request_snapshot_idx,
+        "{:?}",
+        snap
+    );
+
+    // Append responses and heartbeats do not set the state from snapshot to probe.
+    let mut m = new_message(2, 1, MessageType::MsgAppendResponse, 0);
+    m.index = 11;
+    sm.step(m).unwrap();
+    assert_eq!(sm.prs().get(2).unwrap().state, ProgressState::Snapshot);
+    assert_eq!(sm.prs().get(2).unwrap().pending_snapshot, 11);
+    assert_eq!(sm.prs().get(2).unwrap().next_idx, 12);
+    assert!(sm.prs().get(2).unwrap().is_paused());
+
+    // However, a snapshot status report does set the state to probe.
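+    // In a real application this report typically arrives via
+    // `RawNode::report_snapshot` once the snapshot has been delivered; the
+    // test injects the message directly.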
+ let m = new_message(2, 1, MessageType::MsgSnapStatus, 0); + sm.step(m).unwrap(); + assert_eq!(sm.prs().get(2).unwrap().state, ProgressState::Probe); + assert_eq!(sm.prs().get(2).unwrap().pending_snapshot, 0); + assert_eq!(sm.prs().get(2).unwrap().next_idx, 12); + assert!(sm.prs().get(2).unwrap().is_paused()); +} From d3bf11800f68c829e3fd65391991366d0f19ce16 Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 17 Jul 2019 17:32:30 +0800 Subject: [PATCH 6/8] fix tests --- harness/src/network.rs | 8 +++++-- src/storage.rs | 4 ++++ tests/integration_cases/test_raft.rs | 35 +++++++++++----------------- 3 files changed, 23 insertions(+), 24 deletions(-) diff --git a/harness/src/network.rs b/harness/src/network.rs index 82f4790dd..d27343716 100644 --- a/harness/src/network.rs +++ b/harness/src/network.rs @@ -107,8 +107,12 @@ impl Network { npeers.insert(*id, r); } Some(r) => { - if r.raft.as_ref().map_or(false, |r| r.id != *id) { - panic!("peer {} in peers has a wrong position", r.id); + if let Some(raft) = r.raft.as_ref() { + if raft.id != *id { + panic!("peer {} in peers has a wrong position", r.id); + } + let store = raft.raft_log.store.clone(); + nstorage.insert(*id, store); } npeers.insert(*id, r); } diff --git a/src/storage.rs b/src/storage.rs index 97c36d518..f36047e4f 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -705,6 +705,10 @@ mod test { storage.wl().raft_state.hard_state.term = idx; storage.wl().raft_state.conf_state = conf_state.clone(); + if wresult.is_err() { + storage.wl().trigger_snap_unavailable(); + } + let result = storage.snapshot(windex); if result != wresult { panic!("#{}: want {:?}, got {:?}", i, wresult, result); diff --git a/tests/integration_cases/test_raft.rs b/tests/integration_cases/test_raft.rs index f43404871..d486ef19f 100644 --- a/tests/integration_cases/test_raft.rs +++ b/tests/integration_cases/test_raft.rs @@ -4293,22 +4293,23 @@ fn test_conf_change_check_before_campaign() { } fn prepare_request_snapshot() -> (Network, Snapshot) { - /************************ - fn index_term_11(id: u64, ids: Vec) -> Interface { + let l = testing_logger().new(o!("test" => "log_replication")); + + fn index_term_11(id: u64, ids: Vec, l: &Logger) -> Interface { let store = MemStorage::new(); store .wl() .apply_snapshot(new_snapshot(11, 11, ids.clone())) .unwrap(); - let mut raft = new_test_raft(id, vec![], 5, 1, store); + let mut raft = new_test_raft(id, ids, 5, 1, store, &l); raft.reset(11); raft } - let l = testing_logger().new(o!("test" => "log_replication")); + let mut nt = Network::new(vec![ - Some(index_term_11(1, vec![1, 2, 3])), - Some(index_term_11(2, vec![1, 2, 3])), - Some(index_term_11(3, vec![1, 2, 3])), + Some(index_term_11(1, vec![1, 2, 3], &l)), + Some(index_term_11(2, vec![1, 2, 3], &l)), + Some(index_term_11(3, vec![1, 2, 3], &l)), ],&l); // elect r1 as leader @@ -4321,8 +4322,6 @@ fn prepare_request_snapshot() -> (Network, Snapshot) { assert_eq!(nt.peers[&1].raft_log.committed, 14); assert_eq!(nt.peers[&2].raft_log.committed, 14); - let mut cs = ConfState::default(); - cs.nodes = nt.peers[&1].prs().voter_ids().into_iter().collect(); let ents = nt .peers .get_mut(&1) @@ -4332,25 +4331,16 @@ fn prepare_request_snapshot() -> (Network, Snapshot) { .unwrap_or(&[]) .to_vec(); nt.storage[&1].wl().append(&ents).unwrap(); + nt.storage[&1].wl().commit_to(14).unwrap(); nt.peers.get_mut(&1).unwrap().raft_log.applied = 14; - let s = nt.storage[&1] - .wl() - .create_snapshot(14, Some(cs), vec![7; 7]) - .unwrap() - .to_owned(); // Commit a new raft log. 
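+    // One more committed entry advances the raft log to index 15 while the
+    // snapshot taken from storage below stays at index 14.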
- let mut test_entries = Entry::new(); - test_entries.set_data(b"testdata".to_vec()); + let mut test_entries = Entry::default(); + test_entries.data = b"testdata".to_vec(); let msg = new_message_with_entries(1, 1, MessageType::MsgPropose, vec![test_entries.clone()]); nt.send(vec![msg.clone()]); - (nt, s) - ************************/ - - let l = testing_logger().new(o!("test" => "log_replication")); - let nt = Network::new(vec![], &l); - let s = Snapshot::default(); + let s = nt.storage[&1].snapshot(0).unwrap(); (nt, s) } @@ -4398,6 +4388,7 @@ fn test_follower_request_snapshot() { let hb_resp = new_message(2, 1, MessageType::MsgHeartbeatResponse, 0); nt.send(vec![hb_resp]); nt.send(vec![msg]); + assert_eq!(nt.peers[&1].raft_log.committed, 17); assert_eq!(nt.peers[&2].raft_log.committed, 17); } From 22a016ce87c338b18726b93bf40d0175c6340e8c Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 17 Jul 2019 17:32:47 +0800 Subject: [PATCH 7/8] cargo fmt --- src/lib.rs | 2 +- src/progress/mod.rs | 2 +- src/raft.rs | 4 ++- tests/integration_cases/test_raft.rs | 38 +++++++++++++++++------ tests/integration_cases/test_raft_snap.rs | 2 +- 5 files changed, 34 insertions(+), 14 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 6148f9a63..12bf8148f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -385,12 +385,12 @@ extern crate getset; mod config; mod errors; mod log_unstable; +mod progress; #[cfg(test)] pub mod raft; #[cfg(not(test))] mod raft; mod raft_log; -mod progress; pub mod raw_node; mod read_only; mod status; diff --git a/src/progress/mod.rs b/src/progress/mod.rs index e82b4bed2..2aa4eb027 100644 --- a/src/progress/mod.rs +++ b/src/progress/mod.rs @@ -26,8 +26,8 @@ // limitations under the License. use std::cmp; -use crate::raft::INVALID_INDEX; use self::inflights::Inflights; +use crate::raft::INVALID_INDEX; pub mod inflights; pub mod progress_set; diff --git a/src/raft.rs b/src/raft.rs index 5dce3b282..e9d702094 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -1534,7 +1534,9 @@ impl Raft { pr.ins.free_first_one(); } // Does it request snapshot? - if pr.matched < self.raft_log.last_index() || pr.pending_request_snapshot != INVALID_INDEX { + if pr.matched < self.raft_log.last_index() + || pr.pending_request_snapshot != INVALID_INDEX + { *send_append = true; } diff --git a/tests/integration_cases/test_raft.rs b/tests/integration_cases/test_raft.rs index d486ef19f..29f797d2e 100644 --- a/tests/integration_cases/test_raft.rs +++ b/tests/integration_cases/test_raft.rs @@ -4306,11 +4306,14 @@ fn prepare_request_snapshot() -> (Network, Snapshot) { raft } - let mut nt = Network::new(vec![ - Some(index_term_11(1, vec![1, 2, 3], &l)), - Some(index_term_11(2, vec![1, 2, 3], &l)), - Some(index_term_11(3, vec![1, 2, 3], &l)), - ],&l); + let mut nt = Network::new( + vec![ + Some(index_term_11(1, vec![1, 2, 3], &l)), + Some(index_term_11(2, vec![1, 2, 3], &l)), + Some(index_term_11(3, vec![1, 2, 3], &l)), + ], + &l, + ); // elect r1 as leader nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]); @@ -4425,7 +4428,10 @@ fn test_request_snapshot_unavailable() { .unwrap() .step(req_snap.clone()) .unwrap(); - assert_eq!(nt.peers[&1].prs().get(2).unwrap().state, ProgressState::Probe); + assert_eq!( + nt.peers[&1].prs().get(2).unwrap().state, + ProgressState::Probe + ); // Next index is decreased. 
     nt.peers[&1].get_store().wl().trigger_snap_unavailable();
     nt.peers
         .get_mut(&1)
         .unwrap()
         .step(req_snap.clone())
         .unwrap();
-    assert_eq!(nt.peers[&1].prs().get(2).unwrap().state, ProgressState::Probe);
+    assert_eq!(
+        nt.peers[&1].prs().get(2).unwrap().state,
+        ProgressState::Probe
+    );
 
     // Snapshot will be available if it requests again. This message must not
     // be considered stale even if `reject != next - 1`
     nt.peers
         .get_mut(&1)
         .unwrap()
         .step(req_snap.clone())
         .unwrap();
-    assert_eq!(nt.peers[&1].prs().get(2).unwrap().state, ProgressState::Snapshot);
+    assert_eq!(
+        nt.peers[&1].prs().get(2).unwrap().state,
+        ProgressState::Snapshot
+    );
 }
 
 // Test if request snapshot can make progress when matched is advanced.
 #[test]
 fn test_request_snapshot_matched_change() {
     let (mut nt, _) = prepare_request_snapshot();
     // Let matched be greater than the committed index.
     nt.peers.get_mut(&2).unwrap().raft_log.committed -= 1;
 
     // Request the latest snapshot.
     let request_idx = nt.peers[&2].raft_log.committed;
     nt.peers
         .get_mut(&2)
         .unwrap()
         .request_snapshot(request_idx)
         .unwrap();
     let req_snap = nt.peers.get_mut(&2).unwrap().msgs.pop().unwrap();
     // The request snapshot is ignored because it is considered out of order.
     nt.peers.get_mut(&1).unwrap().step(req_snap).unwrap();
-    assert_eq!(nt.peers[&1].prs().get(2).unwrap().state, ProgressState::Replicate);
+    assert_eq!(
+        nt.peers[&1].prs().get(2).unwrap().state,
+        ProgressState::Replicate
+    );
 
     // The heartbeat is responded to with a request snapshot message.
     for _ in 0..nt.peers[&1].get_heartbeat_timeout() {
         nt.peers.get_mut(&1).unwrap().tick();
     }
     let msg_hb = nt.peers.get_mut(&1).unwrap().msgs.pop().unwrap();
     nt.peers.get_mut(&2).unwrap().step(msg_hb).unwrap();
     let req_snap = nt.peers.get_mut(&2).unwrap().msgs.pop().unwrap();
     nt.peers
         .get_mut(&1)
         .unwrap()
         .step(req_snap.clone())
         .unwrap();
-    assert_eq!(nt.peers[&1].prs().get(2).unwrap().state, ProgressState::Snapshot);
+    assert_eq!(
+        nt.peers[&1].prs().get(2).unwrap().state,
+        ProgressState::Snapshot
+    );
 }
 
 // Test if request snapshot can make progress when the peer is not in Replicate state.

diff --git a/tests/integration_cases/test_raft_snap.rs b/tests/integration_cases/test_raft_snap.rs
index 4bf7e3485..5673230a4 100644
--- a/tests/integration_cases/test_raft_snap.rs
+++ b/tests/integration_cases/test_raft_snap.rs
@@ -29,7 +29,7 @@ use crate::test_util::*;
 use crate::testing_logger;
 use harness::Network;
 use raft::eraftpb::*;
-use raft::{INVALID_INDEX, Error, ProgressState};
+use raft::{Error, ProgressState, INVALID_INDEX};
 
 fn testing_snap() -> Snapshot {
     new_snapshot(11, 11, vec![1, 2])

From bd436340b9cafee235e8952475d4635865055144 Mon Sep 17 00:00:00 2001
From: qupeng
Date: Thu, 18 Jul 2019 15:40:56 +0800
Subject: [PATCH 8/8] address comments.

---
 src/raft.rs | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/raft.rs b/src/raft.rs
index e9d702094..6b8c45504 100644
--- a/src/raft.rs
+++ b/src/raft.rs
@@ -2116,7 +2116,6 @@ impl Raft {
             "tag" => &self.tag,
         );
         let mut to_send = Message::default();
-        to_send.to = m.from;
         to_send.set_msg_type(MessageType::MsgAppendResponse);
         to_send.to = m.from;
         to_send.index = self.raft_log.committed;
@@ -2165,7 +2164,6 @@ impl Raft {
             return;
         }
         let mut to_send = Message::default();
-        to_send.to = m.from;
         to_send.set_msg_type(MessageType::MsgHeartbeatResponse);
         to_send.to = m.from;
         to_send.context = m.take_context();
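A usage sketch for the new request-snapshot API introduced by this series (a
sketch only: `node` is assumed to be an already-initialized
`RawNode<MemStorage>` running as a follower, inside a function returning
`raft::Result<()>`):

    // Ask the leader for a snapshot covering at least our committed index.
    // This returns Err(Error::RequestSnapshotDropped) on a leader, when no
    // leader is known, or while an earlier request is still pending.
    let request_index = node.raft.raft_log.committed;
    node.request_snapshot(request_index)?;

The snapshot is then delivered through the usual Ready path, and per the
`Storage::snapshot` contract its metadata index is at least `request_index`.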