Skip to content

Commit

Permalink
Fix: when leader reverts to follower, send error to waiting clients
Browse files Browse the repository at this point in the history
When a leader reverts to follower, e.g., if a higher vote is seen,
it should inform waiting clients that leadership is lost.

- Let Engine deals with vote change event.

- Add test for reverting to follower behavior.
  • Loading branch information
drmingdrmer committed Aug 6, 2022
1 parent 088cc1f commit 43dd8b6
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 42 deletions.
47 changes: 22 additions & 25 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
const IS_VOTER: bool = true;
const IS_LEARNER: bool = false;

// TODO: these part can be removed. the loop does not depend on server state
self.engine.state.server_state = match (has_log, single, is_voter) {
// A restarted raft that already received some logs but was not yet added to a cluster.
// It should remain in Learner state, not Follower.
Expand All @@ -288,12 +289,8 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
(NO_LOG, MULTI, IS_VOTER) => ServerState::Follower, // impossible: no logs but there are other members.
};

if self.engine.state.server_state == ServerState::Follower
|| self.engine.state.server_state == ServerState::Candidate
{
// To ensure that restarted nodes don't disrupt a stable cluster.
self.set_next_election_time(false);
}
// To ensure that restarted nodes don't disrupt a stable cluster.
self.set_next_election_time(false);

tracing::debug!("id={} target_state: {:?}", self.id, self.engine.state.server_state);

Expand Down Expand Up @@ -1380,9 +1377,15 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
}
}

RaftMsg::RevertToFollower { target, new_vote, vote } => {
if self.does_vote_match(vote, "RevertToFollower") {
self.handle_revert_to_follower(target, new_vote).await?;
RaftMsg::HigherVote {
target: _,
higher,
vote,
} => {
if self.does_vote_match(vote, "HigherVote") {
// Rejected vote change is ok.
let _ = self.engine.handle_vote_change(&higher);
self.run_engine_commands::<Entry<C>>(&[]).await?;
}
}

Expand Down Expand Up @@ -1418,22 +1421,6 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftCore<C,
Ok(())
}

/// Handle events from replication streams for when this node needs to revert to follower state.
#[tracing::instrument(level = "trace", skip(self))]
async fn handle_revert_to_follower(
&mut self,
_target: C::NodeId,
vote: Vote<C::NodeId>,
) -> Result<(), StorageError<C::NodeId>> {
if vote > self.engine.state.vote {
self.engine.state.vote = vote;
self.save_vote().await?;
// TODO: when switching to Follower, the next election time has to be set.
self.set_target_state(ServerState::Follower);
}
Ok(())
}

#[tracing::instrument(level = "debug", skip_all)]
async fn handle_update_matched(
&mut self,
Expand Down Expand Up @@ -1591,6 +1578,16 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> RaftRuntime
debug_assert!(self.leader_data.is_none(), "can not become leader twice");
self.leader_data = Some(LeaderData::new());
} else {
if let Some(l) = &mut self.leader_data {
// Leadership lost, inform waiting clients
let chans = std::mem::take(&mut l.client_resp_channels);
for (_, tx) in chans.into_iter() {
let _ = tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader {
leader_id: None,
leader_node: None,
})));
}
}
self.leader_data = None;
}
}
Expand Down
16 changes: 9 additions & 7 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -752,16 +752,10 @@ where

// --- Draft API ---

// // --- app API ---
//
// /// Write a new log entry.
// pub(crate) fn write(&mut self) -> Result<Vec<AlgoCmd<NID>>, ForwardToLeader<NID>> {}
//
// // --- raft protocol API ---
//
// pub(crate) fn handle_install_snapshot() {}
//
// pub(crate) fn handle_append_entries_resp() {}
// pub(crate) fn handle_install_snapshot_resp() {}
}

Expand All @@ -788,6 +782,10 @@ where
/// Leader state has two phase: election phase and replication phase, similar to paxos phase-1 and phase-2
pub(crate) fn enter_leading(&mut self) {
debug_assert_eq!(self.state.vote.node_id, self.id);
// debug_assert!(
// self.state.internal_server_state.is_following(),
// "can not enter leading twice"
// );

self.state.new_leader();
}
Expand All @@ -803,6 +801,11 @@ where
// This way it holds that 'vote.node_id != self.id <=> following state`.
// debug_assert_ne!(self.state.vote.node_id, self.id);

// debug_assert!(
// self.state.internal_server_state.is_leading(),
// "can not enter following twice"
// );

let vote = &self.state.vote;

