Skip to content

Commit

Permalink
Change: remove tx from spawn_replication_stream()
Browse files Browse the repository at this point in the history
Replication should not be responsible invoke the callback when
replication become upto date. It makes the logic dirty.
Such a job can be done by watching the metrics change.

- Change: API: AddLearnerResponse has a new field `membership_log_id`
  which is the log id of the membership log that contains the newly
  added learner.
  • Loading branch information
drmingdrmer committed Jul 1, 2022
1 parent 54ebed1 commit 01a16d0
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 80 deletions.
62 changes: 35 additions & 27 deletions openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::raft::AddLearnerResponse;
use crate::raft::ClientWriteResponse;
use crate::raft::RaftRespTx;
use crate::raft_types::LogIdOptionExt;
use crate::raft_types::RaftLogId;
use crate::runtime::RaftRuntime;
use crate::versioned::Updatable;
use crate::ChangeMembers;
Expand All @@ -40,15 +41,15 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
&mut self,
target: C::NodeId,
node: Option<Node>,
) -> Result<(), AddLearnerError<C::NodeId>> {
) -> Result<LogId<C::NodeId>, AddLearnerError<C::NodeId>> {
let curr = &self.core.engine.state.membership_state.effective.membership;
let new_membership = curr.add_learner(target, node)?;

tracing::debug!(?new_membership, "new_config");

self.write_entry(EntryPayload::Membership(new_membership), None).await?;
let log_id = self.write_entry(EntryPayload::Membership(new_membership), None).await?;

Ok(())
Ok(log_id)
}

/// Add a new node to the cluster as a learner, bringing it up-to-speed, and then responding
Expand All @@ -60,24 +61,26 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
///
/// If `blocking` is `true`, the result is sent to `tx` as the target node log has caught up. Otherwise, result is
/// sent at once, no matter whether the target node log is lagging or not.
#[tracing::instrument(level = "debug", skip(self, tx))]
#[tracing::instrument(level = "debug", skip(self))]
pub(super) async fn add_learner(
&mut self,
target: C::NodeId,
node: Option<Node>,
tx: RaftRespTx<AddLearnerResponse<C::NodeId>, AddLearnerError<C::NodeId>>,
blocking: bool,
) {
) -> Result<(), Fatal<C::NodeId>> {
tracing::debug!("add target node {} as learner {:?}", target, self.nodes.keys());

// Ensure the node doesn't already exist in the current
// config, in the set of new nodes already being synced, or in the nodes being removed.
// TODO: remove this
if target == self.core.id {
tracing::debug!("target node is this node");

let _ = tx.send(Ok(AddLearnerResponse {
membership_log_id: self.core.engine.state.membership_state.effective.log_id,
matched: self.core.engine.state.last_log_id(),
}));
return;
return Ok(());
}

let curr = &self.core.engine.state.membership_state.effective;
Expand All @@ -86,8 +89,11 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS

if let Some(t) = self.nodes.get(&target) {
tracing::debug!("target node is already a cluster member or is being synced");
let _ = tx.send(Ok(AddLearnerResponse { matched: t.matched }));
return;
let _ = tx.send(Ok(AddLearnerResponse {
membership_log_id: self.core.engine.state.membership_state.effective.log_id,
matched: t.matched,
}));
return Ok(());
} else {
unreachable!(
"node {} in membership but there is no replication stream for it",
Expand All @@ -99,29 +105,31 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
// TODO(xp): when new membership log is appended, write_entry() should be responsible to setup new replication
// stream.
let res = self.write_add_learner_entry(target, node).await;
if let Err(e) = res {
let _ = tx.send(Err(e));
return;
}

if blocking {
let state = self.spawn_replication_stream(target, Some(tx)).await;
// TODO(xp): nodes, i.e., replication streams, should also be a property of follower or candidate, for
// sending vote requests etc?
self.nodes.insert(target, state);
} else {
let state = self.spawn_replication_stream(target, None).await;
self.nodes.insert(target, state);
let log_id = match res {
Ok(x) => x,
Err(e) => {
let _ = tx.send(Err(e));
return Ok(());
}
};

// non-blocking mode, do not know about the replication stat.
let _ = tx.send(Ok(AddLearnerResponse { matched: None }));
}
// TODO(xp): nodes, i.e., replication streams, should also be a property of follower or candidate, for
// sending vote requests etc?
let state = self.spawn_replication_stream(target).await;
self.nodes.insert(target, state);

tracing::debug!(
"after add target node {} as learner {:?}",
target,
self.core.engine.state.last_log_id()
);

let _ = tx.send(Ok(AddLearnerResponse {
membership_log_id: Some(log_id),
matched: None,
}));

Ok(())
}

/// return true if there is pending uncommitted config change
Expand Down Expand Up @@ -250,7 +258,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
&mut self,
payload: EntryPayload<C>,
resp_tx: Option<RaftRespTx<ClientWriteResponse<C>, ClientWriteError<C::NodeId>>>,
) -> Result<(), Fatal<C::NodeId>> {
) -> Result<LogId<C::NodeId>, Fatal<C::NodeId>> {
let mut entry_refs = [EntryRef::new(&payload)];
// TODO: it should returns membership config error etc. currently this is done by the caller.
self.core.engine.leader_append_entries(&mut entry_refs);
Expand All @@ -262,7 +270,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS

self.run_engine_commands(&entry_refs).await?;

Ok(())
Ok(*entry_refs[0].get_log_id())
}

#[tracing::instrument(level = "debug", skip_all)]
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/core/leader_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
};

