From ab3aa16b6b717e4421dc8d27e86d38b3fc5fede6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Mon, 14 Oct 2024 16:03:53 +0800 Subject: [PATCH] Feature: `Raft::trigger().allow_next_revert()` allows resetting replication on the next detected follower log revert This method requests the RaftCore to allow resetting replication for a specific node when a log revert is detected. - `allow=true`: This method instructs the RaftCore to allow the target node's log to revert to a previous state one time. - `allow=false`: This method instructs the RaftCore to panic if the target node's log reverts. ### Behavior - If this node is the Leader, it will attempt to replicate logs to the target node from the beginning. - If this node is not the Leader, the request is ignored. - If the target node is not found, the request is ignored. ### Automatic Replication Reset When the `loosen-follower-log-revert` feature flag is enabled, the Leader automatically resets replication if it detects that the target node's log has reverted. This feature is primarily useful in testing environments. ### Production Considerations In production environments, state reversion is a critical issue that should not be automatically handled. However, there may be scenarios where a Follower's data is intentionally removed and the node needs to rejoin the cluster (without membership changes). In such cases, the Leader should reinitialize replication for that node with the following steps: - Shut down the target node. - Call [`Self::allow_next_revert`] on the Leader. - Clear the target node's data directory. - Restart the target node. 
- Fix: #1251 --- openraft/src/core/raft_core.rs | 7 ++ .../src/core/raft_msg/external_command.rs | 11 +++ .../engine/handler/replication_handler/mod.rs | 25 +++++++ openraft/src/engine/tests/startup_test.rs | 6 +- openraft/src/progress/entry/mod.rs | 24 ++++++- openraft/src/raft/mod.rs | 3 +- openraft/src/raft/trigger.rs | 37 ++++++++++ openraft/src/replication/mod.rs | 27 -------- tests/tests/replication/main.rs | 1 + .../t61_allow_follower_log_revert.rs | 68 +++++++++++++++++++ 10 files changed, 176 insertions(+), 33 deletions(-) create mode 100644 tests/tests/replication/t61_allow_follower_log_revert.rs diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 4f1246e98..f314d6e92 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -1312,6 +1312,13 @@ where ExternalCommand::TriggerTransferLeader { to } => { self.engine.trigger_transfer_leader(to); } + ExternalCommand::AllowNextRevert { to, allow } => { + if let Ok(mut l) = self.engine.leader_handler() { + l.replication_handler().allow_next_revert(to, allow); + } else { + tracing::warn!("AllowNextRevert: current node is not a Leader"); + } + } ExternalCommand::StateMachineCommand { sm_cmd } => { let res = self.sm_handle.send(sm_cmd); if let Err(e) = res { diff --git a/openraft/src/core/raft_msg/external_command.rs b/openraft/src/core/raft_msg/external_command.rs index a54f99e5c..4b8b8e93f 100644 --- a/openraft/src/core/raft_msg/external_command.rs +++ b/openraft/src/core/raft_msg/external_command.rs @@ -36,6 +36,9 @@ pub(crate) enum ExternalCommand { /// Submit a command to inform RaftCore to transfer leadership to the specified node. TriggerTransferLeader { to: C::NodeId }, + /// Allow or not the next revert of the replication to the specified node. + AllowNextRevert { to: C::NodeId, allow: bool }, + /// Send a [`sm::Command`] to [`sm::worker::Worker`]. /// This command is run in the sm task. 
StateMachineCommand { sm_cmd: sm::Command }, @@ -72,6 +75,14 @@ where C: RaftTypeConfig ExternalCommand::TriggerTransferLeader { to } => { write!(f, "TriggerTransferLeader: to {}", to) } + ExternalCommand::AllowNextRevert { to, allow } => { + write!( + f, + "{}-on-next-log-revert: to {}", + if *allow { "AllowReset" } else { "Panic" }, + to + ) + } ExternalCommand::StateMachineCommand { sm_cmd } => { write!(f, "StateMachineCommand: {}", sm_cmd) } diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index 80f7ec41a..5e941e139 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -216,6 +216,31 @@ where C: RaftTypeConfig prog_entry.update_conflicting(conflict.index); } + /// Enable one-time replication reset for a specific node upon log reversion detection. + /// + /// This method sets a flag to allow the replication process to be reset once for the specified + /// target node when a log reversion is detected. This is typically used to handle scenarios + /// where a follower node's log has unexpectedly reverted to a previous state. + /// + /// # Behavior + /// + /// - Sets the `reset_on_reversion` flag to `true` for the specified node in the leader's + /// progress tracker. + /// - This flag will be consumed upon the next log reversion detection, allowing for a one-time + /// reset. + /// - If the node is not found in the progress tracker, this method ignore it. + pub(crate) fn allow_next_revert(&mut self, target: C::NodeId, allow: bool) { + let Some(prog_entry) = self.leader.progress.get_mut(&target) else { + tracing::warn!( + "target node {} not found in progress tracker, when {}", + target, + func_name!() + ); + return; + }; + prog_entry.reset_on_reversion = allow; + } + /// Update replication progress when a response is received. 
#[tracing::instrument(level = "debug", skip_all)] pub(crate) fn update_progress(&mut self, target: C::NodeId, repl_res: Result, String>) { diff --git a/openraft/src/engine/tests/startup_test.rs b/openraft/src/engine/tests/startup_test.rs index cb4d66812..7b60a93b5 100644 --- a/openraft/src/engine/tests/startup_test.rs +++ b/openraft/src/engine/tests/startup_test.rs @@ -77,7 +77,8 @@ fn test_startup_as_leader_without_logs() -> anyhow::Result<()> { targets: vec![ReplicationProgress(3, ProgressEntry { matching: None, inflight: Inflight::None, - searching_end: 4 + searching_end: 4, + reset_on_reversion: false, })] }, Command::AppendInputEntries { @@ -128,7 +129,8 @@ fn test_startup_as_leader_with_proposed_logs() -> anyhow::Result<()> { targets: vec![ReplicationProgress(3, ProgressEntry { matching: None, inflight: Inflight::None, - searching_end: 7 + searching_end: 7, + reset_on_reversion: false, })] }, Command::Replicate { diff --git a/openraft/src/progress/entry/mod.rs b/openraft/src/progress/entry/mod.rs index de91279f7..323893c01 100644 --- a/openraft/src/progress/entry/mod.rs +++ b/openraft/src/progress/entry/mod.rs @@ -29,6 +29,15 @@ where C: RaftTypeConfig /// One plus the max log index on the following node that might match the leader log. pub(crate) searching_end: u64, + + /// If true, reset the progress, by setting [`Self::matching`] to `None`, when the follower's + /// log is found reverted to an early state. + /// + /// This allows the target node to clean its data and wait for the leader to replicate all data + /// to it. + /// + /// This flag will be cleared after the progress entry is reset. 
+ pub(crate) reset_on_reversion: bool, } impl ProgressEntry @@ -40,6 +49,7 @@ where C: RaftTypeConfig matching: matching.clone(), inflight: Inflight::None, searching_end: matching.next_index(), + reset_on_reversion: false, } } @@ -51,6 +61,7 @@ where C: RaftTypeConfig matching: None, inflight: Inflight::None, searching_end: end, + reset_on_reversion: false, } } @@ -117,8 +128,14 @@ where C: RaftTypeConfig // // - If log reversion is allowed, just restart the binary search from the beginning. // - Otherwise, panic it. - { - #[cfg(feature = "loosen-follower-log-revert")] + + let allow_reset = if cfg!(feature = "loosen-follower-log-revert") { + true + } else { + self.reset_on_reversion + }; + + if allow_reset { if conflict < self.matching.next_index() { tracing::warn!( "conflict {} < last matching {}: follower log is reverted; with 'loosen-follower-log-revert' enabled, this is allowed.", @@ -127,8 +144,9 @@ where C: RaftTypeConfig ); self.matching = None; + self.reset_on_reversion = false; } - + } else { debug_assert!( conflict >= self.matching.next_index(), "follower log reversion is not allowed \ diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index fab839259..99a5c3ba0 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -352,7 +352,8 @@ where C: RaftTypeConfig &self.inner.config } - /// Return a handle to manually trigger raft actions, such as elect or build snapshot. + /// Return a [`Trigger`] handle to manually trigger raft actions, such as elect or build + /// snapshot. /// /// Example: /// ```ignore diff --git a/openraft/src/raft/trigger.rs b/openraft/src/raft/trigger.rs index a2d40659d..09f498e5c 100644 --- a/openraft/src/raft/trigger.rs +++ b/openraft/src/raft/trigger.rs @@ -86,4 +86,41 @@ where C: RaftTypeConfig .send_external_command(ExternalCommand::TriggerTransferLeader { to }, "transfer_leader") .await } + + /// Request the RaftCore to allow to reset replication for a specific node when log revert is + /// detected. 
+ /// + /// - `allow=true`: This method instructs the RaftCore to allow the target node's log to revert + /// to a previous state one time. + /// - `allow=false`: This method instructs the RaftCore to panic if the target node's log reverts. + /// + /// ### Behavior + /// + /// - If this node is the Leader, it will attempt to replicate logs to the target node from the + /// beginning. + /// - If this node is not the Leader, the request is ignored. + /// - If the target node is not found, the request is ignored. + /// + /// ### Automatic Replication Reset + /// + /// When the [`loosen-follower-log-revert`](`crate::docs::feature_flags# + /// feature-flag-loosen-follower-log-revert) feature flag is enabled, the Leader automatically + /// resets replication if it detects that the target node's log has reverted. This + /// feature is primarily useful in testing environments. + /// + /// ### Production Considerations + /// + /// In production environments, state reversion is a critical issue that should not be + /// automatically handled. However, there may be scenarios where a Follower's data is + /// intentionally removed and the node needs to rejoin the cluster (without membership changes). In such + /// cases, the Leader should reinitialize replication for that node with the following steps: + /// - Shut down the target node. + /// - Call [`Self::allow_next_revert`] on the Leader. + /// - Clear the target node's data directory. + /// - Restart the target node. 
+ pub async fn allow_next_revert(&self, to: &C::NodeId, allow: bool) -> Result<(), Fatal> { + self.raft_inner + .send_external_command(ExternalCommand::AllowNextRevert { to: to.clone(), allow }, func_name!()) + .await + } } diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 9584e2aab..cc0c9fb0f 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -550,7 +550,6 @@ where match &replication_result.0 { Ok(matching) => { - self.validate_matching(matching); self.matching = matching.clone(); } Err(_conflict) => { @@ -569,32 +568,6 @@ where }); } - /// Validate the value for updating matching log id. - /// - /// If the matching log id is reverted to a smaller value: - /// - log a warning message if [`loosen-follower-log-revert`] feature flag is enabled; - /// - otherwise panic, consider it as a bug. - /// - /// [`loosen-follower-log-revert`]: crate::docs::feature_flags#feature_flag_loosen_follower_log_revert - fn validate_matching(&self, matching: &Option>) { - if cfg!(feature = "loosen-follower-log-revert") { - if &self.matching > matching { - tracing::warn!( - "follower log is reverted from {} to {}; with 'loosen-follower-log-revert' enabled, this is allowed", - self.matching.display(), - matching.display(), - ); - } - } else { - debug_assert!( - &self.matching <= matching, - "follower log is reverted from {} to {}", - self.matching.display(), - matching.display(), - ); - } - } - /// Drain all events in the channel in backoff mode, i.e., there was an un-retry-able error and /// should not send out anything before backoff interval expired. 
/// diff --git a/tests/tests/replication/main.rs b/tests/tests/replication/main.rs index 2846be37e..76bf870e3 100644 --- a/tests/tests/replication/main.rs +++ b/tests/tests/replication/main.rs @@ -10,3 +10,4 @@ mod t50_append_entries_backoff_rejoin; mod t51_append_entries_too_large; #[cfg(feature = "loosen-follower-log-revert")] mod t60_feature_loosen_follower_log_revert; +mod t61_allow_follower_log_revert; diff --git a/tests/tests/replication/t61_allow_follower_log_revert.rs b/tests/tests/replication/t61_allow_follower_log_revert.rs new file mode 100644 index 000000000..247ff0ef9 --- /dev/null +++ b/tests/tests/replication/t61_allow_follower_log_revert.rs @@ -0,0 +1,68 @@ +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use maplit::btreeset; +use openraft::Config; + +use crate::fixtures::ut_harness; +use crate::fixtures::RaftRouter; + +/// With `Trigger::allow_next_revert()` the leader allows follower to revert its log to an +/// earlier state for one time. +#[tracing::instrument] +#[test_harness::test(harness = ut_harness)] +async fn allow_follower_log_revert() -> Result<()> { + let config = Arc::new( + Config { + enable_tick: false, + enable_heartbeat: false, + // Make sure the replication is done in more than one steps + max_payload_entries: 1, + ..Default::default() + } + .validate()?, + ); + + let mut router = RaftRouter::new(config.clone()); + + tracing::info!("--- initializing cluster"); + let mut log_index = router.new_cluster(btreeset! {0}, btreeset! 
{1}).await?; + + tracing::info!(log_index, "--- write 10 logs"); + { + log_index += router.client_request_many(0, "0", 10).await?; + for i in [0, 1] { + router.wait(&i, timeout()).applied_index(Some(log_index), format!("{} writes", 10)).await?; + } + } + tracing::info!(log_index, "--- allow next detected log revert"); + { + let n0 = router.get_raft_handle(&0)?; + n0.trigger().allow_next_revert(&1, true).await?; + } + + tracing::info!(log_index, "--- erase Learner-1 and restart"); + { + let (_raft, _ls, _sm) = router.remove_node(1).unwrap(); + let (log, sm) = openraft_memstore::new_mem_store(); + + router.new_raft_node_with_sto(1, log, sm).await; + router.add_learner(0, 1).await?; + log_index += 1; // add learner + } + + tracing::info!(log_index, "--- write another 10 logs, leader should not panic"); + { + log_index += router.client_request_many(0, "0", 10).await?; + for i in [0, 1] { + router.wait(&i, timeout()).applied_index(Some(log_index), format!("{} writes", 10)).await?; + } + } + + Ok(()) +} + +fn timeout() -> Option { + Some(Duration::from_millis(1_000)) +}