From 2c3024c783d832a81c4125232fca39d678cf09b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Wed, 13 Jul 2022 15:55:52 +0800 Subject: [PATCH] Refactor: move `LeaderState.nodes` to `RaftCore.leader_data` --- openraft/src/core/admin.rs | 50 +++------------- openraft/src/core/client.rs | 23 -------- openraft/src/core/leader_state.rs | 35 ++++-------- openraft/src/core/raft_core.rs | 92 ++++++++++++++++++++++++++++-- openraft/src/core/replication.rs | 14 ++++- openraft/src/engine/engine_impl.rs | 22 ++++++- 6 files changed, 139 insertions(+), 97 deletions(-) diff --git a/openraft/src/core/admin.rs b/openraft/src/core/admin.rs index e5bed1657..87e705322 100644 --- a/openraft/src/core/admin.rs +++ b/openraft/src/core/admin.rs @@ -16,7 +16,6 @@ use crate::error::Fatal; use crate::error::InProgress; use crate::error::LearnerIsLagging; use crate::error::LearnerNotFound; -use crate::metrics::RemoveTarget; use crate::progress::Progress; use crate::raft::AddLearnerResponse; use crate::raft::ClientWriteResponse; @@ -24,7 +23,6 @@ use crate::raft::RaftRespTx; use crate::raft_types::RaftLogId; use crate::runtime::RaftRuntime; use crate::summary::MessageSummary; -use crate::versioned::Updatable; use crate::ChangeMembers; use crate::EntryPayload; use crate::LogId; @@ -51,11 +49,15 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS node: Option, tx: RaftRespTx, AddLearnerError>, ) -> Result<(), Fatal> { - tracing::debug!( - "add target node {} as learner; current nodes: {:?}", - target, - self.nodes.keys() - ); + if let Some(l) = &self.core.leader_data { + tracing::debug!( + "add target node {} as learner; current nodes: {:?}", + target, + l.nodes.keys() + ); + } else { + unreachable!("it has to be a leader!!!"); + } // Ensure the node doesn't already exist in the current // config, in the set of new nodes already being synced, or in the nodes being removed. 
@@ -286,38 +288,4 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS Ok(()) } - - /// Remove a replication if the membership that does not include it has committed. - /// - /// Return true if removed. - #[tracing::instrument(level = "trace", skip(self))] - pub async fn remove_replication(&mut self, target: C::NodeId) -> bool { - tracing::info!("removed_replication to: {}", target); - - let repl_state = self.nodes.remove(&target); - if let Some(s) = repl_state { - let handle = s.handle; - - // Drop sender to notify the task to shutdown - drop(s.repl_tx); - - tracing::debug!("joining removed replication: {}", target); - let _x = handle.await; - tracing::info!("Done joining removed replication : {}", target); - } else { - unreachable!("try to nonexistent replication to {}", target); - } - - if let Some(l) = &mut self.core.leader_data { - l.replication_metrics.update(RemoveTarget { target }); - } else { - unreachable!("It has to be a leader!!!"); - } - - // TODO(xp): set_replication_metrics_changed() can be removed. - // Use self.replication_metrics.version to detect changes. 
- self.core.engine.metrics_flags.set_replication_changed(); - - true - } } diff --git a/openraft/src/core/client.rs b/openraft/src/core/client.rs index e6281ae18..abb786999 100644 --- a/openraft/src/core/client.rs +++ b/openraft/src/core/client.rs @@ -5,7 +5,6 @@ use maplit::btreeset; use tokio::time::timeout; use tokio::time::Duration; use tracing::Instrument; -use tracing::Level; use crate::core::LeaderState; use crate::core::ServerState; @@ -18,8 +17,6 @@ use crate::quorum::QuorumSet; use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; use crate::raft::RaftRespTx; -use crate::replication::UpdateReplication; -use crate::LogId; use crate::MessageSummary; use crate::RPCTypes; use crate::RaftNetwork; @@ -153,24 +150,4 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS } .into())); } - - /// Begin replicating upto the given log id. - /// - /// It does not block until the entry is committed or actually sent out. - /// It merely broadcasts a signal to inform the replication threads. 
- #[tracing::instrument(level = "debug", skip_all)] - pub(super) fn replicate_entry(&mut self, log_id: LogId) { - if tracing::enabled!(Level::DEBUG) { - for node_id in self.nodes.keys() { - tracing::debug!(node_id = display(node_id), log_id = display(log_id), "replicate_entry"); - } - } - - for node in self.nodes.values() { - let _ = node.repl_tx.send(UpdateReplication { - last_log_id: Some(log_id), - committed: self.core.engine.state.committed, - }); - } - } } diff --git a/openraft/src/core/leader_state.rs b/openraft/src/core/leader_state.rs index eedfc48a9..6cb70f2b2 100644 --- a/openraft/src/core/leader_state.rs +++ b/openraft/src/core/leader_state.rs @@ -1,5 +1,3 @@ -use std::collections::BTreeMap; - use tokio::sync::mpsc; use tracing::Instrument; use tracing::Span; @@ -11,8 +9,6 @@ use crate::error::Fatal; use crate::raft::RaftMsg; use crate::raft_types::RaftLogId; use crate::replication::ReplicaEvent; -use crate::replication::ReplicationStream; -use crate::replication::UpdateReplication; use crate::runtime::RaftRuntime; use crate::summary::MessageSummary; use crate::Entry; @@ -27,9 +23,6 @@ use crate::Update; pub(crate) struct LeaderState<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> { pub(super) core: &'a mut RaftCore, - /// A mapping of node IDs the replication state of the target node. - pub(super) nodes: BTreeMap>, - /// The stream of events coming from replication streams. #[allow(clippy::type_complexity)] pub(super) replication_rx: mpsc::UnboundedReceiver<(ReplicaEvent, Span)>, @@ -45,7 +38,6 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS let (replication_tx, replication_rx) = mpsc::unbounded_channel(); Self { core, - nodes: BTreeMap::new(), replication_tx, replication_rx, } @@ -71,7 +63,11 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS // TODO(xp): make this Engine::Command driven. 
for target in targets { let state = self.spawn_replication_stream(target).await; - self.nodes.insert(target, state); + if let Some(l) = &mut self.core.leader_data { + l.nodes.insert(target, state); + } else { + unreachable!("it has to be a leader!!!"); + } } // Commit the initial entry when new leader established. @@ -179,26 +175,17 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> RaftRun { // Run leader specific commands or pass non leader specific commands to self.core. match cmd { - Command::ReplicateCommitted { committed } => { - for node in self.nodes.values() { - let _ = node.repl_tx.send(UpdateReplication { - last_log_id: None, - committed: *committed, - }); - } - } - Command::ReplicateInputEntries { range } => { - if let Some(last) = range.clone().last() { - self.replicate_entry(*input_entries[last].get_log_id()); - } - } Command::UpdateReplicationStreams { remove, add } => { for (node_id, _matched) in remove.iter() { - self.remove_replication(*node_id).await; + self.core.remove_replication(*node_id).await; } for (node_id, _matched) in add.iter() { let state = self.spawn_replication_stream(*node_id).await; - self.nodes.insert(*node_id, state); + if let Some(l) = &mut self.core.leader_data { + l.nodes.insert(*node_id, state); + } else { + unreachable!("it has to be a leader!!!"); + } } } _ => self.core.run_command(input_entries, curr, cmd).await?, diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 6dd56e28b..c33a2290f 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -37,6 +37,7 @@ use crate::error::ForwardToLeader; use crate::error::InitializeError; use crate::error::VoteError; use crate::metrics::RaftMetrics; +use crate::metrics::RemoveTarget; use crate::metrics::ReplicationMetrics; use crate::raft::ClientWriteResponse; use crate::raft::RaftMsg; @@ -45,11 +46,14 @@ use crate::raft::VoteRequest; use crate::raft::VoteResponse; use crate::raft_types::LogIdOptionExt; use 
crate::raft_types::RaftLogId; +use crate::replication::ReplicationStream; +use crate::replication::UpdateReplication; use crate::runtime::RaftRuntime; use crate::storage::RaftSnapshotBuilder; use crate::storage::StorageHelper; use crate::timer::RaftTimer; use crate::timer::Timeout; +use crate::versioned::Updatable; use crate::versioned::Versioned; use crate::Entry; use crate::EntryPayload; @@ -69,6 +73,11 @@ pub(crate) struct LeaderData { /// Channels to send result back to client when logs are committed. pub(crate) client_resp_channels: BTreeMap, ClientWriteError>>, + /// A mapping of node IDs the replication state of the target node. + // TODO(xp): make it a field of RaftCore. it does not have to belong to leader. + // It requires the Engine to emit correct add/remove replication commands + pub(super) nodes: BTreeMap>, + /// The metrics of all replication streams pub(crate) replication_metrics: Versioned>, } @@ -77,6 +86,7 @@ impl LeaderData { pub(crate) fn new() -> Self { Self { client_resp_channels: Default::default(), + nodes: BTreeMap::new(), replication_metrics: Versioned::new(ReplicationMetrics::default()), } } @@ -666,6 +676,69 @@ impl, S: RaftStorage> RaftCore) { + if let Some(l) = &self.leader_data { + if tracing::enabled!(Level::DEBUG) { + for node_id in l.nodes.keys() { + tracing::debug!(node_id = display(node_id), log_id = display(log_id), "replicate_entry"); + } + } + + for node in l.nodes.values() { + let _ = node.repl_tx.send(UpdateReplication { + last_log_id: Some(log_id), + committed: self.engine.state.committed, + }); + } + } else { + unreachable!("it has to be a leader!!!"); + } + } + + /// Remove a replication if the membership that does not include it has committed. + /// + /// Return true if removed. 
+ #[tracing::instrument(level = "trace", skip(self))] + pub async fn remove_replication(&mut self, target: C::NodeId) -> bool { + tracing::info!("removed_replication to: {}", target); + + let repl_state = if let Some(l) = &mut self.leader_data { + l.nodes.remove(&target) + } else { + unreachable!("it has to be a leader!!!"); + }; + + if let Some(s) = repl_state { + let handle = s.handle; + + // Drop sender to notify the task to shutdown + drop(s.repl_tx); + + tracing::debug!("joining removed replication: {}", target); + let _x = handle.await; + tracing::info!("Done joining removed replication : {}", target); + } else { + unreachable!("try to nonexistent replication to {}", target); + } + + if let Some(l) = &mut self.leader_data { + l.replication_metrics.update(RemoveTarget { target }); + } else { + unreachable!("It has to be a leader!!!"); + } + + // TODO(xp): set_replication_metrics_changed() can be removed. + // Use self.replication_metrics.version to detect changes. + self.engine.metrics_flags.set_replication_changed(); + + true + } + /// Leader will keep working until the effective membership that removes it committed. /// /// This is ony called by leader. @@ -1016,8 +1089,17 @@ impl, S: RaftStorage> RaftRuntime Command::SendVote { vote_req } => { self.spawn_parallel_vote_requests(vote_req).await; } - Command::ReplicateCommitted { .. } => { - unreachable!("leader specific command") + Command::ReplicateCommitted { committed } => { + if let Some(l) = &self.leader_data { + for node in l.nodes.values() { + let _ = node.repl_tx.send(UpdateReplication { + last_log_id: None, + committed: *committed, + }); + } + } else { + unreachable!("it has to be a leader!!!"); + } } Command::LeaderCommit { ref upto, .. } => { for i in self.engine.state.last_applied.next_index()..(upto.index + 1) { @@ -1027,8 +1109,10 @@ impl, S: RaftStorage> RaftRuntime Command::FollowerCommit { upto: _, .. 
} => { self.replicate_to_state_machine_if_needed().await?; } - Command::ReplicateInputEntries { .. } => { - unreachable!("leader specific command") + Command::ReplicateInputEntries { range } => { + if let Some(last) = range.clone().last() { + self.replicate_entry(*input_ref_entries[last].get_log_id()); + } } Command::UpdateReplicationStreams { .. } => { unreachable!("leader specific command") diff --git a/openraft/src/core/replication.rs b/openraft/src/core/replication.rs index ede3be5e0..f361841a8 100644 --- a/openraft/src/core/replication.rs +++ b/openraft/src/core/replication.rs @@ -99,9 +99,19 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory, S: RaftStorage> LeaderS ); // TODO(xp): a leader has to refuse a message from a previous leader. - if !self.nodes.contains_key(&target) { + if let Some(l) = &self.core.leader_data { + if !l.nodes.contains_key(&target) { + return Ok(()); + }; + } else { + // no longer a leader. + tracing::warn!( + target = display(target), + result = debug(&result), + "received replication update but no longer a leader" + ); return Ok(()); - }; + } tracing::debug!("update matched: {:?}", result); diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index ce658702b..53e359abf 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -124,8 +124,8 @@ impl Engine { #[tracing::instrument(level = "debug", skip(self))] pub(crate) fn elect(&mut self) { // init election + self.enter_leader(); self.state.vote = Vote::new(self.state.vote.term + 1, self.id); - self.state.new_leader(); // Safe unwrap() let leader = self.state.leader.as_mut().unwrap(); @@ -220,8 +220,8 @@ impl Engine { self.set_server_state(ServerState::Learner); } + self.leave_leader(); self.state.vote = resp.vote; - self.state.leader = None; self.push_command(Command::SaveVote { vote: self.state.vote }); return; } @@ -751,6 +751,22 @@ impl Engine { /// Supporting util impl Engine { + /// Enter leader state. 
+ /// + /// Leader state has two phases: election phase and replication phase, similar to Paxos phase-1 and phase-2 + fn enter_leader(&mut self) { + self.state.new_leader(); + // TODO: install heartbeat timer + } + + /// Leave leader state. + /// + /// This node then becomes raft-follower or raft-learner. + fn leave_leader(&mut self) { + self.state.leader = None; + // TODO: install election timer if it is a voter + } + + /// Update effective membership config if encountering a membership config log entry. fn try_update_membership>(&mut self, entry: &Ent) { if let Some(m) = entry.get_membership() { @@ -981,7 +997,7 @@ impl Engine { } // I'm no longer a leader. - self.state.leader = None; + self.leave_leader(); #[allow(clippy::collapsible_else_if)] if self.state.server_state == ServerState::Follower || self.state.server_state == ServerState::Learner {