Skip to content

Commit

Permalink
Feature: add SnapshotPolicy::Never
Browse files Browse the repository at this point in the history
With `SnapshotPolicy::Never`, Openraft will not build snapshots
automatically based on a policy. Instead, the application has full
control over when snapshots are built. In this scenario, the application
can call the `Raft::trigger_snapshot()` API at the desired times to
manually trigger Openraft to build a snapshot.

Rename integration tests:
- `log_compaction -> snapshot_building`
- `snapshto -> snapshot_streaming`

-  Fix: databendlabs#851
  • Loading branch information
drmingdrmer committed May 24, 2023
1 parent 4ab38e7 commit 269d221
Show file tree
Hide file tree
Showing 23 changed files with 125 additions and 8 deletions.
6 changes: 3 additions & 3 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ pub struct ClientRequest {

/// Helper trait to build `ClientRequest` for `MemStore` in generic test code.
pub trait IntoMemClientRequest<T> {
fn make_request(client_id: &str, serial: u64) -> T;
fn make_request(client_id: impl ToString, serial: u64) -> T;
}

impl IntoMemClientRequest<ClientRequest> for ClientRequest {
fn make_request(client_id: &str, serial: u64) -> Self {
fn make_request(client_id: impl ToString, serial: u64) -> Self {
Self {
client: client_id.into(),
client: client_id.to_string(),
serial,
status: format!("request-{}", serial),
}
Expand Down
21 changes: 18 additions & 3 deletions openraft/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::ops::Deref;
use std::sync::atomic::AtomicBool;
use std::time::Duration;

use anyerror::AnyError;
use clap::Parser;
use rand::thread_rng;
use rand::Rng;
Expand All @@ -26,6 +27,12 @@ pub enum SnapshotPolicy {
/// A snapshot will be generated once the log has grown the specified number of logs since
/// the last snapshot.
LogsSinceLast(u64),

/// Openraft will never trigger a snapshot building.
/// With this option, the application calls
/// [`Raft::trigger_snapshot()`](`crate::Raft::trigger_snapshot`) to manually trigger a
/// snapshot.
Never,
}

impl SnapshotPolicy {
Expand All @@ -35,6 +42,7 @@ impl SnapshotPolicy {
SnapshotPolicy::LogsSinceLast(threshold) => {
state.committed().next_index() >= state.snapshot_last_log_id().next_index() + threshold
}
SnapshotPolicy::Never => false,
}
}
}
Expand All @@ -50,17 +58,21 @@ fn parse_bytes_with_unit(src: &str) -> Result<u64, ConfigError> {
}

fn parse_snapshot_policy(src: &str) -> Result<SnapshotPolicy, ConfigError> {
if src == "never" {
return Ok(SnapshotPolicy::Never);
}

let elts = src.split(':').collect::<Vec<_>>();
if elts.len() != 2 {
return Err(ConfigError::InvalidSnapshotPolicy {
syntax: "since_last:<num>".to_string(),
syntax: "never|since_last:<num>".to_string(),
invalid: src.to_string(),
});
}

if elts[0] != "since_last" {
return Err(ConfigError::InvalidSnapshotPolicy {
syntax: "since_last:<num>".to_string(),
syntax: "never|since_last:<num>".to_string(),
invalid: src.to_string(),
});
}
Expand Down Expand Up @@ -258,7 +270,10 @@ impl Config {
///
/// The first element in `args` must be the application name.
pub fn build(args: &[&str]) -> Result<Config, ConfigError> {
let config = <Self as Parser>::parse_from(args);
let config = <Self as Parser>::try_parse_from(args).map_err(|e| ConfigError::ParseError {
source: AnyError::from(&e),
args: args.iter().map(|x| x.to_string()).collect(),
})?;
config.validate()
}

Expand Down
14 changes: 14 additions & 0 deletions openraft/src/config/config_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,20 @@ fn test_build() -> anyhow::Result<()> {
Ok(())
}

#[test]
fn test_config_snapshot_policy() -> anyhow::Result<()> {
let config = Config::build(&["foo", "--snapshot-policy=never"])?;
assert_eq!(SnapshotPolicy::Never, config.snapshot_policy);

let config = Config::build(&["foo", "--snapshot-policy=since_last:3"])?;
assert_eq!(SnapshotPolicy::LogsSinceLast(3), config.snapshot_policy);

let res = Config::build(&["foo", "--snapshot-policy=bar:3"]);
assert!(res.is_err());

Ok(())
}

#[test]
fn test_config_enable_tick() -> anyhow::Result<()> {
let config = Config::build(&["foo", "--enable-tick=false"])?;
Expand Down
5 changes: 5 additions & 0 deletions openraft/src/config/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use anyerror::AnyError;

/// Error variants related to configuration.
#[derive(Debug, thiserror::Error)]
#[derive(PartialEq, Eq)]
pub enum ConfigError {
#[error("ParseError: {source} while parsing ({args:?})")]
ParseError { source: AnyError, args: Vec<String> },

/// The min election timeout is not smaller than the max election timeout.
#[error("election timeout: min({min}) must be < max({max})")]
ElectionTimeout { min: u64, max: u64 },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#[path = "../fixtures/mod.rs"]
mod fixtures;

mod t10_compaction;
mod t10_build_snapshot;
mod t35_building_snapshot_does_not_block_append;
mod t35_building_snapshot_does_not_block_apply;
mod t60_snapshot_policy_never;
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::fixtures::RaftRouter;
/// - send enough requests to the node that log compaction will be triggered.
/// - add new nodes and assert that they receive the snapshot.
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn compaction() -> Result<()> {
async fn build_snapshot() -> Result<()> {
let snapshot_threshold: u64 = 50;

// Setup test dependencies.
Expand Down
82 changes: 82 additions & 0 deletions tests/tests/snapshot_building/t60_snapshot_policy_never.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use futures::StreamExt;
use maplit::btreeset;
use openraft::Config;
use openraft::SnapshotPolicy;

use crate::fixtures::init_default_ut_tracing;
use crate::fixtures::RaftRouter;

/// Compaction test.
///
/// What does this test do?
///
/// - build a stable single node cluster.
/// - send enough requests to the node that log compaction will be triggered.
/// - add new nodes and assert that they receive the snapshot.
#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")]
async fn snapshot_policy_never() -> Result<()> {
let n_logs: u64 = 6000;
let default_config = Config::default().snapshot_policy;
let default_threshold = if let SnapshotPolicy::LogsSinceLast(n) = default_config {
n
} else {
panic!("snapshot_policy must be never");
};

assert!(
n_logs > default_threshold,
"snapshot_threshold must be greater than default threshold"
);

// Setup test dependencies.
let config = Arc::new(
Config {
snapshot_policy: SnapshotPolicy::Never,
enable_tick: false,
..Default::default()
}
.validate()?,
);
let mut router = RaftRouter::new(config.clone());

tracing::info!("--- initializing cluster");
let mut log_index = router.new_cluster(btreeset! {0}, btreeset! {}).await?;

let mut clients = futures::stream::FuturesUnordered::new();

let n_clients = 20;
for i in 0..n_clients {
let per_client = n_logs / n_clients;
let r = router.clone();
clients.push(async move {
let client_id = format!("{}", i);
r.client_request_many(0, &client_id, per_client as usize).await
});
log_index += per_client;
}

while clients.next().await.is_some() {}

tracing::info!(log_index, "--- log_index: {}", log_index);
router
.wait(&0, timeout())
.log(Some(log_index), format_args!("write log upto {}", log_index))
.await?;

let wait_snapshot_res = router
.wait(&0, Some(Duration::from_millis(3_000)))
.metrics(|m| m.snapshot.is_some(), "no snapshot will be built")
.await;

assert!(wait_snapshot_res.is_err(), "no snapshot should be built");

Ok(())
}

fn timeout() -> Option<Duration> {
Some(Duration::from_millis(1_000))
}
File renamed without changes.

0 comments on commit 269d221

Please sign in to comment.