Fix: restore replication progress when a leader starts up (#884)
When a node starts up as a leader, its replication progress to itself should be restored.

And if this leader is the only node in the cluster, it should re-apply all of the logs to the state machine at once.

- Fix: #883
drmingdrmer committed Nov 28, 2023
1 parent 4601faf commit 63e69b9
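To see why this matters, here is a minimal, self-contained sketch (hypothetical types, not openraft's real API) of how restoring the leader's own progress drives the commit index in a single-node cluster: the quorum is the leader alone, so once its own `matching` entry is restored to its last log id, every log up to that id is committed and can be re-applied.

use std::collections::BTreeMap;

/// Hypothetical progress map: node id -> highest log index known to be persisted.
struct Progress {
    matching: BTreeMap<u64, Option<u64>>,
}

impl Progress {
    /// The committed index is the highest index granted by a quorum.
    /// In a single-node cluster, the quorum is the leader (node 0) alone.
    fn committed(&self) -> Option<u64> {
        self.matching.get(&0).copied().flatten()
    }
}

fn main() {
    let mut p = Progress { matching: BTreeMap::new() };

    // Before this fix: after a restart the leader's own entry stayed empty,
    // so nothing counted as committed and no logs were re-applied.
    p.matching.insert(0, None);
    assert_eq!(p.committed(), None);

    // With the fix: the leader's own progress is restored to its last log id
    // (what `rh.update_local_progress(rh.state.last_log_id().copied())` does
    // in the engine_impl.rs hunk below), so all logs become committed at once.
    p.matching.insert(0, Some(42));
    assert_eq!(p.committed(), Some(42));
}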
Showing 9 changed files with 120 additions and 9 deletions.
6 changes: 6 additions & 0 deletions memstore/src/lib.rs
@@ -167,6 +167,12 @@ impl MemStore {
self.sm.write().await.clone()
}

/// Clear the state machine for testing purposes.
pub async fn clear_state_machine(&self) {
let mut sm = self.sm.write().await;
*sm = MemStoreStateMachine::default();
}

/// Block an operation for testing purposes.
pub fn set_blocking(&self, block: BlockOperation, d: Duration) {
self.block.lock().unwrap().insert(block, d);
4 changes: 4 additions & 0 deletions openraft/src/engine/engine_impl.rs
@@ -110,6 +110,10 @@ where C: RaftTypeConfig

let mut rh = self.replication_handler();
rh.rebuild_replication_streams();

// Restore the leader's progress on its own local log
rh.update_local_progress(rh.state.last_log_id().copied());

rh.initiate_replication(SendNone::False);

return;
2 changes: 1 addition & 1 deletion openraft/src/engine/handler/log_handler/mod.rs
@@ -11,7 +11,7 @@ use crate::RaftTypeConfig
#[cfg(test)] mod calc_purge_upto_test;
#[cfg(test)] mod purge_log_test;

-/// Handle raft vote related operations
+/// Handle raft-log related operations
pub(crate) struct LogHandler<'x, C>
where C: RaftTypeConfig
{
15 changes: 15 additions & 0 deletions openraft/src/engine/handler/replication_handler/mod.rs
@@ -195,6 +195,14 @@ where C: RaftTypeConfig
/// accepted.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_matching(&mut self, node_id: C::NodeId, inflight_id: u64, log_id: Option<LogId<C::NodeId>>) {
tracing::debug!(
node_id = display(node_id),
inflight_id = display(inflight_id),
log_id = display(log_id.display()),
"{}",
func_name!()
);

debug_assert!(log_id.is_some(), "a valid update can never set matching to None");

// The value granted by a quorum may not yet be committed.
@@ -443,6 +451,8 @@
/// Writing to the local log store does not have to wait for a replication response from a
/// remote node. Thus it can just be done in a fast path.
pub(crate) fn update_local_progress(&mut self, upto: Option<LogId<C::NodeId>>) {
tracing::debug!(upto = display(upto.display()), "{}", func_name!());

if upto.is_none() {
return;
}
@@ -451,6 +461,11 @@

// The leader may no longer be in the membership
if let Some(prog_entry) = self.leader.progress.get_mut(&id) {
tracing::debug!(
self_matching = display(prog_entry.matching.display()),
"update progress"
);

if prog_entry.matching >= upto {
return;
}
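The early returns above make the update a pure monotonicity check: local progress only ever moves forward, and a stale `upto` is a no-op. A minimal sketch of the same guard with plain `Option<u64>` values (simplified, hypothetical signature):

fn update_local_progress(matching: &mut Option<u64>, upto: Option<u64>) {
    if upto.is_none() {
        return;
    }
    // `None < Some(_)` for `Option`, so a node with no recorded progress
    // always advances; anything already at or beyond `upto` is untouched.
    if *matching >= upto {
        return;
    }
    *matching = upto;
}

fn main() {
    let mut matching = Some(10);
    update_local_progress(&mut matching, Some(5)); // stale update: ignored
    assert_eq!(matching, Some(10));
    update_local_progress(&mut matching, Some(12)); // advances the progress
    assert_eq!(matching, Some(12));
}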
9 changes: 1 addition & 8 deletions openraft/src/engine/handler/vote_handler/mod.rs
@@ -11,7 +11,6 @@ use crate::engine::ValueSender;
use crate::error::RejectVoteRequest;
use crate::internal_server_state::InternalServerState;
use crate::leader::Leading;
-use crate::progress::Progress;
use crate::raft::ResultSender;
use crate::raft_state::LogStateReader;
use crate::utime::UTime;
@@ -143,19 +142,13 @@ where C: RaftTypeConfig
// Re-create a new Leader instance.

let em = &self.state.membership_state.effective();
-let mut leader = Leading::new(
+let leader = Leading::new(
*self.state.vote_ref(),
em.membership().to_quorum_set(),
em.learner_ids(),
self.state.last_log_id().copied(),
);

-// TODO: the progress should be initialized when the leader is elected.
-// TODO: we do not need to update the progress until the first blank log is appended.
-// We can just ignore the result here:
-// The `committed` will not be updated until a log of current term is granted by a quorum
-let _ = leader.progress.update_with(&self.config.id, |v| v.matching = self.state.last_log_id().copied());

// Do not update clock_progress until the first blank log is committed.

*self.internal_server_state = InternalServerState::Leading(leader);
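In effect, seeding the leader's own progress moves out of this election path entirely: per the removed TODOs, a freshly elected leader updates its progress as soon as it appends its first blank log, and the restart path is now covered by the `update_local_progress(...)` call added in engine_impl.rs above.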
2 changes: 2 additions & 0 deletions openraft/src/internal_server_state.rs
@@ -22,6 +22,8 @@ pub(crate) type LeaderQuorumSet<NID> = Joint<NID, Vec<NID>, Vec<Vec<NID>>>;
#[derive(PartialEq, Eq)]
#[allow(clippy::large_enum_variant)]
// TODO(9): consider moving Leader to a Box
// TODO(9): Make InternalServerState an Option, separate Leading(Proposer) role and
// Following(Acceptor) role
pub(crate) enum InternalServerState<NID>
where NID: NodeId
{
30 changes: 30 additions & 0 deletions scripts/mprocs-check.yaml
@@ -0,0 +1,30 @@
#!/usr/bin/env mprocs --config


# Run local checks in parallel with mprocs
#
# Usage:
#   mprocs --config ./scripts/mprocs-check.yaml
#
# Install:
# cargo install mprocs
#
# See: https://github.com/pvolok/mprocs


procs:
test-lib:
cmd: ["cargo", "test", "--lib"]
it:
cmd: ["cargo", "test", "--test", "*"]
clippy:
cmd: ["cargo", "clippy", "--no-deps", "--all-targets", "--", "-D", "warnings"]

# # Examples kept for reference:
# xx:
# shell: "nodemon server.js"
# webpack: "webpack serve"
# tests:
# shell: "jest -w"
# env:
# NODE_ENV: test
1 change: 1 addition & 0 deletions tests/tests/life_cycle/main.rs
@@ -11,4 +11,5 @@ mod t10_initialization;
mod t11_shutdown;
mod t50_follower_restart_does_not_interrupt;
mod t50_single_follower_restart;
mod t50_single_leader_restart_re_apply_logs;
mod t90_issue_607_single_restart;
60 changes: 60 additions & 0 deletions tests/tests/life_cycle/t50_single_leader_restart_re_apply_logs.rs
@@ -0,0 +1,60 @@
use std::sync::Arc;
use std::time::Duration;

use maplit::btreeset;
use openraft::Config;
use openraft::ServerState;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::MemLogStore;
use crate::fixtures::MemRaft;
use crate::fixtures::MemStateMachine;
use crate::fixtures::RaftRouter;

/// A single leader should re-apply all logs upon startup,
/// because it is itself a quorum.
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn single_leader_restart_re_apply_logs() -> anyhow::Result<()> {
let config = Arc::new(
Config {
enable_heartbeat: false,
..Default::default()
}
.validate()?,
);

let mut router = RaftRouter::new(config.clone());

tracing::info!("--- bring up cluster of 1 node");
let mut log_index = router.new_cluster(btreeset! {0}, btreeset! {}).await?;

tracing::info!(log_index, "--- write to 1 log");
{
log_index += router.client_request_many(0, "foo", 1).await?;
}

tracing::info!(log_index, "--- stop and restart node-0");
{
let (node, ls, sm): (MemRaft, MemLogStore, MemStateMachine) = router.remove_node(0).unwrap();
node.shutdown().await?;

// Clear the state machine; logs should be re-applied upon restart because this node is a leader.
ls.storage().await.clear_state_machine().await;

tracing::info!(log_index, "--- restart node-0");

router.new_raft_node_with_sto(0, ls, sm).await;
router.wait(&0, timeout()).state(ServerState::Leader, "become leader upon restart").await?;
}

tracing::info!(log_index, "--- a single leader should re-apply all logs");
{
router.wait(&0, timeout()).log(Some(log_index), "node-0 works").await?;
}

Ok(())
}

fn timeout() -> Option<Duration> {
Some(Duration::from_millis(1_000))
}
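To run just this test, something like the following should work (assuming the integration-test target is named after the `life_cycle` directory, as the module layout above suggests):

cargo test --test life_cycle single_leader_restart_re_apply_logs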
