Skip to content

Commit

Permalink
Change: change-membership does not return error when replication lags
Browse files Browse the repository at this point in the history
If `blocking` is `true`, `Raft::change_membership(..., blocking)` will
block until repliication to new nodes become upto date.
But it won't return an error when proposing change-membership log.

- Change: remove two errors: `LearnerIsLagging` and `LearnerNotFound`.

- Fix: #581
  • Loading branch information
drmingdrmer committed Oct 22, 2022
1 parent 2896b98 commit c6fe29d
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 102 deletions.
45 changes: 0 additions & 45 deletions openraft/src/core/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ use crate::error::ClientWriteError;
use crate::error::EmptyMembership;
use crate::error::InProgress;
use crate::error::InitializeError;
use crate::error::LearnerIsLagging;
use crate::error::LearnerNotFound;
use crate::error::RemoveLearnerError;
use crate::raft::AddLearnerResponse;
use crate::raft::ClientWriteResponse;
Expand Down Expand Up @@ -144,7 +142,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
pub(super) async fn change_membership(
&mut self,
members: BTreeSet<NodeId>,
blocking: bool,
tx: RaftRespTx<ClientWriteResponse<R>, ClientWriteError>,
) -> Result<(), StorageError> {
tracing::info!("change_membership: members: {:?}", members);
Expand Down Expand Up @@ -174,48 +171,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>

tracing::info!("change_membership: new_config: {:?}", new_config);

// Check the proposed config for any new nodes. If ALL new nodes already have replication
// streams AND are ready to join, then we can immediately proceed with entering joint
// consensus. Else, new nodes need to first be brought up-to-speed.
//
// Here, all we do is check to see which nodes still need to be synced, which determines
// if we can proceed.

// TODO(xp): test change membership without adding as learner.

// TODO(xp): 111 test adding a node that is not learner.
// TODO(xp): 111 test adding a node that is lagging.
for new_node in members.difference(curr.all_nodes()) {
match self.nodes.get(new_node) {
Some(node) => {
if node.is_line_rate(&self.core.last_log_id, &self.core.config) {
// Node is ready to join.
continue;
}

if !blocking {
// Node has repl stream, but is not yet ready to join.
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(
ChangeMembershipError::LearnerIsLagging(LearnerIsLagging {
node_id: *new_node,
matched: node.matched,
distance: self.core.last_log_id.next_index().saturating_sub(node.matched.next_index()),
}),
)));
return Ok(());
}
}

// Node does not yet have a repl stream, spawn one.
None => {
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(
ChangeMembershipError::LearnerNotFound(LearnerNotFound { node_id: *new_node }),
)));
return Ok(());
}
}
}

