Skip to content

Commit

Permalink
Improve: build snapshot in another task
Browse files Browse the repository at this point in the history
Before this commit, a snapshot was built in the `sm::Worker`, which blocked
other state-machine writes, such as applying log entries.

This commit parallelizes applying log entries and building a snapshot: the
snapshot is now built in a separate `tokio::task`.

Because building a snapshot is a read operation, it does not have to
block the entire state machine. Instead, it only needs either a consistent
view of the state machine or a lock on the state machine.

- Fix: #596
  • Loading branch information
drmingdrmer committed May 2, 2023
1 parent 5e8a115 commit fa4085b
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 21 deletions.
8 changes: 8 additions & 0 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ pub struct MemStoreStateMachine {
#[derive(PartialEq, Eq)]
#[derive(PartialOrd, Ord)]
pub enum BlockOperation {
/// Delay building a snapshot without holding a lock on the state machine.
///
/// The snapshot builder sleeps for the configured duration before it acquires the state
/// machine read lock, so the snapshot build does not return until the delay elapses, while
/// applying entries should not be blocked in the meantime.
DelayBuildingSnapshot,
// NOTE(review): the use of this variant is not visible here; presumably it delays snapshot
// building while the state machine lock is held — confirm against the builder.
BuildSnapshot,
// NOTE(review): the use of this variant is not visible here; presumably it delays purging
// logs — confirm against the log storage implementation.
PurgeLog,
}
Expand Down Expand Up @@ -239,6 +242,11 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<MemStore> {
let last_applied_log;
let last_membership;

if let Some(d) = self.get_blocking(&BlockOperation::DelayBuildingSnapshot) {
tracing::info!(?d, "delay snapshot build");
tokio::time::sleep(d).await;
}

{
// Serialize the data of the state machine.
let sm = self.sm.read().await;
Expand Down
26 changes: 18 additions & 8 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1276,15 +1276,25 @@ where
Notify::StateMachine { command_result } => {
tracing::debug!("sm::StateMachine command result: {:?}", command_result);

debug_assert!(
self.command_state.finished_sm_seq < command_result.command_seq,
"sm::StateMachine command result is out of order: expect {} < {}",
self.command_state.finished_sm_seq,
command_result.command_seq
);
self.command_state.finished_sm_seq = command_result.command_seq;
let seq = command_result.command_seq;
let res = command_result.result?;

match res {
// BuildSnapshot is a read operation that does not have to be serialized by
// sm::Worker. Thus it may finish out of order.
sm::Response::BuildSnapshot(_) => {}
_ => {
debug_assert!(
self.command_state.finished_sm_seq < seq,
"sm::StateMachine command result is out of order: expect {} < {}",
self.command_state.finished_sm_seq,
seq
);
}
}
self.command_state.finished_sm_seq = seq;

match command_result.result? {
match res {
sm::Response::BuildSnapshot(meta) => {
tracing::info!(
"sm::StateMachine command done: BuildSnapshot: {}: {}",
Expand Down
25 changes: 18 additions & 7 deletions openraft/src/core/sm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,8 @@ where
CommandPayload::BuildSnapshot => {
tracing::info!("{}: build snapshot", func_name!());

let resp = self.build_snapshot().await?;
let res = CommandResult::new(cmd.seq, Ok(Response::BuildSnapshot(resp)));
let _ = self.resp_tx.send(Notify::sm(res));
// It is a read operation and is spawned, and it responds in another task
self.build_snapshot(cmd.seq, self.resp_tx.clone()).await;
}
CommandPayload::GetSnapshot { tx } => {
tracing::info!("{}: get snapshot", func_name!());
Expand Down Expand Up @@ -219,18 +218,30 @@ where
Ok(resp)
}

/// Build a snapshot from the state machine.
///
/// Building snapshot is a read-only operation, so it can be run in another task in parallel.
/// This parallelization depends on the [`RaftSnapshotBuilder`] implementation returned by
/// [`get_snapshot_builder()`](`RaftStateMachine::get_snapshot_builder()`): The builder must:
/// - hold a consistent view of the state machine that won't be affected by further writes such
/// as applying a log entry,
/// - or it must be able to acquire a lock that prevents any write operations.
#[tracing::instrument(level = "info", skip_all)]
async fn build_snapshot(&mut self) -> Result<SnapshotMeta<C::NodeId, C::Node>, StorageError<C::NodeId>> {
// TODO(3): move it to another task
async fn build_snapshot(&mut self, seq: CommandSeq, resp_tx: mpsc::UnboundedSender<Notify<C>>) {
// TODO: need to be abortable?
// use futures::future::abortable;
// let (fu, abort_handle) = abortable(async move { builder.build_snapshot().await });

tracing::info!("{}", func_name!());

let mut builder = self.state_machine.get_snapshot_builder().await;
let snapshot = builder.build_snapshot().await?;

Ok(snapshot.meta)
let _handle = tokio::spawn(async move {
let res = builder.build_snapshot().await;
let res = res.map(|snap| Response::BuildSnapshot(snap.meta));
let cmd_res = CommandResult::new(seq, res);
let _ = resp_tx.send(Notify::sm(cmd_res));
});
}

#[tracing::instrument(level = "info", skip_all)]
Expand Down
8 changes: 8 additions & 0 deletions openraft/src/storage_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ pub enum Violation<NID: NodeId> {

/// A storage error could be either a defensive check error or an error occurred when doing the
/// actual io operation.
///
/// It indicates a data crash.
/// When an application returns this error, the Openraft node shuts down immediately to prevent
/// further damage.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub enum StorageError<NID>
Expand Down Expand Up @@ -212,6 +216,10 @@ where NID: NodeId
}

/// Error that occurs when operating the store.
///
/// It indicates a data crash.
/// When an application returns this error, the Openraft node shuts down immediately to prevent
/// further damage.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct StorageIOError<NID>
Expand Down
1 change: 1 addition & 0 deletions tests/tests/log_compaction/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ mod fixtures;

mod t10_compaction;
mod t35_building_snapshot_does_not_block_append;
mod t35_building_snapshot_does_not_block_apply;
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use maplit::btreeset;
use openraft::raft::AppendEntriesRequest;
use openraft::testing::blank_ent;
use openraft::testing::log_id;
use openraft::Config;
use openraft::RaftNetwork;
use openraft::RaftNetworkFactory;
use openraft::Vote;
use openraft_memstore::BlockOperation;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// While a snapshot is being built, an append-entries request that also commits the entry
/// must not be blocked: the new entry has to be applied before the snapshot build finishes.
///
/// The test delays snapshot building on node 1 by 5 seconds, triggers a snapshot there, checks
/// that the snapshot has not finished, and then sends an append-entries RPC to node 1 that must
/// succeed within 500 ms.
///
/// Issue: https://github.com/datafuselabs/openraft/issues/596
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn building_snapshot_does_not_block_apply() -> Result<()> {
// NOTE(review): presumably `enable_tick: false` turns off timer-driven actions so that only
// the explicit requests below drive the cluster — confirm against the `Config` docs.
let config = Arc::new(
Config {
enable_tick: false,
..Default::default()
}
.validate()?,
);

let mut router = RaftRouter::new(config.clone());
// Nodes 0 and 1 form the cluster; the second set is empty.
// NOTE(review): presumably the sets are voters and learners, and node 0 becomes the leader
// (client requests below go to node 0) — confirm against `RaftRouter::new_cluster`.
let mut log_index = router.new_cluster(btreeset! {0,1}, btreeset! {}).await?;

let follower = router.get_raft_handle(&1)?;

tracing::info!(log_index, "--- set flag to delay snapshot building");
{
// Only the state machine handle of node 1 is needed; the other handle is unused.
let (mut _sto1, sm1) = router.get_storage_handle(&1)?;
// The 5 s delay is much longer than the 500 ms sleep plus 500 ms wait below, so the
// snapshot is still pending while the rest of the test runs.
sm1.storage_mut()
.await
.set_blocking(BlockOperation::DelayBuildingSnapshot, Duration::from_millis(5_000));
}

tracing::info!(log_index, "--- build snapshot on follower, it should block");
{
// Write 10 more entries through node 0 and wait for node 1 to catch up.
log_index += router.client_request_many(0, "0", 10).await?;
router.wait(&1, timeout()).log(Some(log_index), "written 10 logs").await?;

follower.trigger_snapshot().await?;

tracing::info!(log_index, "--- sleep 500 ms to make sure snapshot is started");
tokio::time::sleep(Duration::from_millis(500)).await;

// Waiting for the snapshot must time out: the build is still delayed.
let res = router
.wait(&1, Some(Duration::from_millis(500)))
.snapshot(log_id(1, 0, log_index), "building snapshot is blocked")
.await;
assert!(res.is_err(), "snapshot should be blocked and can not finish");
}

tracing::info!(
log_index,
"--- send append-entries request to the follower that is building snapshot"
);
{
let next = log_index + 1;

// Hand-crafted RPC sent directly to node 1, carrying one blank entry at index `next`.
let rpc = AppendEntriesRequest::<openraft_memstore::TypeConfig> {
vote: Vote::new_committed(1, 0),
prev_log_id: Some(log_id(1, 0, log_index)),
entries: vec![blank_ent(1, 0, next)],
// Append and commit this entry
leader_commit: Some(log_id(1, 0, next)),
};

let mut cli = router.new_client(1, &()).await;
let fu = cli.send_append_entries(rpc);
// The RPC must complete within 500 ms even though the snapshot build is still pending.
let fu = tokio::time::timeout(Duration::from_millis(500), fu);
// The first `?` propagates a timeout, the second an RPC error.
let resp = fu.await??;
assert!(resp.is_success());

// NOTE(review): presumably `.log()` waits until the entry at `next` is applied, as the
// message below states — confirm against the `Wait` helper.
router
.wait(&1, timeout())
.log(
Some(next),
format!("log at index {} can be applied, while snapshot is building", next),
)
.await?;
}

Ok(())
}

/// Default time limit passed to `router.wait()` in this test: one second.
fn timeout() -> Option<Duration> {
    let limit = Duration::from_secs(1);
    Some(limit)
}
8 changes: 2 additions & 6 deletions tests/tests/snapshot/t20_startup_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use openraft::storage::RaftLogStorage;
use openraft::storage::RaftStateMachine;
use openraft::testing::log_id;
use openraft::Config;
use openraft::SnapshotPolicy;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;
Expand All @@ -15,12 +14,9 @@ use crate::fixtures::RaftRouter;
/// once.
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn startup_build_snapshot() -> anyhow::Result<()> {
let snapshot_threshold = 10;

let config = Arc::new(
Config {
enable_heartbeat: false,
snapshot_policy: SnapshotPolicy::LogsSinceLast(snapshot_threshold),
max_in_snapshot_log_to_keep: 0,
..Default::default()
}
Expand All @@ -34,10 +30,10 @@ async fn startup_build_snapshot() -> anyhow::Result<()> {

tracing::info!(log_index, "--- send client requests");
{
router.client_request_many(0, "0", (20 - 1 - log_index) as usize).await?;
log_index = 20 - 1;
log_index += router.client_request_many(0, "0", (20 - 1 - log_index) as usize).await?;

router.wait(&0, timeout()).log(Some(log_index), "node-0 applied all requests").await?;
router.get_raft_handle(&0)?.trigger_snapshot().await?;
router.wait(&0, timeout()).snapshot(log_id(1, 0, log_index), "node-0 snapshot").await?;
}

Expand Down

0 comments on commit fa4085b

Please sign in to comment.