if vote.committed {
Expand Down Expand Up @@ -1108,7 +1111,6 @@ where

/// The node is candidate or leader
fn is_becoming_leader(&self) -> bool {
// self.state.vote.node_id == self.id
self.state.internal_server_state.is_leading()
}

Expand Down
18 changes: 10 additions & 8 deletions openraft/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
/// Invoke RaftCore by sending a RaftMsg and blocks waiting for response.
#[tracing::instrument(level = "debug", skip(self, mes, rx))]
pub(crate) async fn call_core<T, E>(&self, mes: RaftMsg<C, N, S>, rx: RaftRespRx<T, E>) -> Result<T, E>
where E: From<Fatal<C::NodeId>> {
where E: From<Fatal<C::NodeId>> + Debug {
let sum = if tracing::enabled!(Level::DEBUG) {
None
} else {
Expand All @@ -606,11 +606,13 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
}

let recv_res = rx.await;
tracing::debug!("call_core receives result is error: {:?}", recv_res.is_err());

match recv_res {
Ok(x) => x,
Err(_) => {
let fatal = self.get_core_stopped_error("receiving rx from RaftCore", sum).await;
tracing::error!(error = debug(&fatal), "core_call fatal error");
Err(fatal.into())
}
}
Expand Down Expand Up @@ -645,6 +647,7 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Raft<C, N,
}

/// Wait for RaftCore task to finish and record the returned value from the task.
#[tracing::instrument(level = "debug", skip_all)]
async fn join_core_task(&self) {
let mut state = self.inner.core_state.lock().await;
match &mut *state {
Expand Down Expand Up @@ -861,15 +864,14 @@ pub(crate) enum RaftMsg<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStor
membership_log_id: Option<LogId<C::NodeId>>,
},

/// An event indicating that the Raft node needs to revert to follower state.
/// ReplicationCore has seen a higher `vote`.
/// Sent by a replication task `ReplicationCore`.
// TODO: rename it
RevertToFollower {
HigherVote {
/// The ID of the target node from which the new term was observed.
target: C::NodeId,

/// The new vote observed.
new_vote: Vote<C::NodeId>,
/// The higher vote observed.
higher: Vote<C::NodeId>,

/// Which ServerState sent this message
vote: Vote<C::NodeId>,
Expand Down Expand Up @@ -960,9 +962,9 @@ where
membership_log_id.summary()
)
}
RaftMsg::RevertToFollower {
RaftMsg::HigherVote {
ref target,
ref new_vote,
higher: ref new_vote,
ref vote,
} => {
format!(
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ impl<C: RaftTypeConfig, N: RaftNetworkFactory<C>, S: RaftStorage<C>> Replication
return;
}
ReplicationError::HigherVote(h) => {
let _ = self.raft_core_tx.send(RaftMsg::RevertToFollower {
let _ = self.raft_core_tx.send(RaftMsg::HigherVote {
target: self.target,
new_vote: h.higher,
higher: h.higher,
vote: self.vote,
});
return;
Expand Down
1 change: 1 addition & 0 deletions openraft/tests/append_entries/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod fixtures;
// The later tests may depend on the earlier ones.

mod t10_conflict_with_empty_entries;
mod t10_see_higher_vote;
mod t20_append_conflicts;
mod t30_append_inconsistent_log;
mod t40_append_updates_membership;
Expand Down
78 changes: 78 additions & 0 deletions openraft/tests/append_entries/t10_see_higher_vote.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use maplit::btreeset;
use memstore::ClientRequest;
use openraft::raft::VoteRequest;
use openraft::BasicNode;
use openraft::Config;
use openraft::LeaderId;
use openraft::LogId;
use openraft::RaftNetwork;
use openraft::RaftNetworkFactory;
use openraft::ServerState;
use openraft::Vote;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// A leader reverts to follower if a higher vote is seen when append-entries.
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn append_sees_higher_vote() -> Result<()> {
let config = Arc::new(
Config {
enable_heartbeat: false,
enable_elect: false,
..Default::default()
}
.validate()?,
);

let mut router = RaftRouter::new(config.clone());

let _log_index = router.new_nodes_from_single(btreeset! {0,1}, btreeset! {}).await?;

tracing::info!("--- upgrade vote on node-1");
{
router
.connect(1, &BasicNode::default())
.await?
.send_vote(VoteRequest {
vote: Vote::new(10, 1),
last_log_id: Some(LogId::new(LeaderId::new(10, 1), 5)),
})
.await?;
}

tracing::info!("--- a write operation will see a higher vote, then the leader revert to follower");
{
router.wait(&0, timeout()).state(ServerState::Leader, "node-0 is leader").await?;

let n0 = router.get_raft_handle(&0)?;
let res = n0
.client_write(ClientRequest {
client: "0".to_string(),
serial: 1,
status: "2".to_string(),
})
.await;

tracing::debug!("--- client_write res: {:?}", res);

router
.wait(&0, timeout())
.state(ServerState::Follower, "node-0 becomes follower due to a higher vote")
.await?;

router.external_request(0, |st, _, _| {
assert_eq!(Vote::new(10, 1), st.vote, "higher vote is stored");
});
}

Ok(())
}

fn timeout() -> Option<Duration> {
Some(Duration::from_millis(1_000))
}

0 comments on commit 43dd8b6

Please sign in to comment.