From c6fe29d4a53b47f6c43d83a24e1610788a4c0166 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Sat, 22 Oct 2022 18:12:49 +0800 Subject: [PATCH] Change: change-membership does not return error when replication lags If `blocking` is `true`, `Raft::change_membership(..., blocking)` will block until repliication to new nodes become upto date. But it won't return an error when proposing change-membership log. - Change: remove two errors: `LearnerIsLagging` and `LearnerNotFound`. - Fix: #581 --- openraft/src/core/admin.rs | 45 ------------------- openraft/src/core/mod.rs | 4 +- openraft/src/error.rs | 20 --------- openraft/src/raft.rs | 32 +++++++------ .../tests/membership/t20_change_membership.rs | 43 ++++++++++-------- 5 files changed, 42 insertions(+), 102 deletions(-) diff --git a/openraft/src/core/admin.rs b/openraft/src/core/admin.rs index c34e71ad5..a164a59cf 100644 --- a/openraft/src/core/admin.rs +++ b/openraft/src/core/admin.rs @@ -11,8 +11,6 @@ use crate::error::ClientWriteError; use crate::error::EmptyMembership; use crate::error::InProgress; use crate::error::InitializeError; -use crate::error::LearnerIsLagging; -use crate::error::LearnerNotFound; use crate::error::RemoveLearnerError; use crate::raft::AddLearnerResponse; use crate::raft::ClientWriteResponse; @@ -144,7 +142,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage pub(super) async fn change_membership( &mut self, members: BTreeSet, - blocking: bool, tx: RaftRespTx, ClientWriteError>, ) -> Result<(), StorageError> { tracing::info!("change_membership: members: {:?}", members); @@ -174,48 +171,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage tracing::info!("change_membership: new_config: {:?}", new_config); - // Check the proposed config for any new nodes. If ALL new nodes already have replication - // streams AND are ready to join, then we can immediately proceed with entering joint - // consensus. Else, new nodes need to first be brought up-to-speed. - // - // Here, all we do is check to see which nodes still need to be synced, which determines - // if we can proceed. - - // TODO(xp): test change membership without adding as learner. - - // TODO(xp): 111 test adding a node that is not learner. - // TODO(xp): 111 test adding a node that is lagging. - for new_node in members.difference(curr.all_nodes()) { - match self.nodes.get(new_node) { - Some(node) => { - if node.is_line_rate(&self.core.last_log_id, &self.core.config) { - // Node is ready to join. - continue; - } - - if !blocking { - // Node has repl stream, but is not yet ready to join. - let _ = tx.send(Err(ClientWriteError::ChangeMembershipError( - ChangeMembershipError::LearnerIsLagging(LearnerIsLagging { - node_id: *new_node, - matched: node.matched, - distance: self.core.last_log_id.next_index().saturating_sub(node.matched.next_index()), - }), - ))); - return Ok(()); - } - } - - // Node does not yet have a repl stream, spawn one. - None => { - let _ = tx.send(Err(ClientWriteError::ChangeMembershipError( - ChangeMembershipError::LearnerNotFound(LearnerNotFound { node_id: *new_node }), - ))); - return Ok(()); - } - } - } - self.append_membership_log(new_config, Some(tx)).await?; Ok(()) } diff --git a/openraft/src/core/mod.rs b/openraft/src/core/mod.rs index 3d02afaa7..65699a6da 100644 --- a/openraft/src/core/mod.rs +++ b/openraft/src/core/mod.rs @@ -809,8 +809,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork, S: RaftStorage RaftMsg::RemoveLearner { id, tx } => { self.remove_learner(id, tx); } - RaftMsg::ChangeMembership { members, blocking, tx } => { - self.change_membership(members, blocking, tx).await?; + RaftMsg::ChangeMembership { members, tx } => { + self.change_membership(members, tx).await?; } RaftMsg::ForceSnapshotting { tx } => { for node in self.nodes.values() { diff --git a/openraft/src/error.rs b/openraft/src/error.rs index cf4180015..0e812fffb 100644 --- a/openraft/src/error.rs +++ b/openraft/src/error.rs @@ -102,12 +102,6 @@ pub enum ChangeMembershipError { #[error(transparent)] EmptyMembership(#[from] EmptyMembership), - - #[error(transparent)] - LearnerNotFound(#[from] LearnerNotFound), - - #[error(transparent)] - LearnerIsLagging(#[from] LearnerIsLagging), } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)] @@ -261,20 +255,6 @@ pub struct InProgress { pub membership_log_id: LogId, } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)] -#[error("to add a member {node_id} first need to add it as learner")] -pub struct LearnerNotFound { - pub node_id: NodeId, -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)] -#[error("replication to learner {node_id} is lagging {distance}, matched: {matched:?}, can not add as member")] -pub struct LearnerIsLagging { - pub node_id: NodeId, - pub matched: Option, - pub distance: u64, -} - #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)] #[error("new membership can not be empty")] pub struct EmptyMembership {} diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index 298b7213d..5d39d2e59 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -284,14 +284,18 @@ impl, S: RaftStorage> Ra /// If a node in the proposed config but is not yet a voter or learner, it first calls `add_learner` to setup /// replication to the new node. /// - /// Internal: - /// - It proposes a **joint** config. - /// - When the **joint** config is committed, it proposes a uniform config. - /// - /// If blocking is true, it blocks until every learner becomes up to date. - /// Otherwise it returns error `ChangeMembershipError::LearnerIsLagging` if there is a lagging learner. - /// - /// If it lost leadership or crashed before committing the second **uniform** config log, the cluster is left in the + /// Internally it commits two logs to get this task done: + /// - Proposes a **joint** config, e.g. `[{1,2,3}, {4,5,6}]` if trying to change config from `{1,2,3}` to `{4,5,6}`. + /// - When the **joint** config is committed, then proposes a **uniform** config, e.g. `{4,5,6}`. + /// + /// If `blocking` is true, it does not propose the membership log until replication to every learner becomes up to + /// date. + /// Otherwise it just proposes membership log at once. In this case, the change-membership log may be + /// committed very quickly: if the **old** cluster constitutes a quorum. + /// E.g., when changing from `[1,2,3,4]` to `[1,2,3,4,5]`, `[1,2,3]` is a quorum in both the old and the new + /// cluster. Then it will return before the replication to node-5 becomes up to date. + /// + /// If this node crashes before committing the second **uniform** config log, the cluster may be left in the /// **joint** config. #[tracing::instrument(level = "info", skip_all)] pub async fn change_membership( @@ -330,7 +334,6 @@ impl, S: RaftStorage> Ra .call_core( RaftMsg::ChangeMembership { members: members.clone(), - blocking, tx, }, rx, @@ -350,7 +353,7 @@ impl, S: RaftStorage> Ra tracing::info!("the second step is to change to uniform config: {:?}", members); let (tx, rx) = oneshot::channel(); - let res = self.call_core(RaftMsg::ChangeMembership { members, blocking, tx }, rx).await?; + let res = self.call_core(RaftMsg::ChangeMembership { members, tx }, rx).await?; tracing::info!("res of second change_membership: {}", res.summary()); @@ -500,11 +503,6 @@ pub(crate) enum RaftMsg { }, ChangeMembership { members: BTreeSet, - /// with blocking==false, respond to client a ChangeMembershipError::LearnerIsLagging error at once if a - /// non-member is lagging. - /// - /// Otherwise, wait for commit of the member change log. - blocking: bool, tx: RaftRespTx, ClientWriteError>, }, /// Force to switch to snapshot replication to every target. @@ -542,8 +540,8 @@ where RaftMsg::RemoveLearner { id, .. } => { format!("RemoveLearner: id: {}", id) } - RaftMsg::ChangeMembership { members, blocking, .. } => { - format!("ChangeMembership: members: {:?}, blocking: {}", members, blocking) + RaftMsg::ChangeMembership { members, .. } => { + format!("ChangeMembership: members: {:?}", members) } RaftMsg::ForceSnapshotting { .. } => "ForceSnapshotting".to_string(), } diff --git a/openraft/tests/membership/t20_change_membership.rs b/openraft/tests/membership/t20_change_membership.rs index 05469dadb..9eebcc79a 100644 --- a/openraft/tests/membership/t20_change_membership.rs +++ b/openraft/tests/membership/t20_change_membership.rs @@ -1,9 +1,7 @@ -use std::convert::TryInto; use std::sync::Arc; use std::time::Duration; use maplit::btreeset; -use openraft::error::ChangeMembershipError; use openraft::Config; use openraft::StorageHelper; @@ -59,7 +57,7 @@ async fn change_with_new_learner_blocking() -> anyhow::Result<()> { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn change_with_lagging_learner_non_blocking() -> anyhow::Result<()> { - // Add a learner into membership config, expect error NonVoterIsLagging. + // Add a learner into membership config. let (_log_guard, ut_span) = init_ut!(); let _ent = ut_span.enter(); @@ -90,27 +88,36 @@ async fn change_with_lagging_learner_non_blocking() -> anyhow::Result<()> { router.wait(&0, timeout()).await?.log(Some(log_index), "received 500 logs").await?; } - tracing::info!("--- restore replication and change membership at once, expect NonVoterIsLagging"); + tracing::info!( + "--- restore replication and change membership at once, it still blocks until logs are replicated to node-1" + ); { router.restore_node(1).await; let res = router.change_membership_with_blocking(0, btreeset! {0,1}, false).await; + log_index += 2; tracing::info!("--- got res: {:?}", res); + assert!(res.is_ok()); + router.wait(&1, timeout()).await?.log(Some(log_index), "received 500+2 logs").await?; + } - let err = res.unwrap_err(); - let err: ChangeMembershipError = err.try_into().unwrap(); - - match err { - ChangeMembershipError::LearnerIsLagging(e) => { - tracing::info!(e.distance, "--- distance"); - assert_eq!(1, e.node_id); - assert!(e.distance >= lag_threshold); - assert!(e.distance < 500); - } - _ => { - panic!("expect ChangeMembershipError::NonVoterNotFound"); - } - } + tracing::info!("--- add node-3, with blocking=false, won't block, because [0,1] is a quorum"); + { + router.new_raft_node(2).await; + + router.change_membership_with_blocking(0, btreeset! {0,1,2}, false).await?; + log_index += 2; + let m = router.get_metrics(&2).await?; + assert!(m.last_log_index < Some(log_index)); + } + + tracing::info!("--- make sure replication to node-3 works as expected"); + { + router + .wait(&2, timeout()) + .await? + .log(Some(log_index), "received all logs, replication works") + .await?; } Ok(())