self.append_membership_log(new_config, Some(tx)).await?;
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -809,8 +809,8 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
RaftMsg::RemoveLearner { id, tx } => {
self.remove_learner(id, tx);
}
RaftMsg::ChangeMembership { members, blocking, tx } => {
self.change_membership(members, blocking, tx).await?;
RaftMsg::ChangeMembership { members, tx } => {
self.change_membership(members, tx).await?;
}
RaftMsg::ForceSnapshotting { tx } => {
for node in self.nodes.values() {
Expand Down
20 changes: 0 additions & 20 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,6 @@ pub enum ChangeMembershipError {

#[error(transparent)]
EmptyMembership(#[from] EmptyMembership),

#[error(transparent)]
LearnerNotFound(#[from] LearnerNotFound),

#[error(transparent)]
LearnerIsLagging(#[from] LearnerIsLagging),
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
Expand Down Expand Up @@ -261,20 +255,6 @@ pub struct InProgress {
pub membership_log_id: LogId,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[error("to add a member {node_id} first need to add it as learner")]
pub struct LearnerNotFound {
pub node_id: NodeId,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[error("replication to learner {node_id} is lagging {distance}, matched: {matched:?}, can not add as member")]
pub struct LearnerIsLagging {
pub node_id: NodeId,
pub matched: Option<LogId>,
pub distance: u64,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, thiserror::Error)]
#[error("new membership can not be empty")]
pub struct EmptyMembership {}
32 changes: 15 additions & 17 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,14 +284,18 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
/// If a node in the proposed config but is not yet a voter or learner, it first calls `add_learner` to setup
/// replication to the new node.
///
/// Internal:
/// - It proposes a **joint** config.
/// - When the **joint** config is committed, it proposes a uniform config.
///
/// If blocking is true, it blocks until every learner becomes up to date.
/// Otherwise it returns error `ChangeMembershipError::LearnerIsLagging` if there is a lagging learner.
///
/// If it lost leadership or crashed before committing the second **uniform** config log, the cluster is left in the
/// Internally it commits two logs to get this task done:
/// - Proposes a **joint** config, e.g. `[{1,2,3}, {4,5,6}]` if trying to change config from `{1,2,3}` to `{4,5,6}`.
/// - When the **joint** config is committed, then proposes a **uniform** config, e.g. `{4,5,6}`.
///
/// If `blocking` is true, it does not propose the membership log until replication to every learner becomes up to
/// date.
/// Otherwise it just proposes membership log at once. In this case, the change-membership log may be
/// committed very quickly: if the **old** cluster constitutes a quorum.
/// E.g., when changing from `[1,2,3,4]` to `[1,2,3,4,5]`, `[1,2,3]` is a quorum in both the old and the new
/// cluster. Then it will return before the replication to node-5 becomes up to date.
///
/// If this node crashes before committing the second **uniform** config log, the cluster may be left in the
/// **joint** config.
#[tracing::instrument(level = "info", skip_all)]
pub async fn change_membership(
Expand Down Expand Up @@ -330,7 +334,6 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
.call_core(
RaftMsg::ChangeMembership {
members: members.clone(),
blocking,
tx,
},
rx,
Expand All @@ -350,7 +353,7 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
tracing::info!("the second step is to change to uniform config: {:?}", members);

let (tx, rx) = oneshot::channel();
let res = self.call_core(RaftMsg::ChangeMembership { members, blocking, tx }, rx).await?;
let res = self.call_core(RaftMsg::ChangeMembership { members, tx }, rx).await?;

tracing::info!("res of second change_membership: {}", res.summary());

Expand Down Expand Up @@ -500,11 +503,6 @@ pub(crate) enum RaftMsg<D: AppData, R: AppDataResponse> {
},
ChangeMembership {
members: BTreeSet<NodeId>,
/// with blocking==false, respond to client a ChangeMembershipError::LearnerIsLagging error at once if a
/// non-member is lagging.
///
/// Otherwise, wait for commit of the member change log.
blocking: bool,
tx: RaftRespTx<ClientWriteResponse<R>, ClientWriteError>,
},
/// Force to switch to snapshot replication to every target.
Expand Down Expand Up @@ -542,8 +540,8 @@ where
RaftMsg::RemoveLearner { id, .. } => {
format!("RemoveLearner: id: {}", id)
}
RaftMsg::ChangeMembership { members, blocking, .. } => {
format!("ChangeMembership: members: {:?}, blocking: {}", members, blocking)
RaftMsg::ChangeMembership { members, .. } => {
format!("ChangeMembership: members: {:?}", members)
}
RaftMsg::ForceSnapshotting { .. } => "ForceSnapshotting".to_string(),
}
Expand Down
43 changes: 25 additions & 18 deletions openraft/tests/membership/t20_change_membership.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::convert::TryInto;
use std::sync::Arc;
use std::time::Duration;

use maplit::btreeset;
use openraft::error::ChangeMembershipError;
use openraft::Config;
use openraft::StorageHelper;

Expand Down Expand Up @@ -59,7 +57,7 @@ async fn change_with_new_learner_blocking() -> anyhow::Result<()> {

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn change_with_lagging_learner_non_blocking() -> anyhow::Result<()> {
// Add a learner into membership config, expect error NonVoterIsLagging.
// Add a learner into membership config.

let (_log_guard, ut_span) = init_ut!();
let _ent = ut_span.enter();
Expand Down Expand Up @@ -90,27 +88,36 @@ async fn change_with_lagging_learner_non_blocking() -> anyhow::Result<()> {
router.wait(&0, timeout()).await?.log(Some(log_index), "received 500 logs").await?;
}

tracing::info!("--- restore replication and change membership at once, expect NonVoterIsLagging");
tracing::info!(
"--- restore replication and change membership at once, it still blocks until logs are replicated to node-1"
);
{
router.restore_node(1).await;
let res = router.change_membership_with_blocking(0, btreeset! {0,1}, false).await;
log_index += 2;

tracing::info!("--- got res: {:?}", res);
assert!(res.is_ok());
router.wait(&1, timeout()).await?.log(Some(log_index), "received 500+2 logs").await?;
}

let err = res.unwrap_err();
let err: ChangeMembershipError = err.try_into().unwrap();

match err {
ChangeMembershipError::LearnerIsLagging(e) => {
tracing::info!(e.distance, "--- distance");
assert_eq!(1, e.node_id);
assert!(e.distance >= lag_threshold);
assert!(e.distance < 500);
}
_ => {
panic!("expect ChangeMembershipError::NonVoterNotFound");
}
}
tracing::info!("--- add node-3, with blocking=false, won't block, because [0,1] is a quorum");
{
router.new_raft_node(2).await;

router.change_membership_with_blocking(0, btreeset! {0,1,2}, false).await?;
log_index += 2;
let m = router.get_metrics(&2).await?;
assert!(m.last_log_index < Some(log_index));
}

tracing::info!("--- make sure replication to node-3 works as expected");
{
router
.wait(&2, timeout())
.await?
.log(Some(log_index), "received all logs, replication works")
.await?;
}

Ok(())
Expand Down

0 comments on commit c6fe29d

Please sign in to comment.