for target in targets {
let state = self.spawn_replication_stream(target, None).await;
let state = self.spawn_replication_stream(target).await;
self.nodes.insert(target, state);
}

Expand Down Expand Up @@ -148,8 +148,8 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
RaftMsg::ClientWriteRequest { rpc, tx } => {
self.write_entry(rpc.payload, Some(tx)).await?;
}
RaftMsg::AddLearner { id, node, tx, blocking } => {
self.add_learner(id, node, tx, blocking).await;
RaftMsg::AddLearner { id, node, tx } => {
self.add_learner(id, node, tx).await?;
}
RaftMsg::ChangeMembership {
changes,
Expand Down
24 changes: 2 additions & 22 deletions openraft/src/core/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ use crate::core::LeaderState;
use crate::core::ReplicationState;
use crate::core::ServerState;
use crate::core::SnapshotState;
use crate::error::AddLearnerError;
use crate::metrics::UpdateMatchedLogId;
use crate::raft::AddLearnerResponse;
use crate::raft::RaftRespTx;
use crate::replication::ReplicaEvent;
use crate::replication::ReplicationStream;
use crate::replication::UpdateReplication;
Expand All @@ -28,13 +25,9 @@ use crate::StorageError;

impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderState<'a, C, N, S> {
/// Spawn a new replication stream returning its replication state handle.
#[tracing::instrument(level = "debug", skip(self, caller_tx))]
#[tracing::instrument(level = "debug", skip(self))]
#[allow(clippy::type_complexity)]
pub(super) async fn spawn_replication_stream(
&mut self,
target: C::NodeId,
caller_tx: Option<RaftRespTx<AddLearnerResponse<C::NodeId>, AddLearnerError<C::NodeId>>>,
) -> ReplicationState<C::NodeId> {
pub(super) async fn spawn_replication_stream(&mut self, target: C::NodeId) -> ReplicationState<C::NodeId> {
let target_node = self.core.engine.state.membership_state.effective.get_node(&target);

let repl_stream = ReplicationStream::new::<C, N, S>(
Expand All @@ -53,7 +46,6 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS
matched: None,
repl_stream,
remove_since: None,
tx: caller_tx,
failures: 0,
}
}
Expand Down Expand Up @@ -135,18 +127,6 @@ impl<'a, C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> LeaderS

state.matched = Some(matched);

// Issue a response on the learners response channel if needed.
if state.is_line_rate(&self.core.engine.state.last_log_id(), &self.core.config) {
// This replication became line rate.

// When adding a learner, it blocks until the replication becomes line-rate.
if let Some(tx) = state.tx.take() {
// TODO(xp): define a specific response type for learner matched event.
let x = AddLearnerResponse { matched: state.matched };
let _ = tx.send(Ok(x));
}
}

// Drop replication stream if needed.
if self.try_remove_replication(target).await {
// nothing to do
Expand Down
7 changes: 0 additions & 7 deletions openraft/src/core/replication_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ use std::fmt::Debug;
use std::fmt::Formatter;

use crate::config::Config;
use crate::error::AddLearnerError;
use crate::raft::AddLearnerResponse;
use crate::raft::RaftRespTx;
use crate::raft_types::LogIdOptionExt;
use crate::replication::ReplicationStream;
use crate::LogId;
Expand All @@ -23,10 +20,6 @@ pub(crate) struct ReplicationState<NID: NodeId> {
///
/// It will be reset once a successful replication is done.
pub failures: u64,

/// The response channel to use for when this node has successfully synced with the cluster.
#[allow(clippy::type_complexity)]
pub tx: Option<RaftRespTx<AddLearnerResponse<NID>, AddLearnerError<NID>>>,
}

impl<NID: NodeId> MessageSummary<ReplicationState<NID>> for ReplicationState<NID> {
Expand Down
109 changes: 102 additions & 7 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tokio::task::JoinHandle;
use tracing::Span;

use crate::config::Config;
use crate::core::is_matched_upto_date;
use crate::core::Expectation;
use crate::core::RaftCore;
use crate::error::AddLearnerError;
Expand Down Expand Up @@ -118,6 +119,8 @@ enum CoreState<NID: NodeId> {
}

struct RaftInner<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> {
id: C::NodeId,
config: Arc<Config>,
tx_api: mpsc::UnboundedSender<(RaftMsg<C, N, S>, Span)>,
rx_metrics: watch::Receiver<RaftMetrics<C::NodeId>>,
// TODO(xp): it does not need to be a async mutex.
Expand Down Expand Up @@ -180,7 +183,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,

let raft_handle = RaftCore::spawn(
id,
config,
config.clone(),
network,
storage,
tx_api.clone(),
Expand All @@ -190,6 +193,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
);

let inner = RaftInner {
id,
config,
tx_api,
rx_metrics,
tx_shutdown: Mutex::new(Some(tx_shutdown)),
Expand Down Expand Up @@ -345,7 +350,96 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
blocking: bool,
) -> Result<AddLearnerResponse<C::NodeId>, AddLearnerError<C::NodeId>> {
let (tx, rx) = oneshot::channel();
self.call_core(RaftMsg::AddLearner { id, node, blocking, tx }, rx).await
let resp = self.call_core(RaftMsg::AddLearner { id, node, tx }, rx).await?;

if !blocking {
return Ok(resp);
}

if self.inner.id == id {
return Ok(resp);
}

// Otherwise, blocks until the replication to the new learner becomes up to date.

// The log id of the membership that contains the added learner.
let membership_log_id = resp.membership_log_id;

let res0 = Arc::new(std::sync::Mutex::new(resp));
let res = res0.clone();

let wait_res = self
.wait(None)
.metrics(
|metrics| match self.check_replication_upto_date(metrics, id, membership_log_id) {
Ok(resp) => {
res.lock().unwrap().membership_log_id = resp;
true
}
// keep waiting
Err(_) => false,
},
"wait new learner to become line-rate",
)
.await;

tracing::info!(wait_res = debug(&wait_res), "waiting for replication to new learner");

let r = {
let x = res0.lock().unwrap();
x.clone()
};
Ok(r)
}

/// Returns Ok() with the latest known matched log id if it should quit waiting: leader change, node removed, or
/// replication becomes upto date.
///
/// Returns Err() if it should keep waiting.
fn check_replication_upto_date(
&self,
metrics: &RaftMetrics<C::NodeId>,
node_id: C::NodeId,
membership_log_id: Option<LogId<C::NodeId>>,
) -> Result<Option<LogId<C::NodeId>>, ()> {
if metrics.membership_config.log_id < membership_log_id {
// Waiting for the latest metrics to report.
return Err(());
}

if !metrics.membership_config.membership.contains(&node_id) {
// This learner has been removed.
return Ok(None);
}

let repl = match &metrics.replication {
None => {
// This node is no longer a leader.
return Ok(None);
}
Some(x) => x,
};

let replication_metrics = &repl.data().replication;
let target_metrics = match replication_metrics.get(&node_id) {
None => {
// Maybe replication is not reported yet. Keep waiting.
return Err(());
}
Some(x) => x,
};

let matched = target_metrics.matched();

let last_log_id = LogId::new(matched.leader_id, metrics.last_log_index.unwrap_or_default());

if is_matched_upto_date(&Some(matched), &Some(last_log_id), &self.inner.config) {
// replication became up to date.
return Ok(Some(matched));
}

// Not up to date, keep waiting.
Err(())
}

/// Propose a cluster configuration change.
Expand Down Expand Up @@ -604,6 +698,10 @@ pub(crate) type RaftRespRx<T, E> = oneshot::Receiver<Result<T, E>>;
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct AddLearnerResponse<NID: NodeId> {
/// The log id of the membership that contains the added learner.
pub membership_log_id: Option<LogId<NID>>,

/// The last log id that matches leader log.
pub matched: Option<LogId<NID>>,
}

Expand Down Expand Up @@ -640,9 +738,6 @@ pub(crate) enum RaftMsg<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStor

node: Option<Node>,

/// If block until the newly added learner becomes line-rate.
blocking: bool,

/// Send the log id when the replication becomes line-rate.
tx: RaftRespTx<AddLearnerResponse<C::NodeId>, AddLearnerError<C::NodeId>>,
},
Expand Down Expand Up @@ -696,8 +791,8 @@ where
RaftMsg::Initialize { members, .. } => {
format!("Initialize: {:?}", members)
}
RaftMsg::AddLearner { id, blocking, .. } => {
format!("AddLearner: id: {}, blocking: {}", id, blocking)
RaftMsg::AddLearner { id, node, .. } => {
format!("AddLearner: id: {}, node: {:?}", id, node)
}
RaftMsg::ChangeMembership {
changes: members,
Expand Down
Loading

0 comments on commit 01a16d0

Please sign in to comment.