diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index b7a875732..6acb8d938 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -52,13 +52,13 @@ pub struct ClientRequest { /// Helper trait to build `ClientRequest` for `MemStore` in generic test code. pub trait IntoMemClientRequest { - fn make_request(client_id: &str, serial: u64) -> T; + fn make_request(client_id: impl ToString, serial: u64) -> T; } impl IntoMemClientRequest for ClientRequest { - fn make_request(client_id: &str, serial: u64) -> Self { + fn make_request(client_id: impl ToString, serial: u64) -> Self { Self { - client: client_id.into(), + client: client_id.to_string(), serial, status: format!("request-{}", serial), } diff --git a/openraft/src/config/config.rs b/openraft/src/config/config.rs index 2704a028e..02d049f82 100644 --- a/openraft/src/config/config.rs +++ b/openraft/src/config/config.rs @@ -4,6 +4,7 @@ use std::ops::Deref; use std::sync::atomic::AtomicBool; use std::time::Duration; +use anyerror::AnyError; use clap::Parser; use rand::thread_rng; use rand::Rng; @@ -26,6 +27,12 @@ pub enum SnapshotPolicy { /// A snapshot will be generated once the log has grown the specified number of logs since /// the last snapshot. LogsSinceLast(u64), + + /// Openraft will never trigger a snapshot building. + /// With this option, the application calls + /// [`Raft::trigger_snapshot()`](`crate::Raft::trigger_snapshot`) to manually trigger a + /// snapshot. + Never, } impl SnapshotPolicy { @@ -35,6 +42,7 @@ impl SnapshotPolicy { SnapshotPolicy::LogsSinceLast(threshold) => { state.committed().next_index() >= state.snapshot_last_log_id().next_index() + threshold } + SnapshotPolicy::Never => false, } } } @@ -50,17 +58,21 @@ fn parse_bytes_with_unit(src: &str) -> Result { } fn parse_snapshot_policy(src: &str) -> Result { + if src == "never" { + return Ok(SnapshotPolicy::Never); + } + let elts = src.split(':').collect::>(); if elts.len() != 2 { return Err(ConfigError::InvalidSnapshotPolicy { - syntax: "since_last:".to_string(), + syntax: "never|since_last:".to_string(), invalid: src.to_string(), }); } if elts[0] != "since_last" { return Err(ConfigError::InvalidSnapshotPolicy { - syntax: "since_last:".to_string(), + syntax: "never|since_last:".to_string(), invalid: src.to_string(), }); } @@ -258,7 +270,10 @@ impl Config { /// /// The first element in `args` must be the application name. pub fn build(args: &[&str]) -> Result { - let config = ::parse_from(args); + let config = ::try_parse_from(args).map_err(|e| ConfigError::ParseError { + source: AnyError::from(&e), + args: args.iter().map(|x| x.to_string()).collect(), + })?; config.validate() } diff --git a/openraft/src/config/config_test.rs b/openraft/src/config/config_test.rs index 3e3207627..0708e7dbe 100644 --- a/openraft/src/config/config_test.rs +++ b/openraft/src/config/config_test.rs @@ -93,6 +93,20 @@ fn test_build() -> anyhow::Result<()> { Ok(()) } +#[test] +fn test_config_snapshot_policy() -> anyhow::Result<()> { + let config = Config::build(&["foo", "--snapshot-policy=never"])?; + assert_eq!(SnapshotPolicy::Never, config.snapshot_policy); + + let config = Config::build(&["foo", "--snapshot-policy=since_last:3"])?; + assert_eq!(SnapshotPolicy::LogsSinceLast(3), config.snapshot_policy); + + let res = Config::build(&["foo", "--snapshot-policy=bar:3"]); + assert!(res.is_err()); + + Ok(()) +} + #[test] fn test_config_enable_tick() -> anyhow::Result<()> { let config = Config::build(&["foo", "--enable-tick=false"])?; diff --git a/openraft/src/config/error.rs b/openraft/src/config/error.rs index b5d4a9442..5ff8a4df4 100644 --- a/openraft/src/config/error.rs +++ b/openraft/src/config/error.rs @@ -1,7 +1,12 @@ +use anyerror::AnyError; + /// Error variants related to configuration. #[derive(Debug, thiserror::Error)] #[derive(PartialEq, Eq)] pub enum ConfigError { + #[error("ParseError: {source} while parsing ({args:?})")] + ParseError { source: AnyError, args: Vec }, + /// The min election timeout is not smaller than the max election timeout. #[error("election timeout: min({min}) must be < max({max})")] ElectionTimeout { min: u64, max: u64 }, diff --git a/tests/tests/log_compaction/main.rs b/tests/tests/snapshot_building/main.rs similarity index 83% rename from tests/tests/log_compaction/main.rs rename to tests/tests/snapshot_building/main.rs index 0b9d5cbf6..a0e8afe57 100644 --- a/tests/tests/log_compaction/main.rs +++ b/tests/tests/snapshot_building/main.rs @@ -5,6 +5,7 @@ #[path = "../fixtures/mod.rs"] mod fixtures; -mod t10_compaction; +mod t10_build_snapshot; mod t35_building_snapshot_does_not_block_append; mod t35_building_snapshot_does_not_block_apply; +mod t60_snapshot_policy_never; diff --git a/tests/tests/log_compaction/t10_compaction.rs b/tests/tests/snapshot_building/t10_build_snapshot.rs similarity index 99% rename from tests/tests/log_compaction/t10_compaction.rs rename to tests/tests/snapshot_building/t10_build_snapshot.rs index cedd01cb3..9941ac409 100644 --- a/tests/tests/log_compaction/t10_compaction.rs +++ b/tests/tests/snapshot_building/t10_build_snapshot.rs @@ -31,7 +31,7 @@ use crate::fixtures::RaftRouter; /// - send enough requests to the node that log compaction will be triggered. /// - add new nodes and assert that they receive the snapshot. #[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] -async fn compaction() -> Result<()> { +async fn build_snapshot() -> Result<()> { let snapshot_threshold: u64 = 50; // Setup test dependencies. diff --git a/tests/tests/log_compaction/t35_building_snapshot_does_not_block_append.rs b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs similarity index 100% rename from tests/tests/log_compaction/t35_building_snapshot_does_not_block_append.rs rename to tests/tests/snapshot_building/t35_building_snapshot_does_not_block_append.rs diff --git a/tests/tests/log_compaction/t35_building_snapshot_does_not_block_apply.rs b/tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs similarity index 100% rename from tests/tests/log_compaction/t35_building_snapshot_does_not_block_apply.rs rename to tests/tests/snapshot_building/t35_building_snapshot_does_not_block_apply.rs diff --git a/tests/tests/snapshot_building/t60_snapshot_policy_never.rs b/tests/tests/snapshot_building/t60_snapshot_policy_never.rs new file mode 100644 index 000000000..57394a6d5 --- /dev/null +++ b/tests/tests/snapshot_building/t60_snapshot_policy_never.rs @@ -0,0 +1,82 @@ +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use futures::StreamExt; +use maplit::btreeset; +use openraft::Config; +use openraft::SnapshotPolicy; + +use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::RaftRouter; + +/// Compaction test. +/// +/// What does this test do? +/// +/// - build a stable single node cluster. +/// - send enough requests to the node that log compaction will be triggered. +/// - add new nodes and assert that they receive the snapshot. +#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] +async fn snapshot_policy_never() -> Result<()> { + let n_logs: u64 = 6000; + let default_config = Config::default().snapshot_policy; + let default_threshold = if let SnapshotPolicy::LogsSinceLast(n) = default_config { + n + } else { + panic!("snapshot_policy must be never"); + }; + + assert!( + n_logs > default_threshold, + "snapshot_threshold must be greater than default threshold" + ); + + // Setup test dependencies. + let config = Arc::new( + Config { + snapshot_policy: SnapshotPolicy::Never, + enable_tick: false, + ..Default::default() + } + .validate()?, + ); + let mut router = RaftRouter::new(config.clone()); + + tracing::info!("--- initializing cluster"); + let mut log_index = router.new_cluster(btreeset! {0}, btreeset! {}).await?; + + let mut clients = futures::stream::FuturesUnordered::new(); + + let n_clients = 20; + for i in 0..n_clients { + let per_client = n_logs / n_clients; + let r = router.clone(); + clients.push(async move { + let client_id = format!("{}", i); + r.client_request_many(0, &client_id, per_client as usize).await + }); + log_index += per_client; + } + + while clients.next().await.is_some() {} + + tracing::info!(log_index, "--- log_index: {}", log_index); + router + .wait(&0, timeout()) + .log(Some(log_index), format_args!("write log upto {}", log_index)) + .await?; + + let wait_snapshot_res = router + .wait(&0, Some(Duration::from_millis(3_000))) + .metrics(|m| m.snapshot.is_some(), "no snapshot will be built") + .await; + + assert!(wait_snapshot_res.is_err(), "no snapshot should be built"); + + Ok(()) +} + +fn timeout() -> Option { + Some(Duration::from_millis(1_000)) +} diff --git a/tests/tests/snapshot/main.rs b/tests/tests/snapshot_streaming/main.rs similarity index 100% rename from tests/tests/snapshot/main.rs rename to tests/tests/snapshot_streaming/main.rs diff --git a/tests/tests/snapshot/t10_api_install_snapshot.rs b/tests/tests/snapshot_streaming/t10_api_install_snapshot.rs similarity index 100% rename from tests/tests/snapshot/t10_api_install_snapshot.rs rename to tests/tests/snapshot_streaming/t10_api_install_snapshot.rs diff --git a/tests/tests/snapshot/t20_startup_snapshot.rs b/tests/tests/snapshot_streaming/t20_startup_snapshot.rs similarity index 100% rename from tests/tests/snapshot/t20_startup_snapshot.rs rename to tests/tests/snapshot_streaming/t20_startup_snapshot.rs diff --git a/tests/tests/snapshot/t20_trigger_snapshot.rs b/tests/tests/snapshot_streaming/t20_trigger_snapshot.rs similarity index 100% rename from tests/tests/snapshot/t20_trigger_snapshot.rs rename to tests/tests/snapshot_streaming/t20_trigger_snapshot.rs diff --git a/tests/tests/snapshot/t30_purge_in_snapshot_logs.rs b/tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs similarity index 100% rename from tests/tests/snapshot/t30_purge_in_snapshot_logs.rs rename to tests/tests/snapshot_streaming/t30_purge_in_snapshot_logs.rs diff --git a/tests/tests/snapshot/t31_snapshot_overrides_membership.rs b/tests/tests/snapshot_streaming/t31_snapshot_overrides_membership.rs similarity index 100% rename from tests/tests/snapshot/t31_snapshot_overrides_membership.rs rename to tests/tests/snapshot_streaming/t31_snapshot_overrides_membership.rs diff --git a/tests/tests/snapshot/t32_snapshot_uses_prev_snap_membership.rs b/tests/tests/snapshot_streaming/t32_snapshot_uses_prev_snap_membership.rs similarity index 100% rename from tests/tests/snapshot/t32_snapshot_uses_prev_snap_membership.rs rename to tests/tests/snapshot_streaming/t32_snapshot_uses_prev_snap_membership.rs diff --git a/tests/tests/snapshot/t33_snapshot_delete_conflict_logs.rs b/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs similarity index 100% rename from tests/tests/snapshot/t33_snapshot_delete_conflict_logs.rs rename to tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs diff --git a/tests/tests/snapshot/t34_replication_does_not_block_purge.rs b/tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs similarity index 100% rename from tests/tests/snapshot/t34_replication_does_not_block_purge.rs rename to tests/tests/snapshot_streaming/t34_replication_does_not_block_purge.rs diff --git a/tests/tests/snapshot/t50_snapshot_line_rate_to_snapshot.rs b/tests/tests/snapshot_streaming/t50_snapshot_line_rate_to_snapshot.rs similarity index 100% rename from tests/tests/snapshot/t50_snapshot_line_rate_to_snapshot.rs rename to tests/tests/snapshot_streaming/t50_snapshot_line_rate_to_snapshot.rs diff --git a/tests/tests/snapshot/t50_snapshot_when_lacking_log.rs b/tests/tests/snapshot_streaming/t50_snapshot_when_lacking_log.rs similarity index 100% rename from tests/tests/snapshot/t50_snapshot_when_lacking_log.rs rename to tests/tests/snapshot_streaming/t50_snapshot_when_lacking_log.rs diff --git a/tests/tests/snapshot/t51_after_snapshot_add_learner_and_request_a_log.rs b/tests/tests/snapshot_streaming/t51_after_snapshot_add_learner_and_request_a_log.rs similarity index 100% rename from tests/tests/snapshot/t51_after_snapshot_add_learner_and_request_a_log.rs rename to tests/tests/snapshot_streaming/t51_after_snapshot_add_learner_and_request_a_log.rs diff --git a/tests/tests/snapshot/t60_snapshot_chunk_size.rs b/tests/tests/snapshot_streaming/t60_snapshot_chunk_size.rs similarity index 100% rename from tests/tests/snapshot/t60_snapshot_chunk_size.rs rename to tests/tests/snapshot_streaming/t60_snapshot_chunk_size.rs diff --git a/tests/tests/snapshot/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs b/tests/tests/snapshot_streaming/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs similarity index 100% rename from tests/tests/snapshot/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs rename to tests/tests/snapshot_streaming/t90_issue_808_snapshot_to_unreachable_node_should_not_block.rs