Refactor: move LeaderState.nodes to RaftCore.leader_data
drmingdrmer committed Jul 13, 2022
1 parent 01a3a32 commit 2c3024c
Showing 6 changed files with 139 additions and 97 deletions.
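The whole commit is one pattern applied across files: `nodes` moves off `LeaderState` into `RaftCore.leader_data`, an `Option<LeaderData>` that is `Some` only while this node is the leader, so each leader-only path unwraps it and treats `None` as a bug. A minimal sketch of that shape, using simplified stand-in types rather than openraft's real ones:

```rust
use std::collections::BTreeMap;

struct ReplicationStream; // stand-in for the real per-target handle

/// Leader-only state; `None` on `RaftCore` whenever this node is not the leader.
struct LeaderData {
    /// A mapping of node IDs to the replication stream of each target node.
    nodes: BTreeMap<u64, ReplicationStream>,
}

struct RaftCore {
    leader_data: Option<LeaderData>,
}

impl RaftCore {
    /// A leader-only operation, mirroring the
    /// `unreachable!("it has to be a leader!!!")` guards in this diff.
    fn add_replication_target(&mut self, target: u64, stream: ReplicationStream) {
        if let Some(l) = &mut self.leader_data {
            l.nodes.insert(target, stream);
        } else {
            unreachable!("it has to be a leader!!!");
        }
    }
}

fn main() {
    let mut core = RaftCore {
        leader_data: Some(LeaderData { nodes: BTreeMap::new() }),
    };
    core.add_replication_target(2, ReplicationStream);
    assert_eq!(core.leader_data.as_ref().map(|l| l.nodes.len()), Some(1));
}
```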
50 changes: 9 additions & 41 deletions openraft/src/core/admin.rs
@@ -16,15 +16,13 @@ use crate::error::Fatal;
use crate::error::InProgress;
use crate::error::LearnerIsLagging;
use crate::error::LearnerNotFound;
use crate::metrics::RemoveTarget;
use crate::progress::Progress;
use crate::raft::AddLearnerResponse;
use crate::raft::ClientWriteResponse;
use crate::raft::RaftRespTx;
use crate::raft_types::RaftLogId;
use crate::runtime::RaftRuntime;
use crate::summary::MessageSummary;
use crate::versioned::Updatable;
use crate::ChangeMembers;
use crate::EntryPayload;
use crate::LogId;
@@ -51,11 +49,15 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
node: Option<Node>,
tx: RaftRespTx<AddLearnerResponse<C::NodeId>, AddLearnerError<C::NodeId>>,
) -> Result<(), Fatal<C::NodeId>> {
tracing::debug!(
"add target node {} as learner; current nodes: {:?}",
target,
self.nodes.keys()
);
if let Some(l) = &self.core.leader_data {
tracing::debug!(
"add target node {} as learner; current nodes: {:?}",
target,
l.nodes.keys()
);
} else {
unreachable!("it has to be a leader!!!");
}

// Ensure the node doesn't already exist in the current
// config, in the set of new nodes already being synced, or in the nodes being removed.
@@ -286,38 +288,4 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS

Ok(())
}

/// Remove a replication if a membership that does not include it has been committed.
///
/// Return true if removed.
#[tracing::instrument(level = "trace", skip(self))]
pub async fn remove_replication(&mut self, target: C::NodeId) -> bool {
tracing::info!("removed_replication to: {}", target);

let repl_state = self.nodes.remove(&target);
if let Some(s) = repl_state {
let handle = s.handle;

// Drop sender to notify the task to shutdown
drop(s.repl_tx);

tracing::debug!("joining removed replication: {}", target);
let _x = handle.await;
tracing::info!("Done joining removed replication : {}", target);
} else {
unreachable!("try to nonexistent replication to {}", target);
}

if let Some(l) = &mut self.core.leader_data {
l.replication_metrics.update(RemoveTarget { target });
} else {
unreachable!("It has to be a leader!!!");
}

// TODO(xp): set_replication_metrics_changed() can be removed.
// Use self.replication_metrics.version to detect changes.
self.core.engine.metrics_flags.set_replication_changed();

true
}
}
23 changes: 0 additions & 23 deletions openraft/src/core/client.rs
@@ -5,7 +5,6 @@ use maplit::btreeset;
use tokio::time::timeout;
use tokio::time::Duration;
use tracing::Instrument;
use tracing::Level;

use crate::core::LeaderState;
use crate::core::ServerState;
@@ -18,8 +17,6 @@ use crate::quorum::QuorumSet;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::RaftRespTx;
use crate::replication::UpdateReplication;
use crate::LogId;
use crate::MessageSummary;
use crate::RPCTypes;
use crate::RaftNetwork;
@@ -153,24 +150,4 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
}
.into()));
}

/// Begin replicating up to the given log id.
///
/// It does not block until the entry is committed or actually sent out.
/// It merely broadcasts a signal to inform the replication threads.
#[tracing::instrument(level = "debug", skip_all)]
pub(super) fn replicate_entry(&mut self, log_id: LogId<C::NodeId>) {
if tracing::enabled!(Level::DEBUG) {
for node_id in self.nodes.keys() {
tracing::debug!(node_id = display(node_id), log_id = display(log_id), "replicate_entry");
}
}

for node in self.nodes.values() {
let _ = node.repl_tx.send(UpdateReplication {
last_log_id: Some(log_id),
committed: self.core.engine.state.committed,
});
}
}
}
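For context on what just left client.rs: `replicate_entry` (re-added on `RaftCore` in raft_core.rs below) never blocks on replication; it only fans a message out over per-node unbounded channels. A runnable sketch of that broadcast, with an invented `UpdateReplication` stand-in for openraft's message type:

```rust
use std::collections::BTreeMap;
use tokio::sync::mpsc;

/// Stand-in for openraft's `UpdateReplication` message.
#[derive(Debug)]
struct UpdateReplication {
    last_log_id: Option<u64>,
    committed: Option<u64>,
}

#[tokio::main]
async fn main() {
    // One sender per follower, as held in `nodes`.
    let mut nodes: BTreeMap<u64, mpsc::UnboundedSender<UpdateReplication>> = BTreeMap::new();

    for id in [2u64, 3] {
        let (tx, mut rx) = mpsc::unbounded_channel();
        nodes.insert(id, tx);
        tokio::spawn(async move {
            while let Some(msg) = rx.recv().await {
                println!("node {id}: advance replication to {msg:?}");
            }
        });
    }

    // The whole "replicate" step is this non-blocking broadcast; send errors
    // are ignored because a closed channel just means that task is gone.
    for tx in nodes.values() {
        let _ = tx.send(UpdateReplication { last_log_id: Some(7), committed: Some(5) });
    }

    // Give the detached tasks a moment to drain in this toy example.
    tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}
```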
35 changes: 11 additions & 24 deletions openraft/src/core/leader_state.rs
@@ -1,5 +1,3 @@
use std::collections::BTreeMap;

use tokio::sync::mpsc;
use tracing::Instrument;
use tracing::Span;
@@ -11,8 +9,6 @@ use crate::error::Fatal;
use crate::raft::RaftMsg;
use crate::raft_types::RaftLogId;
use crate::replication::ReplicaEvent;
use crate::replication::ReplicationStream;
use crate::replication::UpdateReplication;
use crate::runtime::RaftRuntime;
use crate::summary::MessageSummary;
use crate::Entry;
@@ -27,9 +23,6 @@ use crate::Update;
pub(crate) struct LeaderState<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> {
pub(super) core: &'a mut RaftCore<C, N, S>,

/// A mapping of node IDs to the replication state of the target node.
pub(super) nodes: BTreeMap<C::NodeId, ReplicationStream<C::NodeId>>,

/// The stream of events coming from replication streams.
#[allow(clippy::type_complexity)]
pub(super) replication_rx: mpsc::UnboundedReceiver<(ReplicaEvent<C::NodeId, S::SnapshotData>, Span)>,
@@ -45,7 +38,6 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
let (replication_tx, replication_rx) = mpsc::unbounded_channel();
Self {
core,
nodes: BTreeMap::new(),
replication_tx,
replication_rx,
}
@@ -71,7 +63,11 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
// TODO(xp): make this Engine::Command driven.
for target in targets {
let state = self.spawn_replication_stream(target).await;
self.nodes.insert(target, state);
if let Some(l) = &mut self.core.leader_data {
l.nodes.insert(target, state);
} else {
unreachable!("it has to be a leader!!!");
}
}

// Commit the initial entry when new leader established.
@@ -179,26 +175,17 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRun
{
// Run leader specific commands or pass non leader specific commands to self.core.
match cmd {
Command::ReplicateCommitted { committed } => {
for node in self.nodes.values() {
let _ = node.repl_tx.send(UpdateReplication {
last_log_id: None,
committed: *committed,
});
}
}
Command::ReplicateInputEntries { range } => {
if let Some(last) = range.clone().last() {
self.replicate_entry(*input_entries[last].get_log_id());
}
}
Command::UpdateReplicationStreams { remove, add } => {
for (node_id, _matched) in remove.iter() {
self.remove_replication(*node_id).await;
self.core.remove_replication(*node_id).await;
}
for (node_id, _matched) in add.iter() {
let state = self.spawn_replication_stream(*node_id).await;
self.nodes.insert(*node_id, state);
if let Some(l) = &mut self.core.leader_data {
l.nodes.insert(*node_id, state);
} else {
unreachable!("it has to be a leader!!!");
}
}
}
_ => self.core.run_command(input_entries, curr, cmd).await?,
92 changes: 88 additions & 4 deletions openraft/src/core/raft_core.rs
@@ -37,6 +37,7 @@ use crate::error::ForwardToLeader;
use crate::error::InitializeError;
use crate::error::VoteError;
use crate::metrics::RaftMetrics;
use crate::metrics::RemoveTarget;
use crate::metrics::ReplicationMetrics;
use crate::raft::ClientWriteResponse;
use crate::raft::RaftMsg;
@@ -45,11 +46,14 @@ use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft_types::LogIdOptionExt;
use crate::raft_types::RaftLogId;
use crate::replication::ReplicationStream;
use crate::replication::UpdateReplication;
use crate::runtime::RaftRuntime;
use crate::storage::RaftSnapshotBuilder;
use crate::storage::StorageHelper;
use crate::timer::RaftTimer;
use crate::timer::Timeout;
use crate::versioned::Updatable;
use crate::versioned::Versioned;
use crate::Entry;
use crate::EntryPayload;
@@ -69,6 +73,11 @@ pub(crate) struct LeaderData<C: RaftTypeConfig> {
/// Channels to send result back to client when logs are committed.
pub(crate) client_resp_channels: BTreeMap<u64, RaftRespTx<ClientWriteResponse<C>, ClientWriteError<C::NodeId>>>,

/// A mapping of node IDs to the replication state of the target node.
// TODO(xp): make it a field of RaftCore. It does not have to belong to the leader.
// It requires the Engine to emit correct add/remove replication commands.
pub(super) nodes: BTreeMap<C::NodeId, ReplicationStream<C::NodeId>>,

/// The metrics of all replication streams
pub(crate) replication_metrics: Versioned<ReplicationMetrics<C::NodeId>>,
}
@@ -77,6 +86,7 @@ impl<C: RaftTypeConfig> LeaderData<C> {
pub(crate) fn new() -> Self {
Self {
client_resp_channels: Default::default(),
nodes: BTreeMap::new(),
replication_metrics: Versioned::new(ReplicationMetrics::default()),
}
}
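With `nodes` in here as well, everything leader-specific now lives and dies with the one `Option`. A hedged sketch of that lifecycle, with invented `establish_leader`/`step_down` helpers and stand-in field types (the real transitions are driven by the Engine):

```rust
use std::collections::BTreeMap;

#[allow(dead_code)]
#[derive(Default)]
struct LeaderData {
    client_resp_channels: BTreeMap<u64, String>, // stand-in for response senders
    nodes: BTreeMap<u64, String>,                // stand-in for replication streams
}

struct RaftCore {
    leader_data: Option<LeaderData>,
}

impl RaftCore {
    fn establish_leader(&mut self) {
        // Fresh leader-only state each time leadership is established.
        self.leader_data = Some(LeaderData::default());
    }

    fn step_down(&mut self) {
        // Dropping LeaderData drops every response channel and replication
        // sender in one move, signalling their tasks to stop.
        self.leader_data = None;
    }
}

fn main() {
    let mut core = RaftCore { leader_data: None };
    core.establish_leader();
    assert!(core.leader_data.is_some());
    core.step_down();
    assert!(core.leader_data.is_none());
}
```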
@@ -666,6 +676,69 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
Ok(())
}

/// Begin replicating up to the given log id.
///
/// It does not block until the entry is committed or actually sent out.
/// It merely broadcasts a signal to inform the replication threads.
#[tracing::instrument(level = "debug", skip_all)]
pub(super) fn replicate_entry(&mut self, log_id: LogId<C::NodeId>) {
if let Some(l) = &self.leader_data {
if tracing::enabled!(Level::DEBUG) {
for node_id in l.nodes.keys() {
tracing::debug!(node_id = display(node_id), log_id = display(log_id), "replicate_entry");
}
}

for node in l.nodes.values() {
let _ = node.repl_tx.send(UpdateReplication {
last_log_id: Some(log_id),
committed: self.engine.state.committed,
});
}
} else {
unreachable!("it has to be a leader!!!");
}
}

/// Remove a replication if a membership that does not include it has been committed.
///
/// Return true if removed.
#[tracing::instrument(level = "trace", skip(self))]
pub async fn remove_replication(&mut self, target: C::NodeId) -> bool {
tracing::info!("removed_replication to: {}", target);

let repl_state = if let Some(l) = &mut self.leader_data {
l.nodes.remove(&target)
} else {
unreachable!("it has to be a leader!!!");
};

if let Some(s) = repl_state {
let handle = s.handle;

// Drop sender to notify the task to shutdown
drop(s.repl_tx);

tracing::debug!("joining removed replication: {}", target);
let _x = handle.await;
tracing::info!("Done joining removed replication : {}", target);
} else {
unreachable!("try to nonexistent replication to {}", target);
}

if let Some(l) = &mut self.leader_data {
l.replication_metrics.update(RemoveTarget { target });
} else {
unreachable!("It has to be a leader!!!");
}

// TODO(xp): set_replication_metrics_changed() can be removed.
// Use self.replication_metrics.version to detect changes.
self.engine.metrics_flags.set_replication_changed();

true
}

/// Leader will keep working until the effective membership that removes it is committed.
///
/// This is only called by the leader.
Expand Down Expand Up @@ -1016,8 +1089,17 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
Command::SendVote { vote_req } => {
self.spawn_parallel_vote_requests(vote_req).await;
}
Command::ReplicateCommitted { .. } => {
unreachable!("leader specific command")
Command::ReplicateCommitted { committed } => {
if let Some(l) = &self.leader_data {
for node in l.nodes.values() {
let _ = node.repl_tx.send(UpdateReplication {
last_log_id: None,
committed: *committed,
});
}
} else {
unreachable!("it has to be a leader!!!");
}
}
Command::LeaderCommit { ref upto, .. } => {
for i in self.engine.state.last_applied.next_index()..(upto.index + 1) {
@@ -1027,8 +1109,10 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
Command::FollowerCommit { upto: _, .. } => {
self.replicate_to_state_machine_if_needed().await?;
}
Command::ReplicateInputEntries { .. } => {
unreachable!("leader specific command")
Command::ReplicateInputEntries { range } => {
if let Some(last) = range.clone().last() {
self.replicate_entry(*input_ref_entries[last].get_log_id());
}
}
Command::UpdateReplicationStreams { .. } => {
unreachable!("leader specific command")
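The moved `remove_replication` shuts a stream down in two steps: drop its `repl_tx` so the task's `recv()` yields `None`, then await the `JoinHandle` so the removal is complete only once the task has actually exited. A self-contained sketch of that teardown:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (repl_tx, mut repl_rx) = mpsc::unbounded_channel::<u64>();

    // A replication task that runs until its command channel closes.
    let handle = tokio::spawn(async move {
        while let Some(log_index) = repl_rx.recv().await {
            println!("replicating up to index {log_index}");
        }
        println!("channel closed, replication task exiting");
    });

    let _ = repl_tx.send(42);

    // Drop sender to notify the task to shut down; recv() then yields None
    // once the queued messages drain.
    drop(repl_tx);

    // Join so the caller observes removal only after the task terminated.
    let _x = handle.await;
    println!("done joining removed replication");
}
```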
14 changes: 12 additions & 2 deletions openraft/src/core/replication.rs
@@ -99,9 +99,19 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
);

// TODO(xp): a leader has to refuse a message from a previous leader.
if !self.nodes.contains_key(&target) {
if let Some(l) = &self.core.leader_data {
if !l.nodes.contains_key(&target) {
return Ok(());
};
} else {
// no longer a leader.
tracing::warn!(
target = display(target),
result = debug(&result),
"received replication update but no longer a leader"
);
return Ok(());
};
}

tracing::debug!("update matched: {:?}", result);

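One contrast worth noting in replication.rs above: the command paths treat a missing `leader_data` as `unreachable!`, but this event handler only warns and returns, because replication events can legitimately race with stepping down. A small sketch of that distinction, again with stand-in types:

```rust
use std::collections::BTreeMap;

struct LeaderData {
    nodes: BTreeMap<u64, ()>, // () stands in for the replication stream handle
}

struct RaftCore {
    leader_data: Option<LeaderData>,
}

impl RaftCore {
    fn handle_update_matched(&self, target: u64) {
        match &self.leader_data {
            // Known target while leading: process the progress update.
            Some(l) if l.nodes.contains_key(&target) => {
                println!("updating matched for {target}");
            }
            // Unknown target (e.g. a replication just removed): ignore.
            Some(_) => {}
            // No longer a leader: a stale event, warn instead of panicking.
            None => {
                println!("warn: replication update from {target} but no longer a leader");
            }
        }
    }
}

fn main() {
    let core = RaftCore { leader_data: None };
    core.handle_update_matched(2); // logs a warning instead of panicking
}
```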