From c5067b49d6d35c60915a1e3604ceb0667169caee Mon Sep 17 00:00:00 2001 From: qupeng Date: Thu, 7 Nov 2019 14:36:00 +0800 Subject: [PATCH] refactor progress set to etcd/raft style Signed-off-by: qupeng --- examples/five_mem_node/main.rs | 2 +- src/progress/progress_set.rs | 413 +++++++++++++++++---------------- src/raft.rs | 44 +--- src/raw_node.rs | 2 +- src/read_only.rs | 2 +- 5 files changed, 224 insertions(+), 239 deletions(-) diff --git a/examples/five_mem_node/main.rs b/examples/five_mem_node/main.rs index a453d59c8..9ab5fc5a5 100644 --- a/examples/five_mem_node/main.rs +++ b/examples/five_mem_node/main.rs @@ -301,7 +301,7 @@ fn on_ready( ConfChangeType::RemoveNode => raft_group.raft.remove_node(node_id).unwrap(), ConfChangeType::AddLearnerNode => raft_group.raft.add_learner(node_id).unwrap(), } - let cs = raft_group.raft.prs().configuration().to_conf_state(); + let cs = raft_group.raft.prs().to_conf_state(); store.wl().set_conf_state(cs); } else { // For normal proposals, extract the key-value pair and then diff --git a/src/progress/progress_set.rs b/src/progress/progress_set.rs index 8077a2cb9..55d90cc92 100644 --- a/src/progress/progress_set.rs +++ b/src/progress/progress_set.rs @@ -26,13 +26,14 @@ // limitations under the License. use std::cell::RefCell; +use std::{cmp, fmt, iter, slice, u64}; use slog::Logger; use crate::eraftpb::{ConfState, SnapshotMetadata}; use crate::errors::{Error, Result}; use crate::progress::Progress; -use crate::{DefaultHashBuilder, HashMap, HashSet}; +use crate::{HashMap, HashSet}; /// Get the majority number of given nodes count. #[inline] @@ -43,91 +44,98 @@ pub fn majority(total: usize) -> usize { /// A Raft internal representation of a Configuration. /// /// This is corollary to a ConfState, but optimized for `contains` calls. -#[derive(Clone, Debug, Default, PartialEq, Getters)] +#[derive(Clone, Debug, PartialEq, Default)] pub struct Configuration { - /// The voter set. - #[get = "pub"] - voters: HashSet, - /// The learner set. - #[get = "pub"] - learners: HashSet, -} + auto_leave: bool, -impl Configuration { - /// Create a new configuration with the given configuration. - pub fn new( - voters: impl IntoIterator, - learners: impl IntoIterator, - ) -> Self { - Self { - voters: voters.into_iter().collect(), - learners: learners.into_iter().collect(), - } - } + // Sorted voters. Only the first is valid if it's not in joint. + // Otherwise the first is incoming and the second is outgoing. + voters: [Vec; 2], - /// Create a new `ConfState` from the configuration itself. - pub fn to_conf_state(&self) -> ConfState { - let mut state = ConfState::default(); - state.set_voters(self.voters.iter().cloned().collect()); - state.set_learners(self.learners.iter().cloned().collect()); - state - } + // Sorted learners. Shouldn't intersect with `voters`. + learners: Vec, - /// Create a new `Configuration` from a given `ConfState`. - pub fn from_conf_state(conf_state: &ConfState) -> Self { - Self { - voters: conf_state.voters.iter().cloned().collect(), - learners: conf_state.learners.iter().cloned().collect(), - } + // Demoted learners in joint consensus, because `learners` shouldn't intersect with `voters`. + learners_next: Vec, +} + +impl From for Configuration { + fn from(mut c: ConfState) -> Self { + let mut configuration = Self { + auto_leave: c.get_auto_leave(), + voters: [c.take_voters(), c.take_voters_outgoing()], + learners: c.take_learners(), + learners_next: c.take_learners_next(), + }; + configuration.voters[0].sort(); + configuration.voters[1].sort(); + configuration.learners.sort(); + configuration.learners_next.sort(); + configuration } } -impl From<(Iter1, Iter2)> for Configuration -where - Iter1: IntoIterator, - Iter2: IntoIterator, -{ - fn from((voters, learners): (Iter1, Iter2)) -> Self { - Self { - voters: voters.into_iter().collect(), - learners: learners.into_iter().collect(), - } +impl From for ConfState { + fn from(c: Configuration) -> Self { + let mut state = ConfState::default(); + state.set_auto_leave(c.auto_leave); + state.set_voters(c.voters[0].clone()); + state.set_voters_outgoing(c.voters[1].clone()); + state.set_learners(c.learners.clone()); + state.set_learners_next(c.learners_next.clone()); + state } } impl Configuration { - fn with_capacity(voters: usize, learners: usize) -> Self { - Self { - voters: HashSet::with_capacity_and_hasher(voters, DefaultHashBuilder::default()), - learners: HashSet::with_capacity_and_hasher(learners, DefaultHashBuilder::default()), - } + /// Create a new `ConfState` from the configuration itself. + pub fn to_conf_state(&self) -> ConfState { + self.clone().into() } - /// Validates that the configuration is not problematic. - /// - /// Namely: - /// * There can be no overlap of voters and learners. - /// * There must be at least one voter. - pub fn valid(&self) -> Result<()> { - if let Some(id) = self.voters.intersection(&self.learners).next() { - Err(Error::Exists(*id, "learners")) - } else if self.voters.is_empty() { - Err(Error::ConfigInvalid( - "There must be at least one voter.".into(), - )) - } else { - Ok(()) + /// Create a new `Configuration` from a given `ConfState`. + pub fn from_conf_state(conf_state: &ConfState) -> Self { + Self::from(conf_state.clone()) + } + + // Test the configuration is valid or not. It's invalid when + // 1. `learners` or `learners_next` intersects with `voters`; + // 2. `learners_next` isn't a subset of `voters[1]`; + fn valid(&self) -> bool { + fn find_equal(s1: &[u64], s2: &[u64]) -> bool { + let (mut i, mut j) = (0, 0); + while i < s1.len() && j < s2.len() { + match s1[i].cmp(&s2[j]) { + cmp::Ordering::Equal => return true, + cmp::Ordering::Less => i += 1, + cmp::Ordering::Greater => j += 1, + } + } + false } - } - fn has_quorum(&self, potential_quorum: &HashSet) -> bool { - self.voters.intersection(potential_quorum).count() >= majority(self.voters.len()) + if find_equal(&self.voters[0], &self.learners) + || find_equal(&self.voters[1], &self.learners) + || find_equal(&self.voters[0], &self.learners_next) + { + return false; + } + self.learners_next + .iter() + .all(|l| self.voters[1].binary_search(l).is_ok()) } - /// Returns whether or not the given `id` is a member of this configuration. - #[inline] - pub fn contains(&self, id: u64) -> bool { - self.voters.contains(&id) || self.learners.contains(&id) + fn has_quorum(&self, potential: &[u64]) -> bool { + for cfg in &self.voters { + if cfg.is_empty() { + continue; + } + let c = potential.iter().filter(|p| cfg.binary_search(p).is_ok()); + if c.count() < majority(cfg.len()) { + return false; + } + } + true } } @@ -149,114 +157,59 @@ pub enum CandidacyStatus { #[derive(Clone, Getters)] pub struct ProgressSet { progress: HashMap, - - /// The current configuration state of the cluster. - #[get = "pub"] configuration: Configuration, // A preallocated buffer for sorting in the maximal_committed_index function. // You should not depend on these values unless you just set them. // We use a cell to avoid taking a `&mut self`. sort_buffer: RefCell>, - pub(crate) logger: Logger, + logger: Logger, } impl ProgressSet { - /// Creates a new ProgressSet. + /// Create a new progress set. pub fn new(logger: Logger) -> Self { - Self::with_capacity(0, 0, logger) - } - - /// Create a progress set with the specified sizes already reserved. - pub fn with_capacity(voters: usize, learners: usize, logger: Logger) -> Self { ProgressSet { - progress: HashMap::with_capacity_and_hasher( - voters + learners, - DefaultHashBuilder::default(), - ), - sort_buffer: RefCell::from(Vec::with_capacity(voters)), - configuration: Configuration::with_capacity(voters, learners), + progress: Default::default(), + configuration: Default::default(), + sort_buffer: Default::default(), logger, } } - fn clear(&mut self) { - self.progress.clear(); - self.configuration.voters.clear(); - self.configuration.learners.clear(); - } - pub(crate) fn restore_snapmeta( &mut self, meta: &SnapshotMetadata, next_idx: u64, max_inflight: usize, ) { - self.clear(); + self.restore_conf_state(meta.get_conf_state(), next_idx, max_inflight); + } + + /// Restore a progress set from `conf_state`. + pub fn restore_conf_state( + &mut self, + conf_state: &ConfState, + next_idx: u64, + max_inflight: usize, + ) { + self.configuration = Configuration::from_conf_state(conf_state); let pr = Progress::new(next_idx, max_inflight); - for id in &meta.conf_state.as_ref().unwrap().voters { - self.progress.insert(*id, pr.clone()); - self.configuration.voters.insert(*id); - } - for id in &meta.conf_state.as_ref().unwrap().learners { - self.progress.insert(*id, pr.clone()); - self.configuration.learners.insert(*id); + let mut prs = HashMap::default(); + for id in self.voters().chain(self.learners()) { + prs.insert(id, pr.clone()); } - + self.progress = prs; self.assert_progress_and_configuration_consistent(); } - /// Returns the status of voters. - /// - /// **Note:** Do not use this for majority/quorum calculation. The Raft node may be - /// transitioning to a new configuration and have two qourums. Use `has_quorum` instead. - #[inline] - pub fn voters(&self) -> impl Iterator { - let set = self.voter_ids(); - self.progress.iter().filter(move |(&k, _)| set.contains(&k)) - } - - /// Returns the status of learners. - /// - /// **Note:** Do not use this for majority/quorum calculation. The Raft node may be - /// transitioning to a new configuration and have two qourums. Use `has_quorum` instead. - #[inline] - pub fn learners(&self) -> impl Iterator { - let set = self.learner_ids(); - self.progress.iter().filter(move |(&k, _)| set.contains(&k)) - } - - /// Returns the mutable status of voters. - /// - /// **Note:** Do not use this for majority/quorum calculation. The Raft node may be - /// transitioning to a new configuration and have two qourums. Use `has_quorum` instead. - #[inline] - pub fn voters_mut(&mut self) -> impl Iterator { - let ids = self.voter_ids(); - self.progress - .iter_mut() - .filter(move |(k, _)| ids.contains(k)) - } - - /// Returns the mutable status of learners. - /// - /// **Note:** Do not use this for majority/quorum calculation. The Raft node may be - /// transitioning to a new configuration and have two qourums. Use `has_quorum` instead. - #[inline] - pub fn learners_mut(&mut self) -> impl Iterator { - let ids = self.learner_ids(); - self.progress - .iter_mut() - .filter(move |(k, _)| ids.contains(k)) - } - /// Returns the ids of all known voters. /// /// **Note:** Do not use this for majority/quorum calculation. The Raft node may be /// transitioning to a new configuration and have two qourums. Use `has_quorum` instead. #[inline] pub fn voter_ids(&self) -> HashSet { - self.configuration().voters().clone() + self.voters().collect() } /// Returns the ids of all known learners. @@ -265,7 +218,7 @@ impl ProgressSet { /// transitioning to a new configuration and have two qourums. Use `has_quorum` instead. #[inline] pub fn learner_ids(&self) -> HashSet { - self.configuration().learners().clone() + self.learners().collect() } /// Grabs a reference to the progress of a node. @@ -313,7 +266,8 @@ impl ProgressSet { return Err(Error::Exists(id, "voters")); } - self.configuration.voters.insert(id); + self.configuration.voters[0].push(id); + self.configuration.voters[0].sort(); self.progress.insert(id, pr); self.assert_progress_and_configuration_consistent(); Ok(()) @@ -334,7 +288,8 @@ impl ProgressSet { return Err(Error::Exists(id, "voters")); } - self.configuration.learners.insert(id); + self.configuration.learners.push(id); + self.configuration.learners.sort(); self.progress.insert(id, pr); self.assert_progress_and_configuration_consistent(); Ok(()) @@ -346,10 +301,15 @@ impl ProgressSet { /// pub fn remove(&mut self, id: u64) -> Result> { debug!(self.logger, "Removing peer with id {id}", id = id); - self.configuration.learners.remove(&id); - self.configuration.voters.remove(&id); + if let Ok(pos) = self.configuration.voters[0].binary_search(&id) { + self.configuration.voters[0].swap_remove(pos); + self.configuration.voters[0].sort(); + } + if let Ok(pos) = self.configuration.learners.binary_search(&id) { + self.configuration.learners.swap_remove(pos); + self.configuration.learners.sort(); + } let removed = self.progress.remove(&id); - self.assert_progress_and_configuration_consistent(); Ok(removed) } @@ -358,45 +318,40 @@ impl ProgressSet { pub fn promote_learner(&mut self, id: u64) -> Result<()> { debug!(self.logger, "Promoting peer with id {id}", id = id); - if !self.configuration.learners.remove(&id) { - // Wasn't already a learner. We can't promote what doesn't exist. - return Err(Error::NotExists(id, "learners")); - } - if !self.configuration.voters.insert(id) { - // Already existed, the caller should know this was a noop. - return Err(Error::Exists(id, "voters")); + match self.configuration.learners.binary_search(&id) { + Ok(pos) => { + self.configuration.learners.swap_remove(pos); + self.configuration.learners.sort(); + } + Err(_) => { + // Wasn't already a learner. We can't promote what doesn't exist. + return Err(Error::NotExists(id, "learners")); + } } + self.configuration.voters[0].push(id); + self.configuration.voters[0].sort(); self.assert_progress_and_configuration_consistent(); Ok(()) } - #[inline(always)] - fn assert_progress_and_configuration_consistent(&self) { - debug_assert!(self - .configuration - .voters - .union(&self.configuration.learners) - .all(|v| self.progress.contains_key(v))); - assert_eq!( - self.voter_ids().len() + self.learner_ids().len(), - self.progress.len() - ); - } - /// Returns the maximal committed index for the cluster. /// /// Eg. If the matched indexes are [2,2,2,4,5], it will return 2. pub fn maximal_committed_index(&self) -> u64 { let mut matched = self.sort_buffer.borrow_mut(); - matched.clear(); - self.configuration.voters().iter().for_each(|id| { - let peer = &self.progress[id]; - matched.push(peer.matched); - }); - // Reverse sort. - matched.sort_by(|a, b| b.cmp(a)); - matched[matched.len() / 2] + let mut committed = u64::MAX; + for cfg in &self.configuration.voters { + if !cfg.is_empty() { + matched.clear(); + for id in cfg { + matched.push(self.progress[id].matched); + } + matched.sort_by(|a, b| b.cmp(a)); + committed = cmp::min(committed, matched[matched.len() / 2]); + } + } + committed } /// Returns the Candidate's eligibility in the current election. @@ -404,21 +359,16 @@ impl ProgressSet { /// If it is still eligible, it should continue polling nodes and checking. /// Eventually, the election will result in this returning either `Elected` /// or `Ineligible`, meaning the election can be concluded. - pub fn candidacy_status<'a>( - &self, - votes: impl IntoIterator, - ) -> CandidacyStatus { - let (accepts, rejects) = votes.into_iter().fold( - (HashSet::default(), HashSet::default()), - |(mut accepts, mut rejects), (&id, &accepted)| { - if accepted { - accepts.insert(id); - } else { - rejects.insert(id); - } - (accepts, rejects) - }, - ); + pub fn candidacy_status(&self, votes: &HashMap) -> CandidacyStatus { + let mut accepts = Vec::with_capacity(votes.len()); + let mut rejects = Vec::with_capacity(votes.len()); + for (id, vote) in votes { + if *vote { + accepts.push(*id); + } else { + rejects.push(*id); + } + } if self.configuration.has_quorum(&accepts) { return CandidacyStatus::Elected; @@ -433,14 +383,14 @@ impl ProgressSet { /// /// This should only be called by the leader. pub fn quorum_recently_active(&mut self, perspective_of: u64) -> bool { - let mut active = HashSet::default(); - for (&id, pr) in self.voters_mut() { - if id == perspective_of { - active.insert(id); + let mut active = Vec::with_capacity(self.progress.len()); + for (id, pr) in &mut self.progress { + if *id == perspective_of { + active.push(*id); continue; } if pr.recent_active { - active.insert(id); + active.push(*id); } } for pr in self.progress.values_mut() { @@ -452,9 +402,68 @@ impl ProgressSet { /// Determine if a quorum is formed from the given set of nodes. /// /// This is the only correct way to verify you have reached a quorum for the whole group. - #[inline] - pub fn has_quorum(&self, potential_quorum: &HashSet) -> bool { - self.configuration.has_quorum(potential_quorum) + pub fn has_quorum(&self, potential: &[u64]) -> bool { + self.configuration.has_quorum(potential) + } + + /// Transform self to `ConfState`. + pub fn to_conf_state(&self) -> ConfState { + self.configuration.to_conf_state() + } + + #[inline(always)] + fn assert_progress_and_configuration_consistent(&self) { + debug_assert!(self.configuration.valid()); + debug_assert!(self.progress.len() == self.voters().count() + self.learners().count()); + } + + pub(crate) fn promotable(&self, id: u64) -> bool { + !self.progress.is_empty() && self.voters().any(|p| p == id) + } +} + +impl fmt::Debug for ProgressSet { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.configuration.fmt(f) + } +} + +impl<'a> ProgressSet { + /// Create an iterator over all nodes which can send vote messages. + pub fn voters(&'a self) -> VotersIter<'a> { + VotersIter { + incoming: self.configuration.voters[0].iter().peekable(), + outgoing: self.configuration.voters[1].iter().peekable(), + } + } + + /// Create an iterator over all nodes which can't send vote messages. + pub fn learners(&'a self) -> impl Iterator + 'a { + self.configuration.learners.iter().cloned() + } +} + +pub struct VotersIter<'a> { + incoming: iter::Peekable>, + outgoing: iter::Peekable>, +} + +impl<'a> Iterator for VotersIter<'a> { + type Item = u64; + fn next(&mut self) -> Option { + match (self.incoming.peek(), self.outgoing.peek()) { + (Some(v1), Some(v2)) => match v1.cmp(v2) { + cmp::Ordering::Equal => { + self.incoming.next(); + self.outgoing.next().cloned() + } + cmp::Ordering::Less => self.incoming.next().cloned(), + cmp::Ordering::Greater => self.outgoing.next().cloned(), + }, + (Some(_), None) => self.incoming.next().cloned(), + (None, Some(_)) => self.outgoing.next().cloned(), + _ => None, + } } } diff --git a/src/raft.rs b/src/raft.rs index 9f68ddbd7..851d5d343 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -25,7 +25,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::cmp; +use std::{cmp, slice}; use crate::eraftpb::{Entry, EntryType, HardState, Message, MessageType, Snapshot}; use rand::{self, Rng}; @@ -39,7 +39,7 @@ use super::read_only::{ReadOnly, ReadOnlyOption, ReadState}; use super::storage::Storage; use super::Config; use crate::util; -use crate::{HashMap, HashSet}; +use crate::HashMap; // CAMPAIGN_PRE_ELECTION represents the first phase of a normal election when // Config.pre_vote is true. @@ -220,9 +220,9 @@ impl Raft { c.validate()?; let logger = logger.new(o!("raft_id" => c.id)); let raft_state = store.initial_state()?; - let conf_state = &raft_state.conf_state; - let voters = &conf_state.voters; - let learners = &conf_state.learners; + let mut prs = ProgressSet::new(logger.clone()); + prs.restore_conf_state(&raft_state.conf_state, 1, c.max_inflight_msgs); + let promotable = prs.promotable(c.id); let mut r = Raft { id: c.id, @@ -230,14 +230,10 @@ impl Raft { raft_log: RaftLog::new(store, logger.clone()), max_inflight: c.max_inflight_msgs, max_msg_size: c.max_size_per_msg, - prs: Some(ProgressSet::with_capacity( - voters.len(), - learners.len(), - logger.clone(), - )), + prs: Some(prs), pending_request_snapshot: INVALID_INDEX, state: StateRole::Follower, - promotable: false, + promotable, check_quorum: c.check_quorum, pre_vote: c.pre_vote, read_only: ReadOnly::new(c.read_only_option), @@ -259,21 +255,6 @@ impl Raft { batch_append: c.batch_append, logger, }; - for p in voters { - let pr = Progress::new(1, r.max_inflight); - if let Err(e) = r.mut_prs().insert_voter(*p, pr) { - fatal!(r.logger, "{}", e); - } - if *p == r.id { - r.promotable = true; - } - } - for p in learners { - let pr = Progress::new(1, r.max_inflight); - if let Err(e) = r.mut_prs().insert_learner(*p, pr) { - fatal!(r.logger, "{}", e); - }; - } if raft_state.hard_state != HardState::default() { r.load_state(&raft_state.hard_state); @@ -292,7 +273,7 @@ impl Raft { "applied" => r.raft_log.applied, "last index" => r.raft_log.last_index(), "last term" => r.raft_log.last_term(), - "peers" => ?r.prs().voters().collect::>(), + "peers" => ?r.prs(), ); Ok(r) } @@ -1547,8 +1528,7 @@ impl Raft { return Ok(()); } - let mut self_set = HashSet::default(); - self_set.insert(self.id); + let self_set = slice::from_ref(&self.id); if !self.prs().has_quorum(&self_set) { // thinking: use an interally defined context instead of the user given context. // We can express this in terms of the term and index instead of @@ -1996,12 +1976,8 @@ impl Raft { let next_idx = self.raft_log.last_index() + 1; prs.restore_snapmeta(meta, next_idx, self.max_inflight); prs.get_mut(self.id).unwrap().matched = next_idx - 1; - if prs.configuration().voters().contains(&self.id) { - self.promotable = true; - } else if prs.configuration().learners().contains(&self.id) { - self.promotable = false; - } self.prs = Some(prs); + self.promotable = self.prs().promotable(self.id); self.pending_request_snapshot = INVALID_INDEX; None diff --git a/src/raw_node.rs b/src/raw_node.rs index d93c5ad74..cd2af0ff3 100644 --- a/src/raw_node.rs +++ b/src/raw_node.rs @@ -336,7 +336,7 @@ impl RawNode { ConfChangeType::RemoveNode => self.raft.remove_node(nid)?, }; - Ok(self.raft.prs().configuration().to_conf_state()) + Ok(self.raft.prs().to_conf_state()) } /// Step advances the state machine using the given message. diff --git a/src/read_only.rs b/src/read_only.rs index 0f4679234..7c3f1024d 100644 --- a/src/read_only.rs +++ b/src/read_only.rs @@ -114,7 +114,7 @@ impl ReadOnly { /// Notifies the ReadOnly struct that the raft state machine received /// an acknowledgment of the heartbeat that attached with the read only request /// context. - pub fn recv_ack(&mut self, m: &Message) -> HashSet { + pub fn recv_ack(&mut self, m: &Message) -> Vec { match self.pending_read_index.get_mut(&m.context) { None => Default::default(), Some(rs) => {