diff --git a/guide/src/SUMMARY.md b/guide/src/SUMMARY.md index 466c0d756..d4d8fda7b 100644 --- a/guide/src/SUMMARY.md +++ b/guide/src/SUMMARY.md @@ -19,7 +19,6 @@ - [Architecture](./architecture.md) - [Threads](./threading.md) - [Log data layout](./log-data-layout.md) - - [Heartbeat](./heartbeat.md) - [Vote](./vote.md) - [Replication](./replication.md) - [Delete-conflicting-logs](./delete_log.md) @@ -29,3 +28,7 @@ - [Upgrade Tips](./upgrade-tips.md) - [Upgrade from 0.6 to 0.7](./upgrade-v06-v07.md) - [Upgrade from 0.7 to 0.8](./upgrade-v07-v08.md) + +- [Obsolete design](./obsolete-design.md) + - [Heartbeat](./heartbeat.md) + diff --git a/guide/src/heartbeat.md b/guide/src/heartbeat.md index 71c8741e9..0215d3cad 100644 --- a/guide/src/heartbeat.md +++ b/guide/src/heartbeat.md @@ -1,3 +1,50 @@ +# Obsolete: blank log heartbeat + +https://github.com/datafuselabs/openraft/issues/698 + +This design has two problems: + +- The heartbeat that sends a blank log introduces additional I/O, as a follower has to persist every log to maintain correctness. + +- Although `(term, log_index)` serves as a pseudo time in Raft, measuring whether a node has caught up with the leader and is capable of becoming a new leader, leadership is not solely determined by this pseudo time. + Wall clock time is also taken into account. + + There may be a case where a follower's pseudo time is not up to date but its wall clock time is (it has recently heard from an active leader); in this case it should not grant leadership to another node. + For example, in a cluster of three nodes, if the leader (node-1) is busy sending a snapshot to node-2 (it has not yet replicated the latest logs to a quorum, but node-2 has received messages from the leader (node-1) and thus knows there is an active leader), node-3 should not seize leadership from node-1. + This is why two kinds of time are needed to protect leadership: the pseudo time `(term, log_index)` and the wall clock time. + + In the following graph: + - node-1 is the leader, has 4 log entries, and is sending a snapshot to + node-2; + - node-2 has received several snapshot chunks, so it perceives an active + leader and extends the leader lease; + - node-3 sends a vote request to node-2; although node-2 does not have + as many logs as node-3, node-2 should still reject node-3's vote request + because the leader lease has not yet expired. + + In the obsolete design, which extends the pseudo time `(term, index)` with a + `tick`, node-3 would win node-2's vote in this case and seize the leadership from node-1. + + ```text + Ni: Node i + Ei: log entry i + + N1 E1 E2 E3 E4 + | + v + N2 snapshot + +-----------------+ + ^ | + | leader lease + | + N3 E1 E2 E3 | vote-request + ---------------+----------------------------> clock time + now + + ``` + +The original document is presented below for reference. + # Heartbeat in openraft ## Heartbeat in standard raft @@ -25,7 +72,7 @@ Because the leader must have the greatest **pseudo time**, thus by comparing the **pseudo time**, a follower automatically refuse election request from a node unreachable to the leader. And comparing the **pseudo time** is already done by `handle_vote_request()`, -there is no need to add another timer for the active leader. +there is no need to add another timer for the active leader. Thus making heartbeat request a blank log is the simplest way.
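The lease rule described above — a follower tracks both the pseudo time `(term, log_index)` and a wall-clock leader lease, and rejects vote requests while the lease is unexpired — can be illustrated with a small standalone sketch. This is not openraft's actual implementation; `LeaseState`, `on_leader_message`, and `grant_vote` are hypothetical names used only for this example:

```rust
use std::time::{Duration, Instant};

/// Pseudo time in Raft: (term, last_log_index).
type PseudoTime = (u64, u64);

/// Hypothetical follower-side state; names are illustrative only.
struct LeaseState {
    last_log: PseudoTime,
    /// Wall-clock time until which the current leader is assumed to be alive.
    leader_expire_at: Instant,
    lease: Duration,
}

impl LeaseState {
    /// Any message from an active leader (AppendEntries, snapshot chunk, ...)
    /// extends the lease, even if it carries no new log entries.
    fn on_leader_message(&mut self, now: Instant) {
        self.leader_expire_at = now + self.lease;
    }

    /// Grant a vote only if the last seen leader's lease has expired
    /// and the candidate's pseudo time is not behind ours.
    fn grant_vote(&self, now: Instant, candidate_last_log: PseudoTime) -> bool {
        if now <= self.leader_expire_at {
            return false; // an active leader is still assumed to exist
        }
        candidate_last_log >= self.last_log
    }
}

fn main() {
    let now = Instant::now();

    // node-2: it only has logs up to (1, 1), but it just received snapshot
    // chunks from the leader, so its leader lease is fresh.
    let mut node2 = LeaseState {
        last_log: (1, 1),
        leader_expire_at: now,
        lease: Duration::from_millis(150),
    };
    node2.on_leader_message(now);

    // node-3 has more logs, (1, 3), but is rejected while the lease holds.
    assert!(!node2.grant_vote(now + Duration::from_millis(50), (1, 3)));

    // After the lease expires, the same vote request is granted.
    assert!(node2.grant_vote(now + Duration::from_millis(200), (1, 3)));
}
```

The two assertions mirror the diagram: node-3's vote request arrives while node-2's lease is still fresh and is rejected even though node-3 has more logs; once the lease expires, the same request is granted.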
diff --git a/guide/src/obsolete-design.md b/guide/src/obsolete-design.md new file mode 100644 index 000000000..75fc64804 --- /dev/null +++ b/guide/src/obsolete-design.md @@ -0,0 +1,5 @@ +# Obsolete Designs + +Several designs in Openraft have been discarded due to the problems they caused for applications. +These designs were attempts at optimization or simplification, but ultimately proved to be inappropriate. +They are included in this chapter as an archive to explain why they were discarded. \ No newline at end of file diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 47d1e0877..2711c72ba 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -431,29 +431,26 @@ impl, S: RaftStorage> RaftCore>, - emitter: impl Display, - ) -> Result<(), Fatal> { - tracing::debug!(tick = display(tick), "send_heartbeat"); + pub async fn send_heartbeat(&mut self, emitter: impl Display) -> Result> { + tracing::debug!(now = debug(&self.engine.state.now), "send_heartbeat"); - let _ = tick; - - let is_leader = self.write_entry(EntryPayload::Blank, resp_tx).await?; - if is_leader { - let log_id = self.engine.state.last_log_id(); + let mut lh = if let Some((lh, _)) = + self.engine.get_leader_handler_or_reject::<(), ClientWriteError>(None) + { + lh + } else { tracing::debug!( - log_id = display(log_id.summary()), - tick = display(tick), - "{} sent heartbeat", + now = debug(&self.engine.state.now), + "{} failed to send heartbeat", emitter ); - } else { - tracing::debug!(tick = display(tick), "{} failed to send heartbeat", emitter); - } - Ok(()) + return Ok(false); + }; + + lh.send_heartbeat(); + + tracing::debug!("{} sent heartbeat", emitter); + Ok(true) } /// Flush cached changes of metrics to notify metrics watchers with updated metrics. @@ -1033,6 +1030,16 @@ impl, S: RaftStorage> RaftCore { + // Vote request needs to check if the lease of the last leader expired. + // Thus it is time sensitive. Update the cached time for it. + let now = Instant::now(); + self.engine.state.update_now(now); + tracing::debug!( + vote_request = display(rpc.summary()), + "handle vote request: now: {:?}", + now + ); + self.handle_vote_request(rpc, tx).await?; } RaftMsg::VoteResponse { target, resp, vote } => { @@ -1081,8 +1088,7 @@ impl, S: RaftStorage> RaftCore { - // TODO: use the last tick - self.send_heartbeat(0, None, "ExternalCommand").await?; + self.send_heartbeat("ExternalCommand").await?; } ExternalCommand::Snapshot => self.trigger_snapshot_if_needed(true).await, } @@ -1091,6 +1097,7 @@ impl, S: RaftStorage> RaftCore, S: RaftStorage> RaftCore= t { if self.runtime_config.enable_heartbeat.load(Ordering::Relaxed) { - self.send_heartbeat(i, None, "tick").await?; + self.send_heartbeat("tick").await?; } // Install next heartbeat @@ -1352,15 +1359,12 @@ impl, S: RaftStorage> RaftRuntime } Command::Replicate { req, target } => { if let Some(l) = &self.leader_data { - // TODO(2): consider remove the returned error from new_client(). - // Node may not exist because `RaftNetworkFactory::new_client()` returns an - // error. 
let node = &l.nodes.get(&target); if let Some(node) = node { match req { Inflight::None => { - unreachable!("Inflight::None"); + let _ = node.tx_repl.send(Replicate::Heartbeat); } Inflight::Logs { id, log_id_range } => { let _ = node.tx_repl.send(Replicate::Logs { id, log_id_range }); diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index 7766d2de5..ac9ae740c 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -1,8 +1,11 @@ +use std::time::Duration; + use crate::core::ServerState; use crate::engine::handler::following_handler::FollowingHandler; use crate::engine::handler::leader_handler::LeaderHandler; use crate::engine::handler::log_handler::LogHandler; use crate::engine::handler::replication_handler::ReplicationHandler; +use crate::engine::handler::replication_handler::SendNone; use crate::engine::handler::server_state_handler::ServerStateHandler; use crate::engine::handler::snapshot_handler::SnapshotHandler; use crate::engine::handler::vote_handler::VoteHandler; @@ -47,6 +50,12 @@ pub(crate) struct EngineConfig { /// The maximum number of entries per payload allowed to be transmitted during replication pub(crate) max_payload_entries: u64, + + /// The duration of an active leader's lease. + /// + /// When a follower or learner perceives an active leader, such as by receiving an AppendEntries + /// message, it should not grant another candidate to become the leader during this period. + pub(crate) leader_lease: Duration, } impl Default for EngineConfig { @@ -56,6 +65,7 @@ impl Default for EngineConfig { max_in_snapshot_log_to_keep: 1000, purge_batch_size: 256, max_payload_entries: 300, + leader_lease: Duration::from_millis(150), } } } @@ -139,7 +149,7 @@ where let mut rh = self.replication_handler(); rh.rebuild_replication_streams(); - rh.initiate_replication(); + rh.initiate_replication(SendNone::False); return; } @@ -272,6 +282,21 @@ where "Engine::handle_vote_req" ); + // Current leader lease has not yet expired, reject voting request + if self.state.now <= self.state.leader_expire_at { + tracing::debug!( + "reject: leader lease has not yet expire; now; {:?}, leader lease will expire at: {:?}: after {:?}", + self.state.now, + self.state.leader_expire_at, + self.state.leader_expire_at - self.state.now + ); + return VoteResponse { + vote: *self.state.get_vote(), + vote_granted: false, + last_log_id: self.state.last_log_id().copied(), + }; + } + // The first step is to check log. If the candidate has less log, nothing needs to be done. if req.last_log_id.as_ref() >= self.state.last_log_id() { @@ -366,13 +391,6 @@ where } else { self.output.push_command(Command::InstallElectionTimer { can_be_leader: true }); } - - debug_assert!(self.state.is_voter(&self.config.id)); - - // When vote is rejected, it does not need to leave candidate state. - // Candidate loop, follower loop and learner loop are totally the same. - // - // The only thing that needs to do is update election timer. } /// Append entries to follower/learner. @@ -486,7 +504,7 @@ where rh.rebuild_replication_streams(); rh.append_blank_log(); - rh.initiate_replication(); + rh.initiate_replication(SendNone::False); } /// Check if a raft node is in a state that allows to initialize. 
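A side note on the engine changes above: the engine never reads the wall clock itself. `RaftCore` refreshes a cached `now` (via `update_now()`) before time-sensitive input such as a vote request, and the lease check is then a plain comparison against `leader_expire_at`. A minimal sketch of that cached-clock pattern — hypothetical names, and `std::time::Instant` in place of the `tokio::time::Instant` openraft uses — looks like this:

```rust
use std::time::{Duration, Instant};

/// Illustrative engine state; openraft keeps similar cached fields
/// (`now`, `leader_expire_at`) on `RaftState`, but these names are only
/// for this sketch.
struct EngineState {
    now: Instant,
    leader_expire_at: Instant,
}

impl EngineState {
    /// The runtime refreshes the cached clock before time-sensitive input,
    /// so the pure engine logic never reads the wall clock itself.
    fn update_now(&mut self, now: Instant) {
        self.now = now;
    }

    /// The lease check is then a plain comparison of two cached values.
    fn lease_active(&self) -> bool {
        self.now <= self.leader_expire_at
    }
}

fn main() {
    let start = Instant::now();
    let mut state = EngineState {
        now: start,
        leader_expire_at: start + Duration::from_millis(150),
    };

    state.update_now(start + Duration::from_millis(100));
    assert!(state.lease_active()); // still within the lease: reject vote requests

    state.update_now(start + Duration::from_millis(200));
    assert!(!state.lease_active()); // lease expired: votes may be granted
}
```

This is also what keeps the new unit test simple: `test_handle_vote_req_rejected_by_leader_lease` below just sets `eng.state.now` relative to `leader_expire_at` instead of sleeping.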
diff --git a/openraft/src/engine/handle_vote_req_test.rs b/openraft/src/engine/handle_vote_req_test.rs index d1daa498c..3996b0296 100644 --- a/openraft/src/engine/handle_vote_req_test.rs +++ b/openraft/src/engine/handle_vote_req_test.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::time::Duration; use maplit::btreeset; @@ -33,6 +34,44 @@ fn eng() -> Engine { eng } +#[test] +fn test_handle_vote_req_rejected_by_leader_lease() -> anyhow::Result<()> { + let mut eng = eng(); + // Non expired leader lease + eng.state.now = eng.state.leader_expire_at - Duration::from_millis(500); + + let resp = eng.handle_vote_req(VoteRequest { + vote: Vote::new(1, 2), + last_log_id: Some(log_id(2, 3)), + }); + + assert_eq!( + VoteResponse { + vote: Vote::new(2, 1), + vote_granted: false, + last_log_id: None + }, + resp + ); + + assert_eq!(Vote::new(2, 1), *eng.state.get_vote()); + assert!(eng.internal_server_state.is_leading()); + + assert_eq!(ServerState::Candidate, eng.state.server_state); + assert_eq!( + MetricsChangeFlags { + replication: false, + local_data: false, + cluster: false, + }, + eng.output.metrics_flags + ); + + assert_eq!(0, eng.output.commands.len()); + + Ok(()) +} + #[test] fn test_handle_vote_req_reject_smaller_vote() -> anyhow::Result<()> { let mut eng = eng(); diff --git a/openraft/src/engine/handler/leader_handler.rs b/openraft/src/engine/handler/leader_handler.rs index cd43b7c2c..9a00bc740 100644 --- a/openraft/src/engine/handler/leader_handler.rs +++ b/openraft/src/engine/handler/leader_handler.rs @@ -1,5 +1,6 @@ use crate::engine::engine_impl::EngineOutput; use crate::engine::handler::replication_handler::ReplicationHandler; +use crate::engine::handler::replication_handler::SendNone; use crate::engine::Command; use crate::engine::EngineConfig; use crate::entry::RaftEntry; @@ -103,11 +104,17 @@ where }; rh.update_local_progress(last_log_id); - rh.initiate_replication(); + rh.initiate_replication(SendNone::False); self.output.push_command(Command::MoveInputCursorBy { n: l }); } + #[tracing::instrument(level = "debug", skip_all)] + pub(crate) fn send_heartbeat(&mut self) -> () { + let mut rh = self.replication_handler(); + rh.initiate_replication(SendNone::True); + } + pub(crate) fn replication_handler(&mut self) -> ReplicationHandler { ReplicationHandler { config: self.config, @@ -120,6 +127,112 @@ where #[cfg(test)] mod tests { + + mod test_leader_send_heartbeat { + use std::sync::Arc; + + use maplit::btreeset; + #[allow(unused_imports)] use pretty_assertions::assert_eq; + #[allow(unused_imports)] use pretty_assertions::assert_ne; + #[allow(unused_imports)] use pretty_assertions::assert_str_eq; + + use crate::engine::Command; + use crate::engine::Engine; + use crate::progress::Inflight; + use crate::progress::Progress; + use crate::EffectiveMembership; + use crate::Membership; + use crate::MembershipState; + use crate::Vote; + + crate::declare_raft_types!( + pub(crate) Foo: D=(), R=(), NodeId=u64, Node=() + ); + + use crate::testing::log_id; + + fn m01() -> Membership { + Membership::::new(vec![btreeset! {0,1}], None) + } + + fn m23() -> Membership { + Membership::::new(vec![btreeset! 
{2,3}], None) + } + + fn eng() -> Engine { + let mut eng = Engine::default(); + eng.state.enable_validate = false; // Disable validation for incomplete state + + eng.config.id = 1; + eng.state.committed = Some(log_id(0, 0)); + eng.state.vote = Vote::new_committed(3, 1); + eng.state.log_ids.append(log_id(1, 1)); + eng.state.log_ids.append(log_id(2, 3)); + eng.state.membership_state = MembershipState::new( + Arc::new(EffectiveMembership::new(Some(log_id(1, 1)), m01())), + Arc::new(EffectiveMembership::new(Some(log_id(2, 3)), m23())), + ); + eng.state.server_state = eng.calc_server_state(); + + eng + } + + #[test] + fn test_leader_send_heartbeat() -> anyhow::Result<()> { + let mut eng = eng(); + eng.vote_handler().become_leading(); + + // A heartbeat is a normal AppendEntries RPC if there are pending data to send. + { + eng.leader_handler()?.send_heartbeat(); + assert_eq!( + vec![ + Command::Replicate { + target: 2, + req: Inflight::logs(None, Some(log_id(2, 3))).with_id(1), + }, + Command::Replicate { + target: 3, + req: Inflight::logs(None, Some(log_id(2, 3))).with_id(1), + }, + ], + eng.output.commands + ); + } + + // No RPC will be sent if there are inflight RPC + { + eng.output.commands = vec![]; + eng.leader_handler()?.send_heartbeat(); + assert!(eng.output.commands.is_empty()); + } + + // No data to send, sending a heartbeat is to send empty RPC: + { + let l = eng.leader_handler()?; + let _ = l.leader.progress.update_with(&2, |ent| ent.update_matching(Some(log_id(2, 3)))); + let _ = l.leader.progress.update_with(&3, |ent| ent.update_matching(Some(log_id(2, 3)))); + } + eng.output.commands = vec![]; + eng.leader_handler()?.send_heartbeat(); + assert_eq!( + vec![ + Command::Replicate { + target: 2, + req: Inflight::logs(Some(log_id(2, 3)), Some(log_id(2, 3))).with_id(1), + }, + Command::Replicate { + target: 3, + req: Inflight::logs(Some(log_id(2, 3)), Some(log_id(2, 3))).with_id(1), + }, + ], + eng.output.commands + ); + + Ok(()) + } + } + mod test_leader_append_entries { use std::sync::Arc; diff --git a/openraft/src/engine/handler/replication_handler.rs b/openraft/src/engine/handler/replication_handler.rs index 48576b9b3..3ca5d9c5b 100644 --- a/openraft/src/engine/handler/replication_handler.rs +++ b/openraft/src/engine/handler/replication_handler.rs @@ -41,6 +41,16 @@ where pub(crate) output: &'x mut EngineOutput, } +/// An option about whether to send an RPC to follower/learner even when there is no data to send. +/// +/// Sending none data serves as a heartbeat. +#[derive(Debug)] +#[derive(PartialEq, Eq)] +pub(crate) enum SendNone { + False, + True, +} + impl<'x, NID, N> ReplicationHandler<'x, NID, N> where NID: NodeId, @@ -93,7 +103,7 @@ where self.rebuild_progresses(); self.rebuild_replication_streams(); - self.initiate_replication(); + self.initiate_replication(SendNone::False); } /// Rebuild leader's replication progress to reflect replication changes. @@ -274,8 +284,10 @@ where } /// Initiate replication for every target that is not sending data in flight. + /// + /// `send_none` specifies whether to force to send a message even when there is no data to send. 
#[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn initiate_replication(&mut self) { + pub(crate) fn initiate_replication(&mut self, send_none: SendNone) { tracing::debug!(progress = debug(&self.leader.progress), "send_to_all"); for (id, prog_entry) in self.leader.progress.iter_mut() { @@ -292,7 +304,19 @@ where Self::send_to_target(self.output, id, inflight); } Err(e) => { - tracing::debug!("no need to replicate for node-{}: current inflight: {:?}", id, e,); + tracing::debug!( + "no data to replicate for node-{}: current inflight: {:?}, send_none: {:?}", + id, + e, + send_none + ); + + #[allow(clippy::collapsible_if)] + if e == &Inflight::None { + if send_none == SendNone::True { + Self::send_to_target(self.output, id, e); + } + } } } } @@ -300,8 +324,6 @@ where #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn send_to_target(output: &mut EngineOutput, target: &NID, inflight: &Inflight) { - debug_assert!(!inflight.is_none()); - output.push_command(Command::Replicate { target: *target, req: *inflight, diff --git a/openraft/src/engine/handler/vote_handler.rs b/openraft/src/engine/handler/vote_handler.rs index b544e9ed6..4cede72ee 100644 --- a/openraft/src/engine/handler/vote_handler.rs +++ b/openraft/src/engine/handler/vote_handler.rs @@ -81,11 +81,32 @@ where self.output.push_command(Command::SaveVote { vote: *vote }); } + if vote.is_committed() { + self.extend_leader_lease(); + } + self.update_internal_server_state(); Ok(()) } + /// Extend leader lease so that in a specific duration no new election from other node will be + /// granted. + /// + /// `now` is the current time since when to extend leader lease. + pub(crate) fn extend_leader_lease(&mut self) { + tracing::debug!( + now = debug(&self.state.now), + current_leader_expire_at = debug(&self.state.leader_expire_at), + "{}", + func_name!() + ); + + // Because different nodes may have different local tick values, + // when leader switches, a follower may receive a lower tick. + self.state.leader_expire_at = self.state.now + self.config.leader_lease; + } + /// Enter leading or following state by checking `vote`. pub(crate) fn update_internal_server_state(&mut self) { if self.state.get_vote().leader_id().voted_for() == Some(self.config.id) { diff --git a/openraft/src/progress/entry.rs b/openraft/src/progress/entry.rs index bd7d1fa52..d23a973bd 100644 --- a/openraft/src/progress/entry.rs +++ b/openraft/src/progress/entry.rs @@ -25,7 +25,9 @@ pub(crate) struct ProgressEntry { pub(crate) curr_inflight_id: u64, - /// The log entries being transmitted in flight. + /// The data being transmitted in flight. + /// + /// A non-none inflight expects a response when the data was successfully sent or failed. pub(crate) inflight: Inflight, /// One plus the max log index on the following node that might match the leader log. diff --git a/openraft/src/raft.rs b/openraft/src/raft.rs index 89a125c9b..f1f2aebbb 100644 --- a/openraft/src/raft.rs +++ b/openraft/src/raft.rs @@ -234,6 +234,7 @@ impl, S: RaftStorage> Raft, S: RaftStor /// A tick event to wake up RaftCore to check timeout etc. Tick { /// ith tick - i: usize, + i: u64, }, /// Update the `matched` log id of a replication target. @@ -1027,6 +1028,7 @@ pub(crate) enum ExternalCommand { } /// An RPC sent by a cluster leader to replicate log entries (§5.3), and as a heartbeat (§5.2). 
+#[derive(Clone)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub struct AppendEntriesRequest { pub vote: Vote, @@ -1043,17 +1045,6 @@ pub struct AppendEntriesRequest { pub leader_commit: Option>, } -impl Clone for AppendEntriesRequest { - fn clone(&self) -> Self { - Self { - vote: self.vote, - prev_log_id: self.prev_log_id, - entries: self.entries.clone(), - leader_commit: self.leader_commit, - } - } -} - impl Debug for AppendEntriesRequest where C::D: Debug { @@ -1122,7 +1113,7 @@ pub struct VoteRequest { impl MessageSummary> for VoteRequest { fn summary(&self) -> String { - format!("{}, last_log:{:?}", self.vote, self.last_log_id.map(|x| x.to_string())) + format!("{}, last_log:{:?}", self.vote, self.last_log_id.map(|x| x.to_string()),) } } diff --git a/openraft/src/raft_state.rs b/openraft/src/raft_state.rs index a797a5aaa..8c1cd1da6 100644 --- a/openraft/src/raft_state.rs +++ b/openraft/src/raft_state.rs @@ -1,4 +1,7 @@ use std::error::Error; +use std::time::Duration; + +use tokio::time::Instant; use crate::engine::LogIdList; use crate::entry::RaftEntry; @@ -88,7 +91,7 @@ pub(crate) trait VoteStateReader { } /// A struct used to represent the raft state which a Raft node needs. -#[derive(Clone, Debug, Default, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct RaftState where NID: NodeId, @@ -119,6 +122,14 @@ where // -- // -- volatile fields: they are not persisted. // -- + /// Cached current time. + pub(crate) now: Instant, + + /// The time when the active leader is considered expired. + /// + /// It will be updated every time a message is received from the active leader. + pub(crate) leader_expire_at: Instant, + pub server_state: ServerState, /// The log id upto which the next time it purges. @@ -128,6 +139,29 @@ where pub(crate) purge_upto: Option>, } +impl Default for RaftState +where + NID: NodeId, + N: Node, +{ + fn default() -> Self { + Self { + vote: Vote::default(), + committed: None, + purged_next: 0, + log_ids: LogIdList::default(), + membership_state: MembershipState::default(), + snapshot_meta: SnapshotMeta::default(), + now: Instant::now(), + // no active leader + leader_expire_at: Instant::now() - Duration::from_millis(1), + + server_state: ServerState::default(), + purge_upto: None, + } + } +} + impl LogStateReader for RaftState where NID: NodeId, @@ -199,6 +233,15 @@ where NID: NodeId, N: Node, { + /// Returns the time when the leader lease expires + pub fn leader_expire_at(&self) -> Instant { + self.leader_expire_at + } + + pub(crate) fn update_now(&mut self, now: Instant) { + self.now = now; + } + /// Append a list of `log_id`. /// /// The log ids in the input has to be continuous. @@ -307,8 +350,6 @@ where } /// Build a ForwardToLeader error that contains the leader id and node it knows. - // TODO: This will be used in next PR. delete this attr - #[allow(dead_code)] pub(crate) fn forward_to_leader(&self) -> ForwardToLeader { let vote = self.get_vote(); diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index b63a663b4..5ba11fcc6 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -421,6 +421,13 @@ impl, S: RaftStorage> Replication self.committed = c; } + Replicate::Heartbeat => { + // Nothing to do. Heartbeat message is just for waking up replication to send + // something: When all messages are drained, + // - if self.next_action is None, it resend an empty AppendEntries request as + // heartbeat. 
+ //- If self.next_action is not None, it will serve as a heartbeat. + } Replicate::Logs { id, log_id_range } => { if let ReplicationAction::None = self.next_action { self.next_action = ReplicationAction::Logs { id, log_id_range }; @@ -488,6 +495,9 @@ where /// Inform replication stream to forward the committed log id to followers/learners. Committed(Option>), + /// Send an empty AppendEntries RPC as heartbeat. + Heartbeat, + /// Inform replication stream to forward the log entries to followers/learners. Logs { id: u64, log_id_range: LogIdRange }, @@ -506,6 +516,7 @@ where Replicate::Committed(c) => { format!("Replicate::Committed: {:?}", c) } + Replicate::Heartbeat => "Replicate::Heartbeat".to_string(), Replicate::Logs { id, log_id_range } => { format!("Replicate::Entries(id={}): {}", id, log_id_range) } diff --git a/openraft/src/storage/helper.rs b/openraft/src/storage/helper.rs index 231d808ce..6c4a9b2e2 100644 --- a/openraft/src/storage/helper.rs +++ b/openraft/src/storage/helper.rs @@ -2,6 +2,9 @@ use std::fmt::Debug; use std::marker::PhantomData; use std::ops::RangeBounds; use std::sync::Arc; +use std::time::Duration; + +use tokio::time::Instant; use crate::defensive::check_range_matches_entries; use crate::engine::LogIdList; @@ -77,6 +80,9 @@ where snapshot_meta, // -- volatile fields: they are not persisted. + now: Instant::now(), + // no active leader + leader_expire_at: Instant::now() - Duration::from_millis(1), server_state: Default::default(), purge_upto: last_purged_log_id, }) diff --git a/openraft/src/testing/suite.rs b/openraft/src/testing/suite.rs index b51556d36..672a3baef 100644 --- a/openraft/src/testing/suite.rs +++ b/openraft/src/testing/suite.rs @@ -409,8 +409,11 @@ where } pub async fn get_initial_state_without_init(mut store: S) -> Result<(), StorageError> { - let initial = StorageHelper::new(&mut store).get_initial_state().await?; - assert_eq!(RaftState::default(), initial, "uninitialized state"); + let mut initial = StorageHelper::new(&mut store).get_initial_state().await?; + let want = RaftState::default(); + initial.leader_expire_at = want.leader_expire_at; + initial.now = want.now; + assert_eq!(want, initial, "uninitialized state"); Ok(()) } diff --git a/openraft/tests/append_entries/main.rs b/openraft/tests/append_entries/main.rs index 4e5b4db05..d54b93709 100644 --- a/openraft/tests/append_entries/main.rs +++ b/openraft/tests/append_entries/main.rs @@ -16,5 +16,6 @@ mod t40_append_updates_membership; mod t50_append_entries_with_bigger_term; mod t50_replication_1_voter_to_isolated_learner; mod t60_enable_heartbeat; +mod t60_heartbeat_reject_vote; mod t60_large_heartbeat; mod t90_issue_216_stale_last_log_id; diff --git a/openraft/tests/append_entries/t10_see_higher_vote.rs b/openraft/tests/append_entries/t10_see_higher_vote.rs index 0c31b37ef..4b639f559 100644 --- a/openraft/tests/append_entries/t10_see_higher_vote.rs +++ b/openraft/tests/append_entries/t10_see_higher_vote.rs @@ -12,6 +12,7 @@ use openraft::RaftNetworkFactory; use openraft::ServerState; use openraft::Vote; use openraft_memstore::ClientRequest; +use tokio::time::sleep; use crate::fixtures::init_default_ut_tracing; use crate::fixtures::RaftRouter; @@ -23,6 +24,8 @@ async fn append_sees_higher_vote() -> Result<()> { Config { enable_heartbeat: false, enable_elect: false, + election_timeout_min: 500, + election_timeout_max: 501, ..Default::default() } .validate()?, @@ -34,7 +37,10 @@ async fn append_sees_higher_vote() -> Result<()> { tracing::info!("--- upgrade vote on node-1"); { - router + // Let 
leader lease expire + sleep(Duration::from_millis(800)).await; + + let resp = router .new_client(1, &()) .await .send_vote(VoteRequest { @@ -42,6 +48,8 @@ async fn append_sees_higher_vote() -> Result<()> { last_log_id: Some(LogId::new(CommittedLeaderId::new(10, 1), 5)), }) .await?; + + assert!(resp.vote_granted); } tracing::info!("--- a write operation will see a higher vote, then the leader revert to follower"); diff --git a/openraft/tests/append_entries/t60_enable_heartbeat.rs b/openraft/tests/append_entries/t60_enable_heartbeat.rs index 7dd7738c7..124e2da77 100644 --- a/openraft/tests/append_entries/t60_enable_heartbeat.rs +++ b/openraft/tests/append_entries/t60_enable_heartbeat.rs @@ -4,11 +4,13 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; use openraft::Config; +use tokio::time::sleep; +use tokio::time::Instant; use crate::fixtures::init_default_ut_tracing; use crate::fixtures::RaftRouter; -/// Enable heartbeat, heartbeat log should be replicated. +/// Enable heartbeat, heartbeat should be replicated. #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] async fn enable_heartbeat() -> Result<()> { // Setup test dependencies. @@ -21,16 +23,27 @@ async fn enable_heartbeat() -> Result<()> { ); let mut router = RaftRouter::new(config.clone()); - let mut log_index = router.new_nodes_from_single(btreeset! {0,1,2}, btreeset! {3}).await?; + let log_index = router.new_nodes_from_single(btreeset! {0,1,2}, btreeset! {3}).await?; + let _ = log_index; let node0 = router.get_raft_handle(&0)?; node0.enable_heartbeat(true); - for _i in 0..10 { - log_index += 1; // new heartbeat log - router.wait(&0, timeout()).log_at_least(Some(log_index), "node 0 emit heartbeat log").await?; - router.wait(&1, timeout()).log_at_least(Some(log_index), "node 1 receives heartbeat").await?; - router.wait(&3, timeout()).log_at_least(Some(log_index), "node 1 receives heartbeat").await?; + for _i in 0..3 { + sleep(Duration::from_millis(500)).await; + + for node_id in [1, 2, 3] { + // no new log will be sent, . + router + .wait(&node_id, timeout()) + .log_at_least(Some(log_index), format!("node {} emit heartbeat log", node_id)) + .await?; + + // leader lease is extended. + router.external_request(node_id, move |state, _store, _net| { + assert!(state.leader_expire_at() > Instant::now()); + }); + } } Ok(()) diff --git a/openraft/tests/append_entries/t60_heartbeat_reject_vote.rs b/openraft/tests/append_entries/t60_heartbeat_reject_vote.rs new file mode 100644 index 000000000..d8b690409 --- /dev/null +++ b/openraft/tests/append_entries/t60_heartbeat_reject_vote.rs @@ -0,0 +1,87 @@ +use std::sync::Arc; +use std::sync::Mutex; +use std::time::Duration; + +use anyhow::Result; +use maplit::btreeset; +use openraft::raft::VoteRequest; +use openraft::testing::log_id; +use openraft::Config; +use openraft::Vote; +use tokio::time::sleep; +use tokio::time::Instant; + +use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::RaftRouter; + +/// If a follower receives heartbeat, it should reject vote request until leader lease expired. +#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] +async fn heartbeat_reject_vote() -> Result<()> { + let config = Arc::new( + Config { + heartbeat_interval: 200, + election_timeout_min: 1000, + election_timeout_max: 1001, + ..Default::default() + } + .validate()?, + ); + let mut router = RaftRouter::new(config.clone()); + + let log_index = router.new_nodes_from_single(btreeset! 
{0,1,2}, btreeset! {3}).await?; + + let leader_expire_at = Arc::new(Mutex::new(Instant::now())); + tracing::info!("--- leader lease is set by heartbeat"); + { + let ll = leader_expire_at.clone(); + + router.external_request(1, move |state, _store, _net| { + let mut l = ll.lock().unwrap(); + *l = state.leader_expire_at(); + assert!(state.leader_expire_at() > Instant::now()); + }); + + sleep(Duration::from_millis(700)).await; + + let ll = leader_expire_at.clone(); + + router.external_request(1, move |state, _store, _net| { + let l = ll.lock().unwrap(); + assert!(state.leader_expire_at() > Instant::now()); + assert!(state.leader_expire_at() > *l); + }); + } + + let node0 = router.get_raft_handle(&0)?; + let node1 = router.get_raft_handle(&1)?; + + tracing::info!("--- leader lease rejects vote request"); + { + let res = node1.vote(VoteRequest::new(Vote::new(10, 2), Some(log_id(10, 10)))).await?; + assert!(!res.vote_granted); + } + + tracing::info!("--- ensures no more blank-log heartbeat is used"); + { + // TODO: this part can be removed when blank-log heartbeat is removed. + sleep(Duration::from_millis(1500)).await; + router.wait(&1, timeout()).log(Some(log_index), "no log is written").await?; + } + + tracing::info!("--- disable heartbeat, vote request will be granted"); + { + node0.enable_heartbeat(false); + sleep(Duration::from_millis(1500)).await; + + router.wait(&1, timeout()).log(Some(log_index), "no log is written").await?; + + let res = node1.vote(VoteRequest::new(Vote::new(10, 2), Some(log_id(10, 10)))).await?; + assert!(res.vote_granted, "vote is granted after leader lease expired"); + } + + Ok(()) +} + +fn timeout() -> Option { + Some(Duration::from_millis(1_000)) +} diff --git a/openraft/tests/membership/t25_elect_with_new_config.rs b/openraft/tests/membership/t25_elect_with_new_config.rs index a8e8f2e35..dad2a3b5e 100644 --- a/openraft/tests/membership/t25_elect_with_new_config.rs +++ b/openraft/tests/membership/t25_elect_with_new_config.rs @@ -5,6 +5,7 @@ use anyhow::Result; use maplit::btreeset; use openraft::Config; use openraft::LogIdOptionExt; +use tokio::time::sleep; use crate::fixtures::init_default_ut_tracing; use crate::fixtures::RaftRouter; @@ -39,6 +40,9 @@ async fn leader_election_after_changing_0_to_01234() -> Result<()> { tracing::info!("--- isolating leader node 0"); router.isolate_node(0); + // Wait for leader lease to expire + sleep(Duration::from_millis(700)).await; + // Let node-1 become leader. let node_1 = router.get_raft_handle(&1)?; node_1.trigger_elect().await?; diff --git a/openraft/tests/metrics/t30_leader_metrics.rs b/openraft/tests/metrics/t30_leader_metrics.rs index ae265654a..3ca9086a1 100644 --- a/openraft/tests/metrics/t30_leader_metrics.rs +++ b/openraft/tests/metrics/t30_leader_metrics.rs @@ -12,6 +12,7 @@ use openraft::LogId; use openraft::ServerState; #[allow(unused_imports)] use pretty_assertions::assert_eq; #[allow(unused_imports)] use pretty_assertions::assert_ne; +use tokio::time::sleep; use crate::fixtures::init_default_ut_tracing; use crate::fixtures::RaftRouter; @@ -167,6 +168,9 @@ async fn leader_metrics() -> Result<()> { tracing::info!("--- let node-1 to elect to take leadership from node-0"); { + // Let the leader lease expire + sleep(Duration::from_millis(700)).await; + n1.trigger_elect().await?; n1.wait(timeout()).state(ServerState::Leader, "node-1 becomes leader").await?; n1.wait(timeout()).metrics(|x| x.replication.is_some(), "node-1 starts replication").await?;
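For completeness, the knobs exercised by these integration tests are plain `Config` fields. Below is a minimal usage sketch, assuming the same `Config` API the tests above rely on; the values mirror `t60_heartbeat_reject_vote`, and real deployments should pick their own:

```rust
use std::sync::Arc;

use openraft::Config;

fn heartbeat_friendly_config() -> anyhow::Result<Arc<Config>> {
    let config = Config {
        // The leader initiates replication (possibly with no entries to send)
        // at this interval, which keeps extending the followers' leader lease.
        heartbeat_interval: 200,
        // Election timeouts stay well above the heartbeat interval, so a
        // follower only starts an election after the lease can actually expire.
        election_timeout_min: 1000,
        election_timeout_max: 1001,
        ..Default::default()
    }
    .validate()?;
    Ok(Arc::new(config))
}

fn main() -> anyhow::Result<()> {
    let config = heartbeat_friendly_config()?;
    println!("heartbeat every {} ms", config.heartbeat_interval);
    Ok(())
}
```

Heartbeat can also be toggled at runtime, as the tests do with `node0.enable_heartbeat(true)` and `node0.enable_heartbeat(false)`.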