Skip to content

Commit

Permalink
Fix: a single Candidate should be able to vote for itself.
Browse files Browse the repository at this point in the history
A Candidate should check whether it is the only member of the cluster before
sending vote requests.
Otherwise a single-node cluster does not work.

- Change: `Wait::log_at_least()` uses `Option<u64>` instead of `u64` for the log index.
  • Loading branch information
drmingdrmer committed Jan 19, 2022
1 parent 8b5198c commit 86e2ccd
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 15 deletions.
1 change: 0 additions & 1 deletion memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ pub struct MemStore {

impl MemStore {
/// Create a new `MemStore` instance.
/// TODO(xp): creating a store should not require an id.
pub async fn new() -> Self {
let log = RwLock::new(BTreeMap::new());
let sm = RwLock::new(MemStoreStateMachine::default());
Expand Down
28 changes: 25 additions & 3 deletions openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use crate::raft::Entry;
use crate::raft::EntryPayload;
use crate::raft::RaftMsg;
use crate::raft::RaftRespTx;
use crate::raft::VoteResponse;
use crate::raft_types::LogIdOptionExt;
use crate::replication::ReplicaEvent;
use crate::replication::ReplicationStream;
Expand Down Expand Up @@ -353,6 +354,14 @@ impl<D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> Ra
leader_metrics,
};

{
let curr = self.tx_metrics.borrow();
if m == *curr {
tracing::debug!("metrics not changed: {}", m.summary());
return;
}
}

tracing::debug!("report_metrics: {}", m.summary());
let res = self.tx_metrics.send(m);

Expand Down Expand Up @@ -879,18 +888,17 @@ struct CandidateState<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S:

impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>> CandidateState<'a, D, R, N, S> {
pub(self) fn new(core: &'a mut RaftCore<D, R, N, S>) -> Self {
let id = core.id;
Self {
core,
// vote for itself.
granted: btreeset! {id},
granted: btreeset! {},
}
}

/// Run the candidate loop.
#[tracing::instrument(level="debug", skip(self), fields(id=self.core.id, raft_state="candidate"))]
pub(self) async fn run(mut self) -> Result<(), Fatal> {
// Each iteration of the outer loop represents a new term.

loop {
if !self.core.target_state.is_candidate() {
return Ok(());
Expand All @@ -904,6 +912,20 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
self.core.save_hard_state().await?;
self.core.report_metrics(Update::Update(None));

// vote for itself.
self.handle_vote_response(
VoteResponse {
term: self.core.current_term,
vote_granted: true,
last_log_id: self.core.last_log_id,
},
self.core.id,
)
.await?;
if !self.core.target_state.is_candidate() {
return Ok(());
}

// Send RPCs to all members in parallel.
let mut pending_votes = self.spawn_parallel_vote_requests();

Expand Down
1 change: 0 additions & 1 deletion openraft/src/core/vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ impl<'a, D: AppData, R: AppDataResponse, N: RaftNetwork<D>, S: RaftStorage<D, R>
/// Handle response from a vote request sent to a peer.
#[tracing::instrument(level = "debug", skip(self))]
pub(super) async fn handle_vote_response(&mut self, res: VoteResponse, target: NodeId) -> Result<(), StorageError> {
// TODO(xp): change membership from 123 to 4 may hangs I guess. Because this function will not be called.
// If peer's term is greater than current term, revert to follower state.

if res.term > self.core.current_term {
Expand Down
10 changes: 5 additions & 5 deletions openraft/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,16 +225,16 @@ impl Wait {

/// Wait until applied at least `want_log`(inclusive) logs or timeout.
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn log_at_least(&self, want_log: u64, msg: impl ToString) -> Result<RaftMetrics, WaitError> {
pub async fn log_at_least(&self, want_log: Option<u64>, msg: impl ToString) -> Result<RaftMetrics, WaitError> {
self.metrics(
|x| x.last_log_index >= Some(want_log),
&format!("{} .last_log_index >= {}", msg.to_string(), want_log),
|x| x.last_log_index >= want_log,
&format!("{} .last_log_index >= {:?}", msg.to_string(), want_log),
)
.await?;

self.metrics(
|x| x.last_applied.index() >= Some(want_log),
&format!("{} .last_applied >= {}", msg.to_string(), want_log),
|x| x.last_applied.index() >= want_log,
&format!("{} .last_applied >= {:?}", msg.to_string(), want_log),
)
.await
}
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/metrics_wait_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ async fn test_wait() -> anyhow::Result<()> {
assert!(rst.is_ok());
});
let got = w.log(Some(3), "log").await?;
let got_least2 = w.log_at_least(2, "log").await?;
let got_least3 = w.log_at_least(3, "log").await?;
let got_least4 = w.log_at_least(4, "log").await;
let got_least2 = w.log_at_least(Some(2), "log").await?;
let got_least3 = w.log_at_least(Some(3), "log").await?;
let got_least4 = w.log_at_least(Some(4), "log").await;
h.await?;

assert_eq!(Some(3), got.last_log_index);
Expand Down
2 changes: 2 additions & 0 deletions openraft/tests/membership/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ mod fixtures;
mod t00_learner_restart;
mod t10_add_learner;
mod t15_add_remove_follower;
mod t16_change_membership_cases;
// TODO(xp): rename it
mod t20_change_membership;
mod t25_elect_with_new_config;
mod t30_commit_joint_config;
Expand Down
146 changes: 146 additions & 0 deletions openraft/tests/membership/t16_change_membership_cases.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Duration;

use maplit::btreeset;
use openraft::Config;
use openraft::NodeId;
use openraft::State;
use tracing_futures::Instrument;

use crate::fixtures::RaftRouter;

/// Run `change_from_to` over a fixed table of `(old, new)` member sets.
///
/// The scenarios are executed one after another inside the unit-test tracing span;
/// the first scenario that returns an error aborts the test with that error.
#[tokio::test(flavor = "multi_thread", worker_threads = 6)]
async fn change_membership_cases() -> anyhow::Result<()> {
    let (_log_guard, ut_span) = init_ut!();

    let run_all = async {
        // Each entry is (members before the change, members after the change).
        let scenarios = vec![
            (btreeset! {0}, btreeset! {1}),
            (btreeset! {0}, btreeset! {1, 2}),
            (btreeset! {0}, btreeset! {1, 2, 3}),
            (btreeset! {0, 1}, btreeset! {1, 2}),
            (btreeset! {0, 1}, btreeset! {1}),
            (btreeset! {0, 1}, btreeset! {2}),
            (btreeset! {0, 1}, btreeset! {3}),
            (btreeset! {0, 1, 2}, btreeset! {4}),
            (btreeset! {0, 1, 2}, btreeset! {4, 5, 6}),
            (btreeset! {0, 1, 2, 3, 4}, btreeset! {0, 1, 2, 3}),
        ];

        for (from, to) in scenarios {
            change_from_to(from, to).await?;
        }

        Ok::<(), anyhow::Error>(())
    };

    run_all.instrument(ut_span).await
}

/// Drive one membership-change scenario from member set `old` to member set `new`.
///
/// Steps performed, in order:
/// 1. Build a router and bring up the nodes in `old` via `new_nodes_from_single`
///    (presumably forming a cluster whose members are `old` — confirm against the fixture),
///    then send 100 client requests to node 0 and wait for every node in `old` to reach that log.
/// 2. Start every node that appears only in `new`, then request a membership change to `new`
///    through node 0.
/// 3. Wait until every node in `new` reports a current leader that is itself a member of `new`,
///    and has applied at least the two membership-change logs.
/// 4. Wait until every node that appears only in `old` reports `State::Learner`.
/// 5. Send another 100 client requests to whatever leader the first node of `new` reports,
///    and wait for every node in `new` to apply at least that many logs.
/// 6. Assert that waiting for the final log index on a node only in `old` fails.
///
/// # Errors
///
/// Returns an error if config validation, cluster setup, the membership change,
/// or any of the waits in steps 1-5 fails.
///
/// # Panics
///
/// Panics if `new` is empty, or if a node only in `old` unexpectedly reaches the final log index.
#[tracing::instrument(level = "debug")]
async fn change_from_to(old: BTreeSet<NodeId>, new: BTreeSet<NodeId>) -> anyhow::Result<()> {
    let mes = format!("from {:?} to {:?}", old, new);

    let only_in_old = old.difference(&new);
    let only_in_new = new.difference(&old);

    let config = Arc::new(Config::default().validate()?);
    let router = Arc::new(RaftRouter::new(config.clone()));

    let mut log_index = router.new_nodes_from_single(old.clone(), btreeset! {}).await?;

    tracing::info!("--- write 100 logs");
    {
        router.client_request_many(0, "client", 100).await;
        log_index += 100;

        router.wait_for_log(&old, Some(log_index), timeout(), &format!("write 100 logs, {}", mes)).await?;
    }

    tracing::info!("--- change to {:?}", new);
    {
        for id in only_in_new {
            router.new_raft_node(*id).await;
        }

        router.change_membership(0, new.clone()).await?;
        log_index += 2; // two member-change logs

        tracing::info!("--- wait for old leader or new leader");
        {
            for id in new.iter() {
                router
                    // A longer deadline than `timeout()` is used for this wait.
                    .wait(id, Some(Duration::from_millis(5_000)))
                    .await?
                    .metrics(
                        // Satisfied only when a leader is known AND that leader belongs to `new`.
                        |x| matches!(x.current_leader, Some(leader) if new.contains(&leader)),
                        format!("node {} in new cluster has leader in new cluster, {}", id, mes),
                    )
                    .await?;
            }
        }

        for id in new.iter() {
            // A new leader may already be elected and have committed a blank log,
            // hence "at least" rather than an exact index.
            router
                .wait(id, timeout())
                .await?
                .log_at_least(Some(log_index), format!("new cluster, {}", mes))
                .await?;
        }

        // `only_in_old` is iterated again below, so walk a clone of the iterator here.
        for id in only_in_old.clone() {
            router
                .wait(id, timeout())
                .await?
                .state(State::Learner, format!("node {} only in old, {}", id, mes))
                .await?;
        }
    }

    tracing::info!("--- write another 100 logs");
    {
        // Ask the first node of the new cluster who the leader is.
        let m = router
            .wait(new.iter().next().unwrap(), timeout())
            .await?
            .metrics(|x| x.current_leader.is_some(), format!("wait for new leader, {}", mes))
            .await?;

        // The wait above only returns once `current_leader` is `Some`.
        let leader = m.current_leader.unwrap();

        router.client_request_many(leader, "client", 100).await;
        log_index += 100;
    }

    for id in new.iter() {
        router
            .wait(id, timeout())
            .await?
            // A new leader may commit a blank log, so the index can exceed `log_index`.
            .log_at_least(Some(log_index), format!("new cluster recv logs 100~200, {}", mes))
            .await?;
    }

    tracing::info!("--- log will not be sync to removed node");
    {
        for id in only_in_old {
            let res = router
                .wait(id, timeout())
                .await?
                .log(
                    Some(log_index),
                    format!("node {} in old cluster wont recv new logs, {}", id, mes),
                )
                .await;
            // The wait is expected to fail: a removed node must not receive the new logs.
            assert!(res.is_err());
        }
    }

    Ok(())
}

/// Deadline passed to the router's `wait` calls in this test: 500 milliseconds.
fn timeout() -> Option<Duration> {
    let wait_limit = Duration::from_millis(500);
    Some(wait_limit)
}
4 changes: 2 additions & 2 deletions openraft/tests/membership/t30_step_down.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn step_down() -> Result<()> {
router
.wait(&1, timeout())
.await?
.log_at_least(log_index, "node in old cluster commits at least 1 membership log")
.log_at_least(Some(log_index), "node in old cluster commits at least 1 membership log")
.await?;

tracing::info!("--- new cluster commits 2 membership logs");
Expand All @@ -67,7 +67,7 @@ async fn step_down() -> Result<()> {
.wait(&id, timeout())
.await?
.log_at_least(
log_index,
Some(log_index),
"node in new cluster finally commit at least one blank leader-initialize log",
)
.await?;
Expand Down

0 comments on commit 86e2ccd

Please sign in to comment.