Skip to content

Commit

Permalink
Fix: when responding ForwardToLeader, make leader_id a None if the …
Browse files Browse the repository at this point in the history
…leader is no longer in the cluster
  • Loading branch information
drmingdrmer committed Nov 2, 2022
1 parent 689e21d commit 678af4a
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 8 deletions.
14 changes: 9 additions & 5 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -863,11 +863,15 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
#[tracing::instrument(level = "trace", skip(self, tx))]
pub(crate) fn reject_with_forward_to_leader<T, E>(&self, tx: RaftRespTx<T, E>)
where E: From<ForwardToLeader<C::NodeId, C::Node>> {
let l = self.current_leader();
let err = ForwardToLeader {
leader_id: l,
leader_node: self.get_leader_node(l),
};
let mut leader_id = self.current_leader();
let leader_node = self.get_leader_node(leader_id);

// Leader is no longer a node in the membership config.
if leader_node.is_none() {
leader_id = None;
}

let err = ForwardToLeader { leader_id, leader_node };

let _ = tx.send(Err(err.into()));
}
Expand Down
2 changes: 1 addition & 1 deletion openraft/tests/fixtures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ where
Ok(count as u64)
}

async fn send_client_request(
pub async fn send_client_request(
&self,
target: C::NodeId,
req: C::D,
Expand Down
2 changes: 1 addition & 1 deletion openraft/tests/membership/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod t16_change_membership_cases;
mod t20_change_membership;
mod t25_elect_with_new_config;
mod t30_commit_joint_config;
mod t30_step_down;
mod t30_remove_leader;
mod t40_removed_follower;
mod t45_remove_unreachable_follower;
mod t99_issue_471_adding_learner_uses_uninit_leader_id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ use std::time::Duration;

use anyhow::Result;
use maplit::btreeset;
use memstore::ClientRequest;
use memstore::IntoMemClientRequest;
use openraft::error::ClientWriteError;
use openraft::Config;
use openraft::LeaderId;
use openraft::LogId;
Expand All @@ -16,7 +19,7 @@ use crate::fixtures::RaftRouter;
/// - Then the leader should step down after joint log is committed.
/// - Check logs on other node.
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn step_down() -> Result<()> {
async fn remove_leader() -> Result<()> {
// Setup test dependencies.
let config = Arc::new(
Config {
Expand Down Expand Up @@ -105,6 +108,79 @@ async fn step_down() -> Result<()> {
Ok(())
}

/// Change membership from {0,1,2} to {2}. Access {2} at once.
///
/// It should not respond a ForwardToLeader error that pointing to the removed leader.
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn remove_leader_access_new_cluster() -> Result<()> {
// Setup test dependencies.
let config = Arc::new(
Config {
enable_heartbeat: false,
enable_elect: false,
..Default::default()
}
.validate()?,
);
let mut router = RaftRouter::new(config.clone());

let mut log_index = router.new_nodes_from_single(btreeset! {0,1,2}, btreeset! {}).await?;

let orig_leader = 0;

tracing::info!("--- change membership 012 to 2");
{
let node = router.get_raft_handle(&orig_leader)?;
node.change_membership(btreeset![2], true, false).await?;
// 2 change_membership logs
log_index += 2;

router
.wait(&2, timeout())
.log(Some(log_index), "new leader node-2 commits 2 membership log")
.await?;
}

tracing::info!("--- old leader commits 2 membership log");
{
router
.wait(&orig_leader, timeout())
.log(Some(log_index), "old leader commits 2 membership log")
.await?;
}

let res = router.send_client_request(2, ClientRequest::make_request("foo", 1)).await;
match res {
Ok(_) => {
unreachable!("expect error");
}
Err(cli_err) => match cli_err {
ClientWriteError::ForwardToLeader(fwd) => {
assert!(fwd.leader_id.is_none());
assert!(fwd.leader_node.is_none());
}
_ => {
unreachable!("expect ForwardToLeader");
}
},
}

tracing::info!("--- elect node-2, handle write");
{
let n2 = router.get_raft_handle(&2)?;
n2.enable_elect(true);
n2.wait(timeout()).state(ServerState::Leader, "node-2 elect itself").await?;
log_index += 1;

router.send_client_request(2, ClientRequest::make_request("foo", 1)).await?;
log_index += 1;

n2.wait(timeout()).log(Some(log_index), "node-2 become leader and handle write request").await?;
}

Ok(())
}

fn timeout() -> Option<Duration> {
Some(Duration::from_millis(3_000))
}

0 comments on commit 678af4a

Please sign in to comment.