From 9906d6e9c01f1aa73e747f863dfe0a866915045f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?=
Date: Sat, 18 Feb 2023 21:03:03 +0800
Subject: [PATCH] Change: remove non-blocking membership change

When changing membership in non-blocking mode, the leader submits a
membership config log but does not wait for the log to be committed.
This is useless, because the caller still has to assert that the log is
committed by periodically querying the metrics of the raft node until
it is finally committed, which in effect makes it a blocking routine
anyway.

API changes:

- Removes the `allow_lagging` parameter from `Raft::change_membership()`
- Removes error `LearnerIsLagging`

Upgrade tip: Adjust API calls to make the code compile.

Refactor: move `leader_append_entries()` to `LeaderHandler`.
---
 .../src/network/management.rs | 2 +-
 .../raft-kv-rocksdb/src/network/management.rs | 2 +-
 openraft/src/core/mod.rs | 2 -
 openraft/src/core/raft_core.rs | 184 +----
 openraft/src/core/replication_expectation.rs | 7 -
 openraft/src/engine/engine_impl.rs | 129 ++--
 openraft/src/engine/handler/leader_handler.rs | 651 ++++++++++++++++++
 openraft/src/engine/handler/mod.rs | 1 +
 .../src/engine/leader_append_entries_test.rs | 525 --------------
 openraft/src/engine/mod.rs | 3 +-
 openraft/src/error.rs | 14 +-
 openraft/src/membership/membership.rs | 4 +-
 openraft/src/membership/membership_state.rs | 48 +-
 openraft/src/raft.rs | 24 +-
 openraft/src/raft_state.rs | 5 -
 openraft/src/raft_state_test.rs | 44 --
 .../client_api/t50_lagging_network_write.rs | 2 +-
 openraft/tests/fixtures/mod.rs | 2 +-
 openraft/tests/membership/t10_add_learner.rs | 2 +-
 .../t12_concurrent_write_and_add_learner.rs | 2 +-
 .../membership/t15_add_remove_follower.rs | 2 +-
 .../membership/t16_change_membership_cases.rs | 6 +-
 .../tests/membership/t20_change_membership.rs | 71 +-
 .../membership/t30_commit_joint_config.rs | 6 +-
 .../tests/membership/t30_remove_leader.rs | 4 +-
 .../tests/membership/t40_removed_follower.rs | 2 +-
 .../t45_remove_unreachable_follower.rs | 2 +-
 ...99_issue_584_replication_state_reverted.rs | 2 +-
 openraft/tests/metrics/t30_leader_metrics.rs | 4 +-
 .../t20_state_machine_apply_membership.rs | 2 +-
 30 files changed, 820 insertions(+), 934 deletions(-)
 delete mode 100644 openraft/src/core/replication_expectation.rs
 create mode 100644 openraft/src/engine/handler/leader_handler.rs
 delete mode 100644 openraft/src/engine/leader_append_entries_test.rs

diff --git a/examples/raft-kv-memstore/src/network/management.rs b/examples/raft-kv-memstore/src/network/management.rs
index ba5bfe014..98c5376ba 100644
--- a/examples/raft-kv-memstore/src/network/management.rs
+++ b/examples/raft-kv-memstore/src/network/management.rs
@@ -38,7 +38,7 @@ pub async fn change_membership(
     app: Data,
     req: Json>,
 ) -> actix_web::Result {
-    let res = app.raft.change_membership(req.0, true, false).await;
+    let res = app.raft.change_membership(req.0, false).await;
     Ok(Json(res))
 }

diff --git a/examples/raft-kv-rocksdb/src/network/management.rs b/examples/raft-kv-rocksdb/src/network/management.rs
index 3510973e5..b38b5ccd6 100644
--- a/examples/raft-kv-rocksdb/src/network/management.rs
+++ b/examples/raft-kv-rocksdb/src/network/management.rs
@@ -39,7 +39,7 @@ async fn add_learner(mut req: Request>) -> tide::Result {
 /// Changes specified learners to members, or remove members.
async fn change_membership(mut req: Request>) -> tide::Result { let body: BTreeSet = req.body_json().await?; - let res = req.state().raft.change_membership(body, true, false).await; + let res = req.state().raft.change_membership(body, false).await; Ok(Response::builder(StatusCode::Ok).body(Body::from_json(&res)?).build()) } diff --git a/openraft/src/core/mod.rs b/openraft/src/core/mod.rs index 4c23c1bce..31966c3f7 100644 --- a/openraft/src/core/mod.rs +++ b/openraft/src/core/mod.rs @@ -6,7 +6,6 @@ mod install_snapshot; mod raft_core; -mod replication_expectation; mod replication_state; mod server_state; mod snapshot_state; @@ -14,7 +13,6 @@ mod streaming_state; mod tick; pub use raft_core::RaftCore; -pub(crate) use replication_expectation::Expectation; pub(crate) use replication_state::replication_lag; pub use server_state::ServerState; pub(crate) use snapshot_state::SnapshotResult; diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index f306b0061..c05ea9650 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -1,5 +1,4 @@ use std::collections::BTreeMap; -use std::collections::BTreeSet; use std::fmt::Display; use std::mem::swap; use std::pin::Pin; @@ -29,8 +28,6 @@ use tracing::Span; use crate::config::Config; use crate::config::RuntimeConfig; use crate::config::SnapshotPolicy; -use crate::core::replication_lag; -use crate::core::Expectation; use crate::core::ServerState; use crate::core::SnapshotResult; use crate::core::SnapshotState; @@ -39,16 +36,11 @@ use crate::engine::Command; use crate::engine::Engine; use crate::engine::SendResult; use crate::entry::EntryRef; -use crate::error::ChangeMembershipError; use crate::error::CheckIsLeaderError; use crate::error::ClientWriteError; -use crate::error::EmptyMembership; use crate::error::Fatal; use crate::error::ForwardToLeader; -use crate::error::InProgress; use crate::error::InitializeError; -use crate::error::LearnerIsLagging; -use crate::error::LearnerNotFound; use crate::error::QuorumNotEnough; use crate::error::RPCError; use crate::error::Timeout; @@ -377,16 +369,7 @@ impl, S: RaftStorage> RaftCore, ) -> Result<(), Fatal> { - if let Some(l) = &self.leader_data { - tracing::debug!( - "add target node {} as learner; current nodes: {:?}", - target, - l.nodes.keys() - ); - } else { - unreachable!("it has to be a leader!!!"); - } - + // TODO: move these logic to Engine? let curr = &self.engine.state.membership_state.effective().membership; let new_membership = curr.add_learner(target, node); @@ -405,116 +388,19 @@ impl, S: RaftStorage> RaftCore, - expectation: Option, turn_to_learner: bool, tx: RaftRespTx, ClientWriteError>, ) -> Result<(), Fatal> { - let last = self.engine.state.membership_state.effective().membership.get_joint_config().last().unwrap(); - let members = changes.apply_to(last); - - // Ensure cluster will have at least one node. 
- if members.is_empty() { - let _ = tx.send(Err(ClientWriteError::ChangeMembershipError( - ChangeMembershipError::EmptyMembership(EmptyMembership {}), - ))); - return Ok(()); - } - - let res = self.check_membership_committed(); - if let Err(e) = res { - let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(e))); - return Ok(()); - } - - let mem = &self.engine.state.membership_state.effective(); - let curr = mem.membership.clone(); - - let old_members = mem.voter_ids().collect::>(); - let only_in_new = members.difference(&old_members); - - let new_config = curr.next_safe(members.clone(), turn_to_learner); - - tracing::debug!(?new_config, "new_config"); - - for node_id in only_in_new.clone() { - if !mem.contains(node_id) { - let not_found = LearnerNotFound { node_id: *node_id }; - let _ = tx.send(Err(ClientWriteError::ChangeMembershipError( - ChangeMembershipError::LearnerNotFound(not_found), - ))); + let res = self.engine.state.membership_state.next_membership(changes, turn_to_learner); + let new_membership = match res { + Ok(x) => x, + Err(e) => { + let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(e))); return Ok(()); } - } - - if let Err(e) = self.check_replication_states(only_in_new, expectation) { - let _ = tx.send(Err(e.into())); - return Ok(()); - } - - self.write_entry(EntryPayload::Membership(new_config), Some(tx)).await?; - Ok(()) - } - - /// Check if the effective membership is committed, so that a new membership is allowed to be - /// proposed. - fn check_membership_committed(&self) -> Result<(), ChangeMembershipError> { - let st = &self.engine.state; - - if st.is_membership_committed() { - return Ok(()); - } - - Err(ChangeMembershipError::InProgress(InProgress { - committed: st.committed().copied(), - membership_log_id: st.membership_state.effective().log_id, - })) - } - - /// return Ok if all the current replication states satisfy the `expectation` for changing - /// membership. - fn check_replication_states<'n>( - &self, - nodes: impl Iterator, - expectation: Option, - ) -> Result<(), ChangeMembershipError> { - let expectation = match &expectation { - None => { - // No expectation, whatever is OK. - return Ok(()); - } - Some(x) => x, }; - let last_log_id = self.engine.state.last_log_id(); - - for node_id in nodes { - match expectation { - Expectation::AtLineRate => { - // Expect to be at line rate but not. - - let matching = if let Some(l) = &self.engine.internal_server_state.leading() { - *l.progress.get(node_id) - } else { - unreachable!("it has to be a leader!!!"); - }; - - let distance = replication_lag(&matching.matching.index(), &last_log_id.index()); - - if distance <= self.config.replication_lag_threshold { - continue; - } - - let lagging = LearnerIsLagging { - node_id: *node_id, - matched: matching.matching, - distance, - }; - - return Err(ChangeMembershipError::LearnerIsLagging(lagging)); - } - } - } - + self.write_entry(EntryPayload::Membership(new_membership), Some(tx)).await?; Ok(()) } @@ -530,16 +416,22 @@ impl, S: RaftStorage> RaftCore, resp_tx: Option>, - ) -> Result<(), Fatal> { + ) -> Result> { tracing::debug!(payload = display(payload.summary()), "write_entry"); + let (mut lh, tx) = if let Some((lh, tx)) = self.engine.get_leader_handler_or_reject(resp_tx) { + (lh, tx) + } else { + return Ok(false); + }; + let mut entry_refs = [EntryRef::new(&payload)]; // TODO: it should returns membership config error etc. currently this is done by the - // caller. - self.engine.leader_append_entries(&mut entry_refs); + // caller. 
+ lh.leader_append_entries(&mut entry_refs); // Install callback channels. - if let Some(tx) = resp_tx { + if let Some(tx) = tx { if let Some(l) = &mut self.leader_data { l.client_resp_channels.insert(entry_refs[0].log_id.index, tx); } @@ -547,7 +439,7 @@ impl, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore { - if self.engine.state.is_leader(&self.engine.config.id) { - self.write_entry(rpc, Some(tx)).await?; - } else { - self.reject_with_forward_to_leader(tx); - } + RaftMsg::ClientWriteRequest { payload, tx } => { + self.write_entry(payload, Some(tx)).await?; } RaftMsg::Initialize { members, tx } => { self.handle_initialize(members, tx).await?; @@ -1166,15 +1054,10 @@ impl, S: RaftStorage> RaftCore { - if self.engine.state.is_leader(&self.engine.config.id) { - self.change_membership(changes, when, turn_to_learner, tx).await?; - } else { - self.reject_with_forward_to_leader(tx); - } + self.change_membership(changes, turn_to_learner, tx).await?; } RaftMsg::ExternalRequest { req } => { req(&self.engine.state, &mut self.storage, &mut self.network); @@ -1192,13 +1075,16 @@ impl, S: RaftStorage> RaftCore { - // TODO: reject if it is not leader? - self.write_entry(EntryPayload::Blank, None).await?; - let log_id = self.engine.state.last_log_id(); - tracing::debug!( - log_id = display(log_id.summary()), - "ExternalCommand: sent heartbeat log" - ); + let is_leader = self.write_entry(EntryPayload::Blank, None).await?; + if is_leader { + let log_id = self.engine.state.last_log_id(); + tracing::debug!( + log_id = display(log_id.summary()), + "ExternalCommand: sent heartbeat log" + ); + } else { + tracing::warn!("ExternalCommand: failed to send heartbeat log, not a leader"); + } } ExternalCommand::Snapshot => self.trigger_snapshot_if_needed(true).await, } @@ -1239,9 +1125,11 @@ impl, S: RaftStorage> RaftCore( + &mut self, + tx: Option>, + ) -> Option<(LeaderHandler, Option>)> + where + E: From>, + { + let res = self.leader_handler(); + let forward_err = match res { + Ok(lh) => { + tracing::debug!("this node is a leader"); + return Some((lh, tx)); + } + Err(forward_err) => forward_err, + }; + + if let Some(tx) = tx { + let _ = tx.send(Err(forward_err.into())); + } + + None + } + #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn handle_vote_req(&mut self, req: VoteRequest) -> VoteResponse { tracing::info!(req = display(req.summary()), "Engine::handle_vote_req"); @@ -344,83 +375,6 @@ where // The only thing that needs to do is update election timer. } - /// Append new log entries by a leader. - /// - /// Also Update effective membership if the payload contains - /// membership config. - /// - /// If there is a membership config log entry, the caller has to guarantee the previous one is - /// committed. - /// - /// TODO(xp): metrics flag needs to be dealt with. - /// TODO(xp): if vote indicates this node is not the leader, refuse append - #[tracing::instrument(level = "debug", skip(self, entries))] - pub(crate) fn leader_append_entries<'a, Ent: RaftEntry + 'a>(&mut self, entries: &mut [Ent]) { - let l = entries.len(); - if l == 0 { - return; - } - - self.state.assign_log_ids(entries.iter_mut()); - self.state.extend_log_ids_from_same_leader(entries); - - self.output.push_command(Command::AppendInputEntries { range: 0..l }); - - // Fast commit: - // If the cluster has only one voter, then an entry will be committed as soon as it is - // appended. But if there is a membership log in the middle of the input entries, - // the condition to commit will change. 
Thus we have to deal with entries before and - // after a membership entry differently: - // - // When a membership entry is seen, update progress for all former entries. - // Then upgrade the quorum set for the Progress. - // - // E.g., if the input entries are `2..6`, entry 4 changes membership from `a` to `abc`. - // Then it will output a LeaderCommit command to commit entries `2,3`. - // ```text - // 1 2 3 4 5 6 - // a x x a y y - // b - // c - // ``` - // - // If the input entries are `2..6`, entry 4 changes membership from `abc` to `a`. - // Then it will output a LeaderCommit command to commit entries `2,3,4,5,6`. - // ```text - // 1 2 3 4 5 6 - // a x x a y y - // b - // c - // ``` - - let mut rh = self.replication_handler(); - - for entry in entries.iter() { - if let Some(m) = entry.get_membership() { - let log_index = entry.get_log_id().index; - - if log_index > 0 { - let prev_log_id = rh.state.get_log_id(log_index - 1); - rh.update_local_progress(prev_log_id); - } - - // since this entry, the condition to commit has been changed. - rh.append_membership(entry.get_log_id(), m); - } - } - - let last_log_id = { - // Safe unwrap(): entries.len() > 0 - let last = entries.last().unwrap(); - Some(*last.get_log_id()) - }; - - rh.update_local_progress(last_log_id); - rh.initiate_replication(); - - self.output.push_command(Command::MoveInputCursorBy { n: l }); - } - /// Append entries to follower/learner. /// /// Also clean conflicting entries and update membership state. @@ -602,6 +556,27 @@ where } } + pub(crate) fn leader_handler(&mut self) -> Result, ForwardToLeader> { + let leader = match self.internal_server_state.leading_mut() { + None => { + tracing::debug!("this node is NOT a leader: {:?}", self.state.server_state); + return Err(self.state.forward_to_leader()); + } + Some(x) => x, + }; + + if !self.state.is_leader(&self.config.id) { + return Err(self.state.forward_to_leader()); + } + + Ok(LeaderHandler { + config: &mut self.config, + leader, + state: &mut self.state, + output: &mut self.output, + }) + } + pub(crate) fn replication_handler(&mut self) -> ReplicationHandler { let leader = match self.internal_server_state.leading_mut() { None => { diff --git a/openraft/src/engine/handler/leader_handler.rs b/openraft/src/engine/handler/leader_handler.rs new file mode 100644 index 000000000..cd43b7c2c --- /dev/null +++ b/openraft/src/engine/handler/leader_handler.rs @@ -0,0 +1,651 @@ +use crate::engine::engine_impl::EngineOutput; +use crate::engine::handler::replication_handler::ReplicationHandler; +use crate::engine::Command; +use crate::engine::EngineConfig; +use crate::entry::RaftEntry; +use crate::internal_server_state::LeaderQuorumSet; +use crate::leader::Leader; +use crate::raft_state::LogStateReader; +use crate::Node; +use crate::NodeId; +use crate::RaftState; + +/// Handle leader operations. +/// +/// - Append new logs; +/// - Change membership; +/// - etc +pub(crate) struct LeaderHandler<'x, NID, N> +where + NID: NodeId, + N: Node, +{ + pub(crate) config: &'x mut EngineConfig, + pub(crate) leader: &'x mut Leader>, + pub(crate) state: &'x mut RaftState, + pub(crate) output: &'x mut EngineOutput, +} + +impl<'x, NID, N> LeaderHandler<'x, NID, N> +where + NID: NodeId, + N: Node, +{ + /// Append new log entries by a leader. + /// + /// Also Update effective membership if the payload contains + /// membership config. + /// + /// If there is a membership config log entry, the caller has to guarantee the previous one is + /// committed. 
+ /// + /// TODO(xp): metrics flag needs to be dealt with. + /// TODO(xp): if vote indicates this node is not the leader, refuse append + #[tracing::instrument(level = "debug", skip(self, entries))] + pub(crate) fn leader_append_entries<'a, Ent: RaftEntry + 'a>(&mut self, entries: &mut [Ent]) { + let l = entries.len(); + if l == 0 { + return; + } + + self.state.assign_log_ids(entries.iter_mut()); + self.state.extend_log_ids_from_same_leader(entries); + + self.output.push_command(Command::AppendInputEntries { range: 0..l }); + + // Fast commit: + // If the cluster has only one voter, then an entry will be committed as soon as it is + // appended. But if there is a membership log in the middle of the input entries, + // the condition to commit will change. Thus we have to deal with entries before and + // after a membership entry differently: + // + // When a membership entry is seen, update progress for all former entries. + // Then upgrade the quorum set for the Progress. + // + // E.g., if the input entries are `2..6`, entry 4 changes membership from `a` to `abc`. + // Then it will output a LeaderCommit command to commit entries `2,3`. + // ```text + // 1 2 3 4 5 6 + // a x x a y y + // b + // c + // ``` + // + // If the input entries are `2..6`, entry 4 changes membership from `abc` to `a`. + // Then it will output a LeaderCommit command to commit entries `2,3,4,5,6`. + // ```text + // 1 2 3 4 5 6 + // a x x a y y + // b + // c + // ``` + + let mut rh = self.replication_handler(); + + for entry in entries.iter() { + if let Some(m) = entry.get_membership() { + let log_index = entry.get_log_id().index; + + if log_index > 0 { + let prev_log_id = rh.state.get_log_id(log_index - 1); + rh.update_local_progress(prev_log_id); + } + + // since this entry, the condition to commit has been changed. + rh.append_membership(entry.get_log_id(), m); + } + } + + let last_log_id = { + // Safe unwrap(): entries.len() > 0 + let last = entries.last().unwrap(); + Some(*last.get_log_id()) + }; + + rh.update_local_progress(last_log_id); + rh.initiate_replication(); + + self.output.push_command(Command::MoveInputCursorBy { n: l }); + } + + pub(crate) fn replication_handler(&mut self) -> ReplicationHandler { + ReplicationHandler { + config: self.config, + leader: self.leader, + state: self.state, + output: self.output, + } + } +} + +#[cfg(test)] +mod tests { + mod test_leader_append_entries { + use std::sync::Arc; + + use maplit::btreeset; + #[allow(unused_imports)] use pretty_assertions::assert_eq; + #[allow(unused_imports)] use pretty_assertions::assert_ne; + #[allow(unused_imports)] use pretty_assertions::assert_str_eq; + + use crate::engine::Command; + use crate::engine::Engine; + use crate::progress::entry::ProgressEntry; + use crate::progress::Inflight; + use crate::raft_state::LogStateReader; + use crate::vote::CommittedLeaderId; + use crate::EffectiveMembership; + use crate::Entry; + use crate::EntryPayload; + use crate::LogId; + use crate::Membership; + use crate::MembershipState; + use crate::MetricsChangeFlags; + use crate::ServerState; + use crate::Vote; + + crate::declare_raft_types!( + pub(crate) Foo: D=(), R=(), NodeId=u64, Node=() + ); + + use crate::testing::log_id; + + fn blank(term: u64, index: u64) -> Entry { + Entry { + log_id: log_id(term, index), + payload: EntryPayload::::Blank, + } + } + + fn m01() -> Membership { + Membership::::new(vec![btreeset! {0,1}], None) + } + + fn m1() -> Membership { + Membership::::new(vec![btreeset! 
{1}], None) + } + + /// members: {1}, learners: {2} + fn m1_2() -> Membership { + Membership::::new(vec![btreeset! {1}], Some(btreeset! {2})) + } + + fn m13() -> Membership { + Membership::::new(vec![btreeset! {1,3}], None) + } + + fn m23() -> Membership { + Membership::::new(vec![btreeset! {2,3}], None) + } + + fn m34() -> Membership { + Membership::::new(vec![btreeset! {3,4}], None) + } + + fn eng() -> Engine { + let mut eng = Engine::default(); + eng.state.enable_validate = false; // Disable validation for incomplete state + + eng.config.id = 1; + eng.state.committed = Some(log_id(0, 0)); + eng.state.vote = Vote::new_committed(3, 1); + eng.state.log_ids.append(log_id(1, 1)); + eng.state.log_ids.append(log_id(2, 3)); + eng.state.membership_state = MembershipState::new( + Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), + Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m23())), + ); + eng.state.server_state = eng.calc_server_state(); + + eng + } + + #[test] + fn test_leader_append_entries_empty() -> anyhow::Result<()> { + let mut eng = eng(); + eng.vote_handler().become_leading(); + + eng.leader_handler()?.leader_append_entries(&mut Vec::>::new()); + + assert_eq!( + &[ + log_id(1, 1), // + log_id(2, 3), + ], + eng.state.log_ids.key_log_ids() + ); + assert_eq!(Some(&log_id(2, 3)), eng.state.last_log_id()); + assert_eq!( + MembershipState::new( + Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), + Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m23())), + ), + eng.state.membership_state + ); + + assert_eq!( + MetricsChangeFlags { + replication: false, + local_data: false, + cluster: false, + }, + eng.output.metrics_flags + ); + + assert_eq!(0, eng.output.commands.len()); + + Ok(()) + } + + #[test] + fn test_leader_append_entries_normal() -> anyhow::Result<()> { + let mut eng = eng(); + eng.vote_handler().become_leading(); + + // log id will be assigned by eng. + eng.leader_handler()?.leader_append_entries(&mut [ + blank(1, 1), // + blank(1, 1), + blank(1, 1), + ]); + + assert_eq!( + &[ + log_id(1, 1), // + log_id(2, 3), + LogId::new(CommittedLeaderId::new(3, 1), 4), + LogId::new(CommittedLeaderId::new(3, 1), 6), + ], + eng.state.log_ids.key_log_ids() + ); + assert_eq!( + Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)), + eng.state.last_log_id() + ); + assert_eq!( + MembershipState::new( + Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), + Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m23())), + ), + eng.state.membership_state + ); + + assert_eq!( + MetricsChangeFlags { + replication: false, + local_data: true, + cluster: false, + }, + eng.output.metrics_flags + ); + + assert_eq!( + vec![ + Command::AppendInputEntries { range: 0..3 }, + Command::Replicate { + target: 2, + req: Inflight::logs(None, Some(log_id(3, 6))).with_id(1), + }, + Command::Replicate { + target: 3, + req: Inflight::logs(None, Some(log_id(3, 6))).with_id(1), + }, + Command::MoveInputCursorBy { n: 3 }, + ], + eng.output.commands + ); + + Ok(()) + } + + #[test] + fn test_leader_append_entries_fast_commit() -> anyhow::Result<()> { + let mut eng = eng(); + eng.state + .membership_state + .set_effective(Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m1()))); + eng.vote_handler().become_leading(); + + eng.output.commands = vec![]; + eng.output.metrics_flags.reset(); + + // log id will be assigned by eng. 
+ eng.leader_handler()?.leader_append_entries(&mut [ + blank(1, 1), // + blank(1, 1), + blank(1, 1), + ]); + + assert_eq!( + &[ + log_id(1, 1), // + log_id(2, 3), + LogId::new(CommittedLeaderId::new(3, 1), 4), + LogId::new(CommittedLeaderId::new(3, 1), 6), + ], + eng.state.log_ids.key_log_ids() + ); + assert_eq!( + Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)), + eng.state.last_log_id() + ); + assert_eq!( + MembershipState::new( + Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m1())), + Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m1())), + ), + eng.state.membership_state + ); + assert_eq!( + Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)), + eng.state.committed() + ); + + assert_eq!( + vec![ + Command::AppendInputEntries { range: 0..3 }, + Command::ReplicateCommitted { + committed: Some(log_id(3, 6)) + }, + Command::LeaderCommit { + already_committed: Some(log_id(0, 0)), + upto: LogId::new(CommittedLeaderId::new(3, 1), 6) + }, + Command::MoveInputCursorBy { n: 3 }, + ], + eng.output.commands + ); + + assert_eq!( + MetricsChangeFlags { + replication: false, + local_data: true, + cluster: false, + }, + eng.output.metrics_flags + ); + + Ok(()) + } + + /// With membership log, fast-commit upto membership entry that is not a single-node + /// cluster. Leader is no longer a voter should work. + #[test] + fn test_leader_append_entries_fast_commit_upto_membership_entry() -> anyhow::Result<()> { + let mut eng = eng(); + eng.state + .membership_state + .set_effective(Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m1()))); + eng.state.server_state = ServerState::Leader; + eng.vote_handler().become_leading(); + + // log id will be assigned by eng. + eng.leader_handler()?.leader_append_entries(&mut [ + blank(1, 1), // + Entry { + log_id: log_id(1, 1), + payload: EntryPayload::Membership(m34()), + }, + blank(1, 1), + ]); + + assert_eq!( + &[ + log_id(1, 1), // + log_id(2, 3), + LogId::new(CommittedLeaderId::new(3, 1), 4), + LogId::new(CommittedLeaderId::new(3, 1), 6), + ], + eng.state.log_ids.key_log_ids() + ); + assert_eq!( + Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)), + eng.state.last_log_id() + ); + assert_eq!( + MembershipState::new( + // previous effective become committed. + Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m1())), + // new effective. 
+ Arc::new(EffectiveMembership::new(Some(log_id(3, 5)), m34())), + ), + eng.state.membership_state + ); + assert_eq!( + Some(&LogId::new(CommittedLeaderId::new(3, 1), 4)), + eng.state.committed() + ); + + assert_eq!( + MetricsChangeFlags { + replication: true, + local_data: true, + cluster: true, + }, + eng.output.metrics_flags + ); + + assert_eq!( + vec![ + Command::AppendInputEntries { range: 0..3 }, + Command::ReplicateCommitted { + committed: Some(log_id(3, 4)) + }, + Command::LeaderCommit { + already_committed: Some(log_id(0, 0)), + upto: LogId::new(CommittedLeaderId::new(3, 1), 4) + }, + Command::UpdateMembership { + membership: Arc::new(EffectiveMembership::new( + Some(LogId::new(CommittedLeaderId::new(3, 1), 5)), + m34() + )), + }, + Command::RebuildReplicationStreams { + targets: vec![(3, ProgressEntry::empty(7)), (4, ProgressEntry::empty(7))] + }, + Command::Replicate { + target: 3, + req: Inflight::logs(None, Some(log_id(3, 6))).with_id(1), + }, + Command::Replicate { + target: 4, + req: Inflight::logs(None, Some(log_id(3, 6))).with_id(1), + }, + Command::MoveInputCursorBy { n: 3 }, + ], + eng.output.commands + ); + + Ok(()) + } + + /// With membership log, fast-commit is allowed if no voter change. + #[test] + fn test_leader_append_entries_fast_commit_membership_no_voter_change() -> anyhow::Result<()> { + let mut eng = eng(); + eng.state + .membership_state + .set_effective(Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m1()))); + eng.vote_handler().become_leading(); + eng.state.server_state = eng.calc_server_state(); + + eng.output.commands = vec![]; + eng.output.metrics_flags.reset(); + + // log id will be assigned by eng. + eng.leader_handler()?.leader_append_entries(&mut [ + blank(1, 1), // + Entry { + log_id: log_id(1, 1), + payload: EntryPayload::Membership(m1_2()), + }, + blank(1, 1), + ]); + + assert_eq!( + &[ + log_id(1, 1), // + log_id(2, 3), + LogId::new(CommittedLeaderId::new(3, 1), 4), + LogId::new(CommittedLeaderId::new(3, 1), 6), + ], + eng.state.log_ids.key_log_ids() + ); + assert_eq!( + Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)), + eng.state.last_log_id() + ); + assert_eq!( + MembershipState::new( + Arc::new(EffectiveMembership::new(Some(log_id(3, 5)), m1_2())), + Arc::new(EffectiveMembership::new(Some(log_id(3, 5)), m1_2())), + ), + eng.state.membership_state + ); + assert_eq!( + Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)), + eng.state.committed() + ); + + assert_eq!( + MetricsChangeFlags { + replication: true, + local_data: true, + cluster: true, + }, + eng.output.metrics_flags + ); + + assert_eq!( + vec![ + Command::AppendInputEntries { range: 0..3 }, + // first commit upto the membership entry(exclusive). + Command::ReplicateCommitted { + committed: Some(log_id(3, 4)) + }, + Command::LeaderCommit { + already_committed: Some(log_id(0, 0)), + upto: LogId::new(CommittedLeaderId::new(3, 1), 4) + }, + Command::UpdateMembership { + membership: Arc::new(EffectiveMembership::new( + Some(LogId::new(CommittedLeaderId::new(3, 1), 5)), + m1_2() + )), + }, + Command::RebuildReplicationStreams { + targets: vec![(2, ProgressEntry::empty(7))] + }, + Command::Replicate { + target: 2, + req: Inflight::logs(None, Some(log_id(3, 6))).with_id(1), + }, + // second commit upto the end. 
+ Command::ReplicateCommitted { + committed: Some(log_id(3, 6)) + }, + Command::LeaderCommit { + already_committed: Some(LogId::new(CommittedLeaderId::new(3, 1), 4)), + upto: LogId::new(CommittedLeaderId::new(3, 1), 6) + }, + Command::MoveInputCursorBy { n: 3 }, + ], + eng.output.commands + ); + + Ok(()) + } + + // TODO(xp): check progress + + /// With membership log, fast-commit all if the membership log changes to one voter. + #[test] + fn test_leader_append_entries_fast_commit_if_membership_voter_change_to_1() -> anyhow::Result<()> { + let mut eng = eng(); + eng.state + .membership_state + .set_effective(Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m13()))); + eng.vote_handler().become_leading(); + eng.state.server_state = eng.calc_server_state(); + + eng.output.commands = vec![]; + eng.output.metrics_flags.reset(); + + // log id will be assigned by eng. + eng.leader_handler()?.leader_append_entries(&mut [ + blank(1, 1), // + Entry { + log_id: log_id(1, 1), + payload: EntryPayload::Membership(m1_2()), + }, + blank(1, 1), + ]); + + assert_eq!( + &[ + log_id(1, 1), // + log_id(2, 3), + LogId::new(CommittedLeaderId::new(3, 1), 4), + LogId::new(CommittedLeaderId::new(3, 1), 6), + ], + eng.state.log_ids.key_log_ids() + ); + assert_eq!( + Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)), + eng.state.last_log_id() + ); + assert_eq!( + MembershipState::new( + Arc::new(EffectiveMembership::new(Some(log_id(3, 5)), m1_2())), + Arc::new(EffectiveMembership::new(Some(log_id(3, 5)), m1_2())), + ), + eng.state.membership_state + ); + assert_eq!( + Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)), + eng.state.committed() + ); + + assert_eq!( + vec![ + Command::AppendInputEntries { range: 0..3 }, + Command::UpdateMembership { + membership: Arc::new(EffectiveMembership::new( + Some(LogId::new(CommittedLeaderId::new(3, 1), 5)), + m1_2() + )), + }, + Command::RebuildReplicationStreams { + targets: vec![(2, ProgressEntry::empty(7))] + }, + Command::Replicate { + target: 2, + req: Inflight::logs(None, Some(log_id(3, 6))).with_id(1), + }, + // It is correct to commit if the membership change ot a one node cluster. 
+ Command::ReplicateCommitted { + committed: Some(log_id(3, 6)) + }, + Command::LeaderCommit { + already_committed: Some(log_id(0, 0)), + upto: LogId::new(CommittedLeaderId::new(3, 1), 6) + }, + Command::MoveInputCursorBy { n: 3 }, + ], + eng.output.commands + ); + + assert_eq!( + MetricsChangeFlags { + replication: true, + local_data: true, + cluster: true, + }, + eng.output.metrics_flags + ); + + Ok(()) + } + } +} diff --git a/openraft/src/engine/handler/mod.rs b/openraft/src/engine/handler/mod.rs index dcc7963e2..4ac246801 100644 --- a/openraft/src/engine/handler/mod.rs +++ b/openraft/src/engine/handler/mod.rs @@ -1,4 +1,5 @@ pub(crate) mod following_handler; +pub(crate) mod leader_handler; pub(crate) mod log_handler; pub(crate) mod replication_handler; pub(crate) mod server_state_handler; diff --git a/openraft/src/engine/leader_append_entries_test.rs b/openraft/src/engine/leader_append_entries_test.rs deleted file mode 100644 index 7eadc074d..000000000 --- a/openraft/src/engine/leader_append_entries_test.rs +++ /dev/null @@ -1,525 +0,0 @@ -use std::sync::Arc; - -use maplit::btreeset; -#[allow(unused_imports)] use pretty_assertions::assert_eq; -#[allow(unused_imports)] use pretty_assertions::assert_ne; -#[allow(unused_imports)] use pretty_assertions::assert_str_eq; - -use crate::engine::Command; -use crate::engine::Engine; -use crate::progress::entry::ProgressEntry; -use crate::progress::Inflight; -use crate::raft_state::LogStateReader; -use crate::vote::CommittedLeaderId; -use crate::EffectiveMembership; -use crate::Entry; -use crate::EntryPayload; -use crate::LogId; -use crate::Membership; -use crate::MembershipState; -use crate::MetricsChangeFlags; -use crate::ServerState; -use crate::Vote; - -crate::declare_raft_types!( - pub(crate) Foo: D=(), R=(), NodeId=u64, Node=() -); - -use crate::testing::log_id; - -fn blank(term: u64, index: u64) -> Entry { - Entry { - log_id: log_id(term, index), - payload: EntryPayload::::Blank, - } -} - -fn m01() -> Membership { - Membership::::new(vec![btreeset! {0,1}], None) -} - -fn m1() -> Membership { - Membership::::new(vec![btreeset! {1}], None) -} - -/// members: {1}, learners: {2} -fn m1_2() -> Membership { - Membership::::new(vec![btreeset! {1}], Some(btreeset! {2})) -} - -fn m13() -> Membership { - Membership::::new(vec![btreeset! {1,3}], None) -} - -fn m23() -> Membership { - Membership::::new(vec![btreeset! {2,3}], None) -} - -fn m34() -> Membership { - Membership::::new(vec![btreeset! 
{3,4}], None) -} - -fn eng() -> Engine { - let mut eng = Engine::default(); - eng.state.enable_validate = false; // Disable validation for incomplete state - - eng.config.id = 1; - eng.state.committed = Some(log_id(0, 0)); - eng.state.vote = Vote::new_committed(3, 1); - eng.state.log_ids.append(log_id(1, 1)); - eng.state.log_ids.append(log_id(2, 3)); - eng.state.membership_state = MembershipState::new( - Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), - Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m23())), - ); - eng.state.server_state = eng.calc_server_state(); - - eng -} - -#[test] -fn test_leader_append_entries_empty() -> anyhow::Result<()> { - let mut eng = eng(); - - eng.leader_append_entries(&mut Vec::>::new()); - - assert_eq!( - &[ - log_id(1, 1), // - log_id(2, 3), - ], - eng.state.log_ids.key_log_ids() - ); - assert_eq!(Some(&log_id(2, 3)), eng.state.last_log_id()); - assert_eq!( - MembershipState::new( - Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), - Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m23())), - ), - eng.state.membership_state - ); - - assert_eq!( - MetricsChangeFlags { - replication: false, - local_data: false, - cluster: false, - }, - eng.output.metrics_flags - ); - - assert_eq!(0, eng.output.commands.len()); - - Ok(()) -} - -#[test] -fn test_leader_append_entries_normal() -> anyhow::Result<()> { - let mut eng = eng(); - eng.vote_handler().become_leading(); - - // log id will be assigned by eng. - eng.leader_append_entries(&mut [ - blank(1, 1), // - blank(1, 1), - blank(1, 1), - ]); - - assert_eq!( - &[ - log_id(1, 1), // - log_id(2, 3), - LogId::new(CommittedLeaderId::new(3, 1), 4), - LogId::new(CommittedLeaderId::new(3, 1), 6), - ], - eng.state.log_ids.key_log_ids() - ); - assert_eq!( - Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)), - eng.state.last_log_id() - ); - assert_eq!( - MembershipState::new( - Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), - Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m23())), - ), - eng.state.membership_state - ); - - assert_eq!( - MetricsChangeFlags { - replication: false, - local_data: true, - cluster: false, - }, - eng.output.metrics_flags - ); - - assert_eq!( - vec![ - Command::AppendInputEntries { range: 0..3 }, - Command::Replicate { - target: 2, - req: Inflight::logs(None, Some(log_id(3, 6))).with_id(1), - }, - Command::Replicate { - target: 3, - req: Inflight::logs(None, Some(log_id(3, 6))).with_id(1), - }, - Command::MoveInputCursorBy { n: 3 }, - ], - eng.output.commands - ); - - Ok(()) -} - -#[test] -fn test_leader_append_entries_fast_commit() -> anyhow::Result<()> { - let mut eng = eng(); - eng.state - .membership_state - .set_effective(Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m1()))); - eng.vote_handler().become_leading(); - - eng.output.commands = vec![]; - eng.output.metrics_flags.reset(); - - // log id will be assigned by eng. 
- eng.leader_append_entries(&mut [ - blank(1, 1), // - blank(1, 1), - blank(1, 1), - ]); - - assert_eq!( - &[ - log_id(1, 1), // - log_id(2, 3), - LogId::new(CommittedLeaderId::new(3, 1), 4), - LogId::new(CommittedLeaderId::new(3, 1), 6), - ], - eng.state.log_ids.key_log_ids() - ); - assert_eq!( - Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)), - eng.state.last_log_id() - ); - assert_eq!( - MembershipState::new( - Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m1())), - Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m1())), - ), - eng.state.membership_state - ); - assert_eq!( - Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)), - eng.state.committed() - ); - - assert_eq!( - vec![ - Command::AppendInputEntries { range: 0..3 }, - Command::ReplicateCommitted { - committed: Some(log_id(3, 6)) - }, - Command::LeaderCommit { - already_committed: Some(log_id(0, 0)), - upto: LogId::new(CommittedLeaderId::new(3, 1), 6) - }, - Command::MoveInputCursorBy { n: 3 }, - ], - eng.output.commands - ); - - assert_eq!( - MetricsChangeFlags { - replication: false, - local_data: true, - cluster: false, - }, - eng.output.metrics_flags - ); - - Ok(()) -} - -/// With membership log, fast-commit upto membership entry that is not a single-node cluster. -/// Leader is no longer a voter should work. -#[test] -fn test_leader_append_entries_fast_commit_upto_membership_entry() -> anyhow::Result<()> { - let mut eng = eng(); - eng.state - .membership_state - .set_effective(Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m1()))); - eng.state.server_state = ServerState::Leader; - eng.vote_handler().become_leading(); - - // log id will be assigned by eng. - eng.leader_append_entries(&mut [ - blank(1, 1), // - Entry { - log_id: log_id(1, 1), - payload: EntryPayload::Membership(m34()), - }, - blank(1, 1), - ]); - - assert_eq!( - &[ - log_id(1, 1), // - log_id(2, 3), - LogId::new(CommittedLeaderId::new(3, 1), 4), - LogId::new(CommittedLeaderId::new(3, 1), 6), - ], - eng.state.log_ids.key_log_ids() - ); - assert_eq!( - Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)), - eng.state.last_log_id() - ); - assert_eq!( - MembershipState::new( - // previous effective become committed. - Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m1())), - // new effective. - Arc::new(EffectiveMembership::new(Some(log_id(3, 5)), m34())), - ), - eng.state.membership_state - ); - assert_eq!( - Some(&LogId::new(CommittedLeaderId::new(3, 1), 4)), - eng.state.committed() - ); - - assert_eq!( - MetricsChangeFlags { - replication: true, - local_data: true, - cluster: true, - }, - eng.output.metrics_flags - ); - - assert_eq!( - vec![ - Command::AppendInputEntries { range: 0..3 }, - Command::ReplicateCommitted { - committed: Some(log_id(3, 4)) - }, - Command::LeaderCommit { - already_committed: Some(log_id(0, 0)), - upto: LogId::new(CommittedLeaderId::new(3, 1), 4) - }, - Command::UpdateMembership { - membership: Arc::new(EffectiveMembership::new( - Some(LogId::new(CommittedLeaderId::new(3, 1), 5)), - m34() - )), - }, - Command::RebuildReplicationStreams { - targets: vec![(3, ProgressEntry::empty(7)), (4, ProgressEntry::empty(7))] - }, - Command::Replicate { - target: 3, - req: Inflight::logs(None, Some(log_id(3, 6))).with_id(1), - }, - Command::Replicate { - target: 4, - req: Inflight::logs(None, Some(log_id(3, 6))).with_id(1), - }, - Command::MoveInputCursorBy { n: 3 }, - ], - eng.output.commands - ); - - Ok(()) -} - -/// With membership log, fast-commit is allowed if no voter change. 
-#[test] -fn test_leader_append_entries_fast_commit_membership_no_voter_change() -> anyhow::Result<()> { - let mut eng = eng(); - eng.state - .membership_state - .set_effective(Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m1()))); - eng.vote_handler().become_leading(); - eng.state.server_state = eng.calc_server_state(); - - eng.output.commands = vec![]; - eng.output.metrics_flags.reset(); - - // log id will be assigned by eng. - eng.leader_append_entries(&mut [ - blank(1, 1), // - Entry { - log_id: log_id(1, 1), - payload: EntryPayload::Membership(m1_2()), - }, - blank(1, 1), - ]); - - assert_eq!( - &[ - log_id(1, 1), // - log_id(2, 3), - LogId::new(CommittedLeaderId::new(3, 1), 4), - LogId::new(CommittedLeaderId::new(3, 1), 6), - ], - eng.state.log_ids.key_log_ids() - ); - assert_eq!( - Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)), - eng.state.last_log_id() - ); - assert_eq!( - MembershipState::new( - Arc::new(EffectiveMembership::new(Some(log_id(3, 5)), m1_2())), - Arc::new(EffectiveMembership::new(Some(log_id(3, 5)), m1_2())), - ), - eng.state.membership_state - ); - assert_eq!( - Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)), - eng.state.committed() - ); - - assert_eq!( - MetricsChangeFlags { - replication: true, - local_data: true, - cluster: true, - }, - eng.output.metrics_flags - ); - - assert_eq!( - vec![ - Command::AppendInputEntries { range: 0..3 }, - // first commit upto the membership entry(exclusive). - Command::ReplicateCommitted { - committed: Some(log_id(3, 4)) - }, - Command::LeaderCommit { - already_committed: Some(log_id(0, 0)), - upto: LogId::new(CommittedLeaderId::new(3, 1), 4) - }, - Command::UpdateMembership { - membership: Arc::new(EffectiveMembership::new( - Some(LogId::new(CommittedLeaderId::new(3, 1), 5)), - m1_2() - )), - }, - Command::RebuildReplicationStreams { - targets: vec![(2, ProgressEntry::empty(7))] - }, - Command::Replicate { - target: 2, - req: Inflight::logs(None, Some(log_id(3, 6))).with_id(1), - }, - // second commit upto the end. - Command::ReplicateCommitted { - committed: Some(log_id(3, 6)) - }, - Command::LeaderCommit { - already_committed: Some(LogId::new(CommittedLeaderId::new(3, 1), 4)), - upto: LogId::new(CommittedLeaderId::new(3, 1), 6) - }, - Command::MoveInputCursorBy { n: 3 }, - ], - eng.output.commands - ); - - Ok(()) -} - -// TODO(xp): check progress - -/// With membership log, fast-commit all if the membership log changes to one voter. -#[test] -fn test_leader_append_entries_fast_commit_if_membership_voter_change_to_1() -> anyhow::Result<()> { - let mut eng = eng(); - eng.state - .membership_state - .set_effective(Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m13()))); - eng.vote_handler().become_leading(); - eng.state.server_state = eng.calc_server_state(); - - eng.output.commands = vec![]; - eng.output.metrics_flags.reset(); - - // log id will be assigned by eng. 
- eng.leader_append_entries(&mut [ - blank(1, 1), // - Entry { - log_id: log_id(1, 1), - payload: EntryPayload::Membership(m1_2()), - }, - blank(1, 1), - ]); - - assert_eq!( - &[ - log_id(1, 1), // - log_id(2, 3), - LogId::new(CommittedLeaderId::new(3, 1), 4), - LogId::new(CommittedLeaderId::new(3, 1), 6), - ], - eng.state.log_ids.key_log_ids() - ); - assert_eq!( - Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)), - eng.state.last_log_id() - ); - assert_eq!( - MembershipState::new( - Arc::new(EffectiveMembership::new(Some(log_id(3, 5)), m1_2())), - Arc::new(EffectiveMembership::new(Some(log_id(3, 5)), m1_2())), - ), - eng.state.membership_state - ); - assert_eq!( - Some(&LogId::new(CommittedLeaderId::new(3, 1), 6)), - eng.state.committed() - ); - - assert_eq!( - vec![ - Command::AppendInputEntries { range: 0..3 }, - Command::UpdateMembership { - membership: Arc::new(EffectiveMembership::new( - Some(LogId::new(CommittedLeaderId::new(3, 1), 5)), - m1_2() - )), - }, - Command::RebuildReplicationStreams { - targets: vec![(2, ProgressEntry::empty(7))] - }, - Command::Replicate { - target: 2, - req: Inflight::logs(None, Some(log_id(3, 6))).with_id(1), - }, - // It is correct to commit if the membership change ot a one node cluster. - Command::ReplicateCommitted { - committed: Some(log_id(3, 6)) - }, - Command::LeaderCommit { - already_committed: Some(log_id(0, 0)), - upto: LogId::new(CommittedLeaderId::new(3, 1), 6) - }, - Command::MoveInputCursorBy { n: 3 }, - ], - eng.output.commands - ); - - assert_eq!( - MetricsChangeFlags { - replication: true, - local_data: true, - cluster: true, - }, - eng.output.metrics_flags - ); - - Ok(()) -} diff --git a/openraft/src/engine/mod.rs b/openraft/src/engine/mod.rs index fabe2920c..ca1e6318c 100644 --- a/openraft/src/engine/mod.rs +++ b/openraft/src/engine/mod.rs @@ -29,7 +29,7 @@ mod command; mod engine_impl; -mod handler; +pub(crate) mod handler; mod log_id_list; #[cfg(test)] mod elect_test; @@ -37,7 +37,6 @@ mod log_id_list; #[cfg(test)] mod handle_vote_req_test; #[cfg(test)] mod handle_vote_resp_test; #[cfg(test)] mod initialize_test; -#[cfg(test)] mod leader_append_entries_test; #[cfg(test)] mod log_id_list_test; #[cfg(test)] mod startup_test; #[cfg(test)] mod testing; diff --git a/openraft/src/error.rs b/openraft/src/error.rs index 951395127..0e91b8123 100644 --- a/openraft/src/error.rs +++ b/openraft/src/error.rs @@ -195,9 +195,6 @@ pub enum ChangeMembershipError { #[error(transparent)] LearnerNotFound(#[from] LearnerNotFound), - - #[error(transparent)] - LearnerIsLagging(#[from] LearnerIsLagging), } /// The set of errors which may take place when initializing a pristine Raft node. 
@@ -401,7 +398,7 @@ pub struct QuorumNotEnough { #[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] -#[error("the cluster is already undergoing a configuration change at log {membership_log_id:?}, committed log id: {committed:?}")] +#[error("the cluster is already undergoing a configuration change at log {membership_log_id:?}, last committed membership log id: {committed:?}")] pub struct InProgress { pub committed: Option>, pub membership_log_id: Option>, @@ -414,15 +411,6 @@ pub struct LearnerNotFound { pub node_id: NID, } -#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] -#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] -#[error("replication to learner {node_id} is lagging {distance}, matched: {matched:?}, can not add as member")] -pub struct LearnerIsLagging { - pub node_id: NID, - pub matched: Option>, - pub distance: u64, -} - #[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] #[error("not allowed to initialize due to current raft state: last_log_id: {last_log_id:?} vote: {vote}")] diff --git a/openraft/src/membership/membership.rs b/openraft/src/membership/membership.rs index b7db00645..18207ea14 100644 --- a/openraft/src/membership/membership.rs +++ b/openraft/src/membership/membership.rs @@ -304,7 +304,7 @@ where /// curr = next; /// } /// ``` - pub(crate) fn next_safe(&self, goal: T, turn_to_learner: bool) -> Self + pub(crate) fn next_safe(&self, goal: T, removed_to_learner: bool) -> Self where T: IntoNodes { let goal = goal.into_nodes(); @@ -314,7 +314,7 @@ where let mut nodes = Self::extend_nodes(self.nodes.clone(), &goal); - if !turn_to_learner { + if !removed_to_learner { let old_voter_ids = self.configs.as_joint().ids().collect::>(); let new_voter_ids = config.as_joint().ids().collect::>(); diff --git a/openraft/src/membership/membership_state.rs b/openraft/src/membership/membership_state.rs index 862d6f5a2..2f92c4106 100644 --- a/openraft/src/membership/membership_state.rs +++ b/openraft/src/membership/membership_state.rs @@ -1,12 +1,18 @@ use std::error::Error; use std::sync::Arc; +use crate::error::ChangeMembershipError; +use crate::error::EmptyMembership; +use crate::error::InProgress; +use crate::error::LearnerNotFound; use crate::less_equal; use crate::node::Node; use crate::validate::Validate; +use crate::ChangeMembers; use crate::EffectiveMembership; use crate::LogId; use crate::LogIdOptionExt; +use crate::Membership; use crate::MessageSummary; use crate::NodeId; @@ -75,7 +81,47 @@ where self.effective.membership.is_voter(id) } - // --- + /// Build a new membership config by applying changes to the current config. + /// + /// The removed voter is left in membership config as learner if `removed_to_learner` is true. + pub(crate) fn next_membership( + &self, + changes: ChangeMembers, + removed_to_learner: bool, + ) -> Result, ChangeMembershipError> { + let effective = self.effective(); + let committed = self.committed(); + + let last = effective.membership.get_joint_config().last().unwrap(); + let new_voter_ids = changes.apply_to(last); + + // Ensure cluster will have at least one voter. 
+ if new_voter_ids.is_empty() { + return Err(EmptyMembership {}.into()); + } + + // There has to be corresponding `Node` for every voter_id + for node_id in new_voter_ids.iter() { + if !effective.contains(node_id) { + return Err(LearnerNotFound { node_id: *node_id }.into()); + } + } + + if committed.log_id == effective.log_id { + // Ok: last membership(effective) is committed + } else { + return Err(InProgress { + committed: committed.log_id, + membership_log_id: effective.log_id, + } + .into()); + } + + let new_membership = effective.membership.next_safe(new_voter_ids, removed_to_learner); + + tracing::debug!(?new_membership, "new membership config"); + Ok(new_membership) + } /// Update membership state if the specified committed_log_id is greater than `self.effective` pub(crate) fn commit(&mut self, committed_log_id: &Option>) { diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index cd884c781..4437cb5a0 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -21,7 +21,6 @@ use tracing::Level; use crate::config::Config; use crate::config::RuntimeConfig; use crate::core::replication_lag; -use crate::core::Expectation; use crate::core::RaftCore; use crate::core::SnapshotResult; use crate::core::SnapshotState; @@ -609,28 +608,16 @@ impl, S: RaftStorage> Raft>, - allow_lagging: bool, turn_to_learner: bool, ) -> Result, RaftError>> { let changes: ChangeMembers = members.into(); tracing::info!( changes = debug(&changes), - allow_lagging = display(allow_lagging), turn_to_learner = display(turn_to_learner), "change_membership: start to commit joint config" ); - let when = if allow_lagging { - None - } else { - match &changes { - // Removing voters will never be blocked by replication. - ChangeMembers::Remove(_) => None, - _ => Some(Expectation::AtLineRate), - } - }; - let (tx, rx) = oneshot::channel(); // res is error if membership can not be changed. // If no error, it will enter a joint state @@ -638,7 +625,6 @@ impl, S: RaftStorage> Raft, S: RaftStorage> Raft, S: RaftStor ChangeMembership { changes: ChangeMembers, - /// Defines what conditions the replication states has to satisfy before change membership. - /// If expectation is not satisfied, a corresponding error will return. - when: Option, - /// If `turn_to_learner` is `true`, then all the members which do not exist in the new /// membership will be turned into learners, otherwise they will be removed. turn_to_learner: bool, @@ -1010,13 +991,12 @@ where } RaftMsg::ChangeMembership { changes: members, - when, turn_to_learner, .. } => { format!( - "ChangeMembership: members: {:?}, when: {:?}, turn_to_learner: {}", - members, when, turn_to_learner, + "ChangeMembership: members: {:?}, turn_to_learner: {}", + members, turn_to_learner, ) } RaftMsg::ExternalRequest { .. } => "External Request".to_string(), diff --git a/openraft/src/raft_state.rs b/openraft/src/raft_state.rs index e8bb4ab24..a797a5aaa 100644 --- a/openraft/src/raft_state.rs +++ b/openraft/src/raft_state.rs @@ -210,11 +210,6 @@ where self.log_ids.extend(new_log_id) } - /// Return true if the currently effective membership is committed. - pub(crate) fn is_membership_committed(&self) -> bool { - self.committed() >= self.membership_state.effective().log_id.as_ref() - } - /// Update field `committed` if the input is greater. /// If updated, it returns the previous value in a `Some()`. 
#[tracing::instrument(level = "debug", skip_all)] diff --git a/openraft/src/raft_state_test.rs b/openraft/src/raft_state_test.rs index 618cbaa71..4c695484b 100644 --- a/openraft/src/raft_state_test.rs +++ b/openraft/src/raft_state_test.rs @@ -161,50 +161,6 @@ fn test_raft_state_last_purged_log_id() -> anyhow::Result<()> { Ok(()) } -#[test] -fn test_raft_state_is_membership_committed() -> anyhow::Result<()> { - // - let rs = RaftState { - committed: None, - membership_state: MembershipState::new( - Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())), - Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())), - ), - ..Default::default() - }; - - assert!( - !rs.is_membership_committed(), - "committed == effective, but not consider this" - ); - - let rs = RaftState { - committed: Some(log_id(2, 2)), - membership_state: MembershipState::new( - Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())), - Arc::new(EffectiveMembership::new(Some(log_id(2, 2)), m12())), - ), - ..Default::default() - }; - - assert!( - rs.is_membership_committed(), - "committed != effective, but rs.committed == effective.log_id" - ); - - let rs = RaftState { - committed: Some(log_id(2, 2)), - membership_state: MembershipState::new( - Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m12())), - Arc::new(EffectiveMembership::new(Some(log_id(3, 3)), m12())), - ), - ..Default::default() - }; - - assert!(!rs.is_membership_committed(), "rs.committed < effective.log_id"); - Ok(()) -} - #[test] fn test_forward_to_leader_vote_not_committed() { let rs = RaftState { diff --git a/openraft/tests/client_api/t50_lagging_network_write.rs b/openraft/tests/client_api/t50_lagging_network_write.rs index 964733a6d..502a18a15 100644 --- a/openraft/tests/client_api/t50_lagging_network_write.rs +++ b/openraft/tests/client_api/t50_lagging_network_write.rs @@ -64,7 +64,7 @@ async fn lagging_network_write() -> Result<()> { router.wait_for_log(&btreeset![0, 1, 2], Some(log_index), timeout(), "write one log").await?; let node = router.get_raft_handle(&0)?; - node.change_membership(btreeset![0, 1, 2], true, false).await?; + node.change_membership(btreeset![0, 1, 2], false).await?; log_index += 2; router.wait_for_state(&btreeset![0], ServerState::Leader, None, "changed").await?; router.wait_for_state(&btreeset![1, 2], ServerState::Follower, None, "changed").await?; diff --git a/openraft/tests/fixtures/mod.rs b/openraft/tests/fixtures/mod.rs index 48bf95e02..15ec3fedc 100644 --- a/openraft/tests/fixtures/mod.rs +++ b/openraft/tests/fixtures/mod.rs @@ -293,7 +293,7 @@ where tracing::info!("--- change membership to setup voters: {:?}", node_ids); let node = self.get_raft_handle(&C::NodeId::default())?; - node.change_membership(node_ids.clone(), true, false).await?; + node.change_membership(node_ids.clone(), false).await?; log_index += 2; self.wait_for_log( diff --git a/openraft/tests/membership/t10_add_learner.rs b/openraft/tests/membership/t10_add_learner.rs index 1931e87b0..2baff7e42 100644 --- a/openraft/tests/membership/t10_add_learner.rs +++ b/openraft/tests/membership/t10_add_learner.rs @@ -169,7 +169,7 @@ async fn check_learner_after_leader_transferred() -> Result<()> { router.wait_for_log(&btreeset![0, 1], Some(log_index), timeout(), "add learner").await?; let node = router.get_raft_handle(&orig_leader_id)?; - node.change_membership(btreeset![1, 3, 4], true, false).await?; + node.change_membership(btreeset![1, 3, 4], false).await?; log_index += 2; // 2 change_membership log tracing::info!("--- old leader commits 2 
membership log"); diff --git a/openraft/tests/membership/t12_concurrent_write_and_add_learner.rs b/openraft/tests/membership/t12_concurrent_write_and_add_learner.rs index e28e5bda1..c8a5bdd50 100644 --- a/openraft/tests/membership/t12_concurrent_write_and_add_learner.rs +++ b/openraft/tests/membership/t12_concurrent_write_and_add_learner.rs @@ -73,7 +73,7 @@ async fn concurrent_write_and_add_learner() -> Result<()> { tracing::info!("--- changing cluster config"); let node = router.get_raft_handle(&0)?; - node.change_membership(candidates.clone(), true, false).await?; + node.change_membership(candidates.clone(), false).await?; log_index += 2; // Tow member change logs wait_log(&router, &candidates, log_index).await?; diff --git a/openraft/tests/membership/t15_add_remove_follower.rs b/openraft/tests/membership/t15_add_remove_follower.rs index 81018b573..5f006b2ed 100644 --- a/openraft/tests/membership/t15_add_remove_follower.rs +++ b/openraft/tests/membership/t15_add_remove_follower.rs @@ -43,7 +43,7 @@ async fn add_remove_voter() -> Result<()> { tracing::info!("--- remove n{}", 4); { let node = router.get_raft_handle(&0)?; - node.change_membership(c0123.clone(), true, false).await?; + node.change_membership(c0123.clone(), false).await?; log_index += 2; // two member-change logs router.wait_for_log(&c0123, Some(log_index), timeout(), "removed node-4 from membership").await?; diff --git a/openraft/tests/membership/t16_change_membership_cases.rs b/openraft/tests/membership/t16_change_membership_cases.rs index 9f96a5759..4f0493c9e 100644 --- a/openraft/tests/membership/t16_change_membership_cases.rs +++ b/openraft/tests/membership/t16_change_membership_cases.rs @@ -138,7 +138,7 @@ async fn change_from_to(old: BTreeSet, change_members: ChangeMembers< } let node = router.get_raft_handle(&0)?; - node.change_membership(new.clone(), true, false).await?; + node.change_membership(new.clone(), false).await?; log_index += 1; if new != old { // Two member-change logs. @@ -285,7 +285,7 @@ async fn change_by_add(old: BTreeSet, add: &[MemNodeId]) -> anyhow::R tracing::info!("--- change: {:?}", change); { let node = router.get_raft_handle(&0)?; - node.change_membership(change, true, false).await?; + node.change_membership(change, false).await?; log_index += 1; if new != old { log_index += 1; // two member-change logs @@ -344,7 +344,7 @@ async fn change_by_remove(old: BTreeSet, remove: &[MemNodeId]) -> any tracing::info!("--- change {:?}", &change); { let node = router.get_raft_handle(&0)?; - node.change_membership(change.clone(), true, false).await?; + node.change_membership(change.clone(), false).await?; log_index += 1; if new != old { // Two member-change logs diff --git a/openraft/tests/membership/t20_change_membership.rs b/openraft/tests/membership/t20_change_membership.rs index c5a19fa5f..d322c352c 100644 --- a/openraft/tests/membership/t20_change_membership.rs +++ b/openraft/tests/membership/t20_change_membership.rs @@ -1,9 +1,7 @@ -use std::convert::TryInto; use std::sync::Arc; use std::time::Duration; use maplit::btreeset; -use memstore::MemNodeId; use openraft::error::ChangeMembershipError; use openraft::error::ClientWriteError; use openraft::Config; @@ -31,7 +29,7 @@ async fn update_membership_state() -> anyhow::Result<()> { tracing::info!("--- change membership from 012 to 01234"); { let leader = router.get_raft_handle(&0)?; - let res = leader.change_membership(btreeset! {0,1,2,3,4}, true, false).await?; + let res = leader.change_membership(btreeset! 
{0,1,2,3,4}, false).await?; log_index += 2; tracing::info!("--- change_membership blocks until success: {:?}", res); @@ -83,7 +81,7 @@ async fn change_with_new_learner_blocking() -> anyhow::Result<()> { router.wait_for_log(&btreeset![0], Some(log_index), timeout(), "add learner").await?; let node = router.get_raft_handle(&0)?; - let res = node.change_membership(btreeset! {0,1}, true, false).await?; + let res = node.change_membership(btreeset! {0,1}, false).await?; log_index += 2; tracing::info!("--- change_membership blocks until success: {:?}", res); @@ -112,7 +110,7 @@ async fn change_without_adding_learner() -> anyhow::Result<()> { tracing::info!("--- change membership without adding-learner, allow_lagging=true"); { - let res = leader.change_membership(btreeset! {0,1}, true, false).await; + let res = leader.change_membership(btreeset! {0,1}, false).await; let raft_err = res.unwrap_err(); match raft_err.api_error().unwrap() { ClientWriteError::ChangeMembershipError(ChangeMembershipError::LearnerNotFound(err)) => { @@ -124,9 +122,9 @@ async fn change_without_adding_learner() -> anyhow::Result<()> { } } - tracing::info!("--- change membership without adding-learner, allow_lagging=false"); + tracing::info!("--- change membership without adding-learner"); { - let res = leader.change_membership(btreeset! {0,1}, false, false).await; + let res = leader.change_membership(btreeset! {0,1}, false).await; let raft_err = res.unwrap_err(); match raft_err.api_error().unwrap() { ClientWriteError::ChangeMembershipError(ChangeMembershipError::LearnerNotFound(err)) => { @@ -141,63 +139,6 @@ async fn change_without_adding_learner() -> anyhow::Result<()> { Ok(()) } -#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] -async fn change_with_lagging_learner_non_blocking() -> anyhow::Result<()> { - // Add a learner into membership config, expect error NonVoterIsLagging. - - let lag_threshold = 1; - - let config = Arc::new( - Config { - replication_lag_threshold: lag_threshold, - enable_heartbeat: false, - ..Default::default() - } - .validate()?, - ); - let mut router = RaftRouter::new(config.clone()); - - let mut log_index = router.new_nodes_from_single(btreeset! {0}, btreeset! {1}).await?; - - tracing::info!("--- stop replication by isolating node 1"); - { - router.isolate_node(1); - } - - tracing::info!("--- write up to 500 logs"); - { - router.client_request_many(0, "non_voter_add", 500 - log_index as usize).await?; - log_index = 500; - - router.wait(&0, timeout()).log(Some(log_index), "received 500 logs").await?; - } - - tracing::info!("--- changing membership expects LearnerIsLagging"); - { - let node = router.get_raft_handle(&0)?; - let res = node.change_membership(btreeset! {0,1}, false, false).await; - - tracing::info!("--- got res: {:?}", res); - - let err = res.unwrap_err(); - let err: ChangeMembershipError = err.into_api_error().unwrap().try_into().unwrap(); - - match err { - ChangeMembershipError::LearnerIsLagging(e) => { - tracing::info!(e.distance, "--- distance"); - assert_eq!(1, e.node_id); - assert!(e.distance >= lag_threshold); - assert!(e.distance < 500); - } - _ => { - panic!("expect ChangeMembershipError::NonVoterNotFound"); - } - } - } - - Ok(()) -} - #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] async fn change_with_turn_removed_voter_to_learner() -> anyhow::Result<()> { // Add a member without adding it as learner, in blocking mode it should finish successfully. 
@@ -224,7 +165,7 @@ async fn change_with_turn_removed_voter_to_learner() -> anyhow::Result<()> { { let node = router.get_raft_handle(&0)?; - node.change_membership(btreeset![0, 1], true, true).await?; + node.change_membership(btreeset![0, 1], true).await?; // 2 for change_membership log_index += 2; diff --git a/openraft/tests/membership/t30_commit_joint_config.rs b/openraft/tests/membership/t30_commit_joint_config.rs index 5f28e48a2..670cf1fcd 100644 --- a/openraft/tests/membership/t30_commit_joint_config.rs +++ b/openraft/tests/membership/t30_commit_joint_config.rs @@ -66,7 +66,7 @@ async fn commit_joint_config_during_0_to_012() -> Result<()> { let router = router.clone(); async move { let node = router.get_raft_handle(&0).unwrap(); - let _x = node.change_membership(btreeset! {0,1,2}, true, false).await; + let _x = node.change_membership(btreeset! {0,1,2}, false).await; } .instrument(tracing::debug_span!("spawn-change-membership")) }); @@ -113,7 +113,7 @@ async fn commit_joint_config_during_012_to_234() -> Result<()> { tracing::info!("--- changing config to 0,1,2"); let node = router.get_raft_handle(&0)?; - node.change_membership(btreeset![0, 1, 2], true, false).await?; + node.change_membership(btreeset![0, 1, 2], false).await?; log_index += 2; router.wait_for_log(&btreeset![0, 1, 2], Some(log_index), None, "cluster of 0,1,2").await?; @@ -125,7 +125,7 @@ async fn commit_joint_config_during_012_to_234() -> Result<()> { tokio::spawn( async move { let node = router.get_raft_handle(&0)?; - node.change_membership(btreeset![2, 3, 4], true, false).await?; + node.change_membership(btreeset![2, 3, 4], false).await?; Ok::<(), anyhow::Error>(()) } .instrument(tracing::debug_span!("spawn-change-membership")), diff --git a/openraft/tests/membership/t30_remove_leader.rs b/openraft/tests/membership/t30_remove_leader.rs index 76e7e3ee0..3db207196 100644 --- a/openraft/tests/membership/t30_remove_leader.rs +++ b/openraft/tests/membership/t30_remove_leader.rs @@ -42,7 +42,7 @@ async fn remove_leader() -> Result<()> { router.wait_for_log(&btreeset![0, 1], Some(log_index), timeout(), "add learner").await?; let node = router.get_raft_handle(&orig_leader)?; - node.change_membership(btreeset![1, 2, 3], true, false).await?; + node.change_membership(btreeset![1, 2, 3], false).await?; // 2 for change_membership log_index += 2; @@ -133,7 +133,7 @@ async fn remove_leader_access_new_cluster() -> Result<()> { tracing::info!("--- change membership 012 to 2"); { let node = router.get_raft_handle(&orig_leader)?; - node.change_membership(btreeset![2], true, false).await?; + node.change_membership(btreeset![2], false).await?; // 2 change_membership logs log_index += 2; diff --git a/openraft/tests/membership/t40_removed_follower.rs b/openraft/tests/membership/t40_removed_follower.rs index 3801b2af7..4444dfa0d 100644 --- a/openraft/tests/membership/t40_removed_follower.rs +++ b/openraft/tests/membership/t40_removed_follower.rs @@ -37,7 +37,7 @@ async fn stop_replication_to_removed_follower() -> Result<()> { tracing::info!("--- changing config to 0,3,4"); { let node = router.get_raft_handle(&0)?; - node.change_membership(btreeset![0, 3, 4], true, false).await?; + node.change_membership(btreeset![0, 3, 4], false).await?; log_index += 2; for i in [0, 3, 4] { diff --git a/openraft/tests/membership/t45_remove_unreachable_follower.rs b/openraft/tests/membership/t45_remove_unreachable_follower.rs index 0e2fbd9cc..0e7fff4c5 100644 --- a/openraft/tests/membership/t45_remove_unreachable_follower.rs +++ 
b/openraft/tests/membership/t45_remove_unreachable_follower.rs @@ -36,7 +36,7 @@ async fn stop_replication_to_removed_unreachable_follower_network_failure() -> R tracing::info!("--- changing config to 0,1,2"); { let node = router.get_raft_handle(&0)?; - node.change_membership(btreeset![0, 1, 2], true, false).await?; + node.change_membership(btreeset![0, 1, 2], false).await?; log_index += 2; for i in &[0, 1, 2] { diff --git a/openraft/tests/membership/t99_issue_584_replication_state_reverted.rs b/openraft/tests/membership/t99_issue_584_replication_state_reverted.rs index ee0b3aea3..70e15abe3 100644 --- a/openraft/tests/membership/t99_issue_584_replication_state_reverted.rs +++ b/openraft/tests/membership/t99_issue_584_replication_state_reverted.rs @@ -40,7 +40,7 @@ async fn t99_issue_584_replication_state_reverted() -> Result<()> { tracing::info!("--- change-membership: make learner node-1 a voter. This should not panic"); { let leader = router.get_raft_handle(&0)?; - leader.change_membership(btreeset![0, 1], true, false).await?; + leader.change_membership(btreeset![0, 1], false).await?; log_index += 2; // 2 change_membership log let _ = log_index; diff --git a/openraft/tests/metrics/t30_leader_metrics.rs b/openraft/tests/metrics/t30_leader_metrics.rs index e0cb90aed..ae265654a 100644 --- a/openraft/tests/metrics/t30_leader_metrics.rs +++ b/openraft/tests/metrics/t30_leader_metrics.rs @@ -98,7 +98,7 @@ async fn leader_metrics() -> Result<()> { tracing::info!("--- changing cluster config to 01234"); let node = router.get_raft_handle(&0)?; - node.change_membership(c01234.clone(), true, false).await?; + node.change_membership(c01234.clone(), false).await?; log_index += 2; // 2 member-change logs router.wait_for_log(&c01234, Some(log_index), timeout(), "change members to 0,1,2,3,4").await?; @@ -129,7 +129,7 @@ async fn leader_metrics() -> Result<()> { tracing::info!("--- remove n{}", 4); { let node = router.get_raft_handle(&0)?; - node.change_membership(c0123.clone(), true, false).await?; + node.change_membership(c0123.clone(), false).await?; log_index += 2; // two member-change logs router diff --git a/openraft/tests/state_machine/t20_state_machine_apply_membership.rs b/openraft/tests/state_machine/t20_state_machine_apply_membership.rs index f5caa9009..a77cbe86f 100644 --- a/openraft/tests/state_machine/t20_state_machine_apply_membership.rs +++ b/openraft/tests/state_machine/t20_state_machine_apply_membership.rs @@ -83,7 +83,7 @@ async fn state_machine_apply_membership() -> Result<()> { tracing::info!("--- changing cluster config"); let node = router.get_raft_handle(&0)?; - node.change_membership(btreeset![0, 1, 2], true, false).await?; + node.change_membership(btreeset![0, 1, 2], false).await?; log_index += 2;
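
Usage sketch (not part of the patch): a hypothetical test, written in the style of the suites above, showing how a caller drives a membership change after this API change. The call now always blocks until the new membership config is committed, so the caller no longer needs to poll metrics; the second boolean argument (passed as `false` throughout most tests above, and as `true` in `change_with_turn_removed_voter_to_learner`) appears to control whether removed voters are retained as learners. The cluster layout, test name, and attribute arguments are illustrative only, and the fixture items (`RaftRouter`, `init_default_ut_tracing`) are assumed to be in scope as in the surrounding test files.

    use std::sync::Arc;

    use maplit::btreeset;
    use openraft::Config;

    #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
    async fn change_membership_blocks_until_committed() -> anyhow::Result<()> {
        let config = Arc::new(Config::default().validate()?);
        let mut router = RaftRouter::new(config);

        // Start with voters {0,1,2} and learners {3,4}; the fixture returns the
        // index of the last log written while setting up the cluster.
        let mut log_index = router.new_nodes_from_single(btreeset! {0,1,2}, btreeset! {3,4}).await?;

        let node = router.get_raft_handle(&0)?;

        // Blocks until both membership logs (joint config, then final config) are
        // committed. With `false` as the second argument, voters removed from the
        // config are dropped rather than kept as learners.
        node.change_membership(btreeset! {0,1,3,4}, false).await?;
        log_index += 2; // two member-change logs

        router.wait_for_log(&btreeset![0, 1, 3, 4], Some(log_index), None, "changed to 0,1,3,4").await?;
        Ok(())
    }
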