Skip to content

Commit

Permalink
Merge pull request #853 from drmingdrmer/45-leader-lease
Browse files Browse the repository at this point in the history
Refactor: extract voting management into a separate `Voting` struct
  • Loading branch information
drmingdrmer authored May 24, 2023
2 parents e88df90 + bec5ad3 commit 4ab38e7
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 71 deletions.
55 changes: 43 additions & 12 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,15 @@ where C: RaftTypeConfig
// Safe unwrap(): it won't reject itself ˙–˙
self.vote_handler().update_vote(&v).unwrap();

// Safe unwrap()
let leader = self.internal_server_state.leading_mut().unwrap();
let quorum_granted = leader.grant_vote_by(self.config.id);
// TODO: simplify voting initialization.
// - update_vote() should be moved to after initialize_voting(), because it can be considered
// as a local RPC

// Safe unwrap(): leading state is just created
let leading = self.internal_server_state.leading_mut().unwrap();
let voting = leading.initialize_voting(self.state.last_log_id().copied());

let quorum_granted = voting.grant_by(&self.config.id);

// Fast-path: if there is only one voter in the cluster.

Expand Down Expand Up @@ -316,18 +322,21 @@ where C: RaftTypeConfig
func_name!()
);

// If this node is no longer a leader(i.e., electing), just ignore the delayed vote_resp.
let leader = match &mut self.internal_server_state {
InternalServerState::Leading(l) => l,
InternalServerState::Following => return,
let voting = if let Some(voting) = self.internal_server_state.voting_mut() {
// TODO check the sending vote matches current vote
voting
} else {
// If this node is no longer a leader(i.e., electing) or candidate,
// just ignore the delayed vote_resp.
return;
};

if &resp.vote < self.state.vote_ref() {
debug_assert!(!resp.vote_granted);
}

if resp.vote_granted {
let quorum_granted = leader.grant_vote_by(target);
let quorum_granted = voting.grant_by(&target);
if quorum_granted {
tracing::info!("a quorum granted my vote");
self.establish_leader();
Expand Down Expand Up @@ -549,16 +558,38 @@ where C: RaftTypeConfig
fn establish_leader(&mut self) {
tracing::info!("{}", func_name!());

self.vote_handler().commit_vote();
// Mark the vote as committed, i.e., being granted and saved by a quorum.
//
// The committed vote, is not necessary in original raft.
// Openraft insists doing this because:
// - Voting is not in the hot path, thus no performance penalty.
// - Leadership won't be lost if a leader restarted quick enough.
{
let leading = self.internal_server_state.leading_mut().unwrap();
let voting = leading.finish_voting();
let mut vote = *voting.vote_ref();

debug_assert!(!vote.is_committed());
debug_assert_eq!(
vote.leader_id().voted_for(),
Some(self.config.id),
"it can only commit its own vote"
);
vote.commit();

let _res = self.vote_handler().update_vote(&vote);
debug_assert!(_res.is_ok(), "commit vote can not fail but: {:?}", _res);
}

let mut rh = self.replication_handler();

// It has to setup replication stream first because append_blank_log() may update the
// committed-log-id(a single leader with several learners), in which case the
// committed-log-id will be at once submitted to replicate before replication stream
// is built. TODO: But replication streams should be built when a node enters
// leading state. Thus append_blank_log() can be moved before
// rebuild_replication_streams()
// is built.
//
// TODO: But replication streams should be built when a node enters leading state.
// Thus append_blank_log() can be moved before rebuild_replication_streams()

rh.rebuild_replication_streams();
rh.append_blank_log();
Expand Down
24 changes: 2 additions & 22 deletions openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,27 +74,6 @@ where C: RaftTypeConfig
Ok(tx)
}

/// Mark the vote as committed, i.e., being granted and saved by a quorum.
///
/// The committed vote, is not necessary in original raft.
/// Openraft insists doing this because:
/// - Voting is not in the hot path, thus no performance penalty.
/// - Leadership won't be lost if a leader restarted quick enough.
pub(crate) fn commit_vote(&mut self) {
debug_assert!(!self.state.vote_ref().is_committed());
debug_assert_eq!(
self.state.vote_ref().leader_id().voted_for(),
Some(self.config.id),
"it can only commit its own vote"
);

let mut v = *self.state.vote_ref();
v.commit();

let _res = self.update_vote(&v);
debug_assert!(_res.is_ok(), "commit vote can not fail but: {:?}", _res);
}

/// Check and update the local vote and related state for every message received.
///
/// This is used by all incoming event, such as the three RPC append-entries, vote,
Expand Down Expand Up @@ -165,13 +144,14 @@ where C: RaftTypeConfig

let em = &self.state.membership_state.effective();
let mut leader = Leading::new(
Instant::now(),
*self.state.vote_ref(),
em.membership().to_quorum_set(),
em.learner_ids(),
self.state.last_log_id().copied(),
);

// TODO: the progress should be initialized when the leader is elected.
// TODO: we do not need to update the progress until the first blank log is appended.
// We can just ignore the result here:
// The `committed` will not be updated until a log of current term is granted by a quorum
let _ = leader.progress.update_with(&self.config.id, |v| v.matching = self.state.last_log_id().copied());
Expand Down
16 changes: 8 additions & 8 deletions openraft/src/engine/tests/elect_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ fn test_elect() -> anyhow::Result<()> {
eng.elect();

assert_eq!(Vote::new_committed(1, 1), *eng.state.vote_ref());
assert_eq!(
Some(btreeset! {1},),
eng.internal_server_state.leading().map(|x| x.voting().granters().collect::<BTreeSet<_>>())
assert!(
eng.internal_server_state.voting_mut().is_none(),
"voting state is removed when becoming leader"
);

assert_eq!(ServerState::Leader, eng.state.server_state);
Expand Down Expand Up @@ -98,14 +98,14 @@ fn test_elect() -> anyhow::Result<()> {
// Build in-progress election state
eng.state.vote = UTime::new(Instant::now(), Vote::new_committed(1, 2));
eng.vote_handler().become_leading();
eng.internal_server_state.leading_mut().map(|l| l.voting_mut().grant_by(&1));
eng.internal_server_state.voting_mut().map(|l| l.grant_by(&1));

eng.elect();

assert_eq!(Vote::new_committed(2, 1), *eng.state.vote_ref());
assert_eq!(
Some(btreeset! {1},),
eng.internal_server_state.leading().map(|x| x.voting().granters().collect::<BTreeSet<_>>())
assert!(
eng.internal_server_state.voting_mut().is_none(),
"voting state is removed when becoming leader"
);

assert_eq!(ServerState::Leader, eng.state.server_state);
Expand Down Expand Up @@ -154,7 +154,7 @@ fn test_elect() -> anyhow::Result<()> {
assert_eq!(Vote::new(1, 1), *eng.state.vote_ref());
assert_eq!(
Some(btreeset! {1},),
eng.internal_server_state.leading().map(|x| x.voting().granters().collect::<BTreeSet<_>>())
eng.internal_server_state.leading().map(|x| x.voting().unwrap().granters().collect::<BTreeSet<_>>())
);

assert_eq!(ServerState::Candidate, eng.state.server_state);
Expand Down
58 changes: 46 additions & 12 deletions openraft/src/engine/tests/handle_vote_resp_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::entry::RaftEntry;
use crate::progress::entry::ProgressEntry;
use crate::progress::Inflight;
use crate::raft::VoteResponse;
use crate::raft_state::LogStateReader;
use crate::testing::log_id;
use crate::testing::log_id1;
use crate::utime::UTime;
Expand Down Expand Up @@ -65,7 +66,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
assert_eq!(0, eng.output.take_commands().len());
}

tracing::info!("--- recv a smaller vote. vote_granted==false always; keep trying in candidate state");
tracing::info!("--- recv a smaller vote. vote_granted==false; always keep trying in candidate state");
{
let mut eng = eng();
eng.config.id = 1;
Expand All @@ -74,7 +75,13 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
.membership_state
.set_effective(Arc::new(EffectiveMembership::new(Some(log_id1(1, 1)), m12())));
eng.vote_handler().become_leading();
eng.internal_server_state.leading_mut().map(|l| l.voting_mut().grant_by(&1));

let last_log_id = eng.state.last_log_id().copied();

eng.internal_server_state.leading_mut().map(|l| {
l.initialize_voting(last_log_id);
l.voting_mut().unwrap().grant_by(&1)
});
eng.state.server_state = ServerState::Candidate;

eng.handle_vote_resp(2, VoteResponse {
Expand All @@ -86,7 +93,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
assert_eq!(Vote::new(2, 1), *eng.state.vote_ref());
assert_eq!(
Some(btreeset! {1},),
eng.internal_server_state.leading().map(|x| x.voting().granters().collect::<BTreeSet<_>>())
eng.internal_server_state.leading().map(|x| x.voting().unwrap().granters().collect::<BTreeSet<_>>())
);

assert_eq!(ServerState::Candidate, eng.state.server_state);
Expand All @@ -105,7 +112,13 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
.membership_state
.set_effective(Arc::new(EffectiveMembership::new(Some(log_id1(1, 1)), m12())));
eng.vote_handler().become_leading();
eng.internal_server_state.leading_mut().map(|l| l.voting_mut().grant_by(&1));

let last_log_id = eng.state.last_log_id().copied();

eng.internal_server_state.leading_mut().map(|l| {
l.initialize_voting(last_log_id);
l.voting_mut().unwrap().grant_by(&1)
});
eng.state.server_state = ServerState::Candidate;

eng.handle_vote_resp(2, VoteResponse {
Expand Down Expand Up @@ -134,7 +147,14 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
.membership_state
.set_effective(Arc::new(EffectiveMembership::new(Some(log_id1(1, 1)), m12())));
eng.vote_handler().become_leading();
eng.internal_server_state.leading_mut().map(|l| l.voting_mut().grant_by(&1));

let last_log_id = eng.state.last_log_id().copied();

eng.internal_server_state.leading_mut().map(|l| {
l.initialize_voting(last_log_id);
l.voting_mut().unwrap().grant_by(&1)
});

eng.state.server_state = ServerState::Candidate;

eng.handle_vote_resp(2, VoteResponse {
Expand All @@ -146,7 +166,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
assert_eq!(Vote::new(2, 1), *eng.state.vote_ref());
assert_eq!(
Some(btreeset! {1},),
eng.internal_server_state.leading().map(|x| x.voting().granters().collect::<BTreeSet<_>>())
eng.internal_server_state.leading().map(|x| x.voting().unwrap().granters().collect::<BTreeSet<_>>())
);

assert_eq!(ServerState::Candidate, eng.state.server_state);
Expand All @@ -163,7 +183,14 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
.membership_state
.set_effective(Arc::new(EffectiveMembership::new(Some(log_id1(1, 1)), m1234())));
eng.vote_handler().become_leading();
eng.internal_server_state.leading_mut().map(|l| l.voting_mut().grant_by(&1));

let last_log_id = eng.state.last_log_id().copied();

eng.internal_server_state.leading_mut().map(|l| {
l.initialize_voting(last_log_id);
l.voting_mut().unwrap().grant_by(&1)
});

eng.state.server_state = ServerState::Candidate;

eng.handle_vote_resp(2, VoteResponse {
Expand All @@ -175,7 +202,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
assert_eq!(Vote::new(2, 1), *eng.state.vote_ref());
assert_eq!(
Some(btreeset! {1,2},),
eng.internal_server_state.leading().map(|x| x.voting().granters().collect::<BTreeSet<_>>())
eng.internal_server_state.leading().map(|x| x.voting().unwrap().granters().collect::<BTreeSet<_>>())
);

assert_eq!(ServerState::Candidate, eng.state.server_state);
Expand All @@ -192,7 +219,14 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
.membership_state
.set_effective(Arc::new(EffectiveMembership::new(Some(log_id1(1, 1)), m12())));
eng.vote_handler().become_leading();
eng.internal_server_state.leading_mut().map(|l| l.voting_mut().grant_by(&1));

let last_log_id = eng.state.last_log_id().copied();

eng.internal_server_state.leading_mut().map(|l| {
l.initialize_voting(last_log_id);
l.voting_mut().unwrap().grant_by(&1)
});

eng.state.server_state = ServerState::Candidate;

eng.handle_vote_resp(2, VoteResponse {
Expand All @@ -202,9 +236,9 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
});

assert_eq!(Vote::new_committed(2, 1), *eng.state.vote_ref());
assert_eq!(
Some(btreeset! {1,2},),
eng.internal_server_state.leading().map(|x| x.voting().granters().collect::<BTreeSet<_>>())
assert!(
eng.internal_server_state.voting_mut().is_none(),
"voting state is removed when becoming leader"
);

assert_eq!(ServerState::Leader, eng.state.server_state);
Expand Down
8 changes: 8 additions & 0 deletions openraft/src/internal_server_state.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::leader::voting::Voting;
use crate::leader::Leading;
use crate::quorum::Joint;
use crate::NodeId;
Expand Down Expand Up @@ -46,6 +47,13 @@ where NID: NodeId
impl<NID> InternalServerState<NID>
where NID: NodeId
{
pub(crate) fn voting_mut(&mut self) -> Option<&mut Voting<NID, LeaderQuorumSet<NID>>> {
match self {
InternalServerState::Leading(l) => l.voting_mut(),
InternalServerState::Following => None,
}
}

pub(crate) fn leading(&self) -> Option<&Leading<NID, LeaderQuorumSet<NID>>> {
match self {
InternalServerState::Leading(l) => Some(l),
Expand Down
Loading

0 comments on commit 4ab38e7

Please sign in to comment.