From 53129dd9c011d1f9d2fd781676243198e1e297bc Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Tue, 27 Dec 2022 19:06:28 -0500 Subject: [PATCH] Add API for injecting 'repetitions' into saga executor (#88) Provides a simple API for instructing a node to "execute twice". This provides a "bare-minimum" helper API for testing idempotency within a saga. When combined with https://github.com/oxidecomputer/steno/pull/67 - which was used to test unwind safety - it should be possible to test that all actions / undo actions within a saga are idempotent, at least across being called twice. Part of https://github.com/oxidecomputer/steno/issues/31 --- CHANGELOG.adoc | 2 + src/saga_exec.rs | 80 ++++++++++++- src/sec.rs | 299 ++++++++++++++++++++++++++++++++++++++++------- 3 files changed, 335 insertions(+), 46 deletions(-) diff --git a/CHANGELOG.adoc b/CHANGELOG.adoc index eeabb33..bc21d5b 100644 --- a/CHANGELOG.adoc +++ b/CHANGELOG.adoc @@ -15,6 +15,8 @@ https://github.com/oxidecomputer/steno/compare/v0.3.0\...HEAD[Full list of commits] +* https://github.com/oxidecomputer/steno/pull/88[#88] Add `SecClient::saga_inject_repeat` method to help with testing idempotency + == 0.3.0 (released 2022-11-02) https://github.com/oxidecomputer/steno/compare/v0.2.0\...v0.3.0[Full list of commits] diff --git a/src/saga_exec.rs b/src/saga_exec.rs index 0776713..d94e2bb 100644 --- a/src/saga_exec.rs +++ b/src/saga_exec.rs @@ -10,6 +10,7 @@ use crate::saga_action_generic::ActionData; use crate::saga_action_generic::ActionInjectError; use crate::saga_log::SagaNodeEventType; use crate::saga_log::SagaNodeLoadStatus; +use crate::sec::RepeatInjected; use crate::sec::SecExecClient; use crate::ActionRegistry; use crate::SagaCachedState; @@ -287,6 +288,10 @@ struct TaskParams { saga_params: Arc, /// The action itself that we're executing. action: Arc>, + /// If true, indicates that the action should be executed multiple + /// times, and the latter result should be used. This is useful + /// when testing idempotency of a user-specified action. + injected_repeat: Option, } /// Executes a saga @@ -399,6 +404,7 @@ impl SagaExecutor { node_errors: BTreeMap::new(), sglog, injected_errors: BTreeSet::new(), + injected_repeats: BTreeMap::new(), sec_hdl, saga_id, }; @@ -839,6 +845,21 @@ impl SagaExecutor { live_state.injected_errors.insert(node_id); } + /// Forces a given node to be executed twice + /// + /// When execution reaches this node, the action and undo actions + /// are invoked twice by the saga executor. + /// + /// If this node produces output, only the second value is stored. + pub async fn inject_repeat( + &self, + node_id: NodeIndex, + repeat: RepeatInjected, + ) { + let mut live_state = self.live_state.lock().await; + live_state.injected_repeats.insert(node_id, repeat); + } + /// Runs the saga /// /// This might be running a saga that has never been started before or @@ -964,6 +985,10 @@ impl SagaExecutor { saga_params, action: sgaction, user_context: Arc::clone(&self.user_context), + injected_repeat: live_state + .injected_repeats + .get(&node_id) + .map(|r| *r), }; let task = tokio::spawn(SagaExecutor::exec_node(task_params)); @@ -1001,6 +1026,10 @@ impl SagaExecutor { saga_params, action: sgaction, user_context: Arc::clone(&self.user_context), + injected_repeat: live_state + .injected_repeats + .get(&node_id) + .map(|r| *r), }; let task = tokio::spawn(SagaExecutor::undo_node(task_params)); @@ -1087,14 +1116,22 @@ impl SagaExecutor { } } - let exec_future = task_params.action.do_it(ActionContext { + let make_action_context = || ActionContext { ancestor_tree: Arc::clone(&task_params.ancestor_tree), saga_params: Arc::clone(&task_params.saga_params), node_id, dag: Arc::clone(&task_params.dag), user_context: Arc::clone(&task_params.user_context), - }); - let result = exec_future.await; + }; + + let mut result = task_params.action.do_it(make_action_context()).await; + + if let Some(repeat) = task_params.injected_repeat { + for _ in 0..repeat.action.get() - 1 { + result = task_params.action.do_it(make_action_context()).await; + } + } + let node: Box> = match result { Ok(output) => { Box::new(SagaNode { node_id, state: SgnsDone(output) }) @@ -1137,16 +1174,26 @@ impl SagaExecutor { } } - let exec_future = task_params.action.undo_it(ActionContext { + let make_action_context = || ActionContext { ancestor_tree: Arc::clone(&task_params.ancestor_tree), saga_params: Arc::clone(&task_params.saga_params), node_id, dag: Arc::clone(&task_params.dag), user_context: Arc::clone(&task_params.user_context), - }); + }; + // TODO-robustness We have to figure out what it means to fail here and // what we want to do about it. - exec_future.await.unwrap(); + task_params.action.undo_it(make_action_context()).await.unwrap(); + if let Some(repeat) = task_params.injected_repeat { + for _ in 0..repeat.undo.get() - 1 { + task_params + .action + .undo_it(make_action_context()) + .await + .unwrap(); + } + } let node = Box::new(SagaNode { node_id, state: SgnsUndone(UndoMode::ActionUndone), @@ -1333,6 +1380,9 @@ struct SagaExecLiveState { /// Injected errors injected_errors: BTreeSet, + + /// Injected actions which should be called repeatedly + injected_repeats: BTreeMap, } #[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)] @@ -1998,6 +2048,16 @@ pub trait SagaExecManager: fmt::Debug + Send + Sync { /// /// See [`Dag::get_index()`] to get the node_id for a node. fn inject_error(&self, node_id: NodeIndex) -> BoxFuture<'_, ()>; + + /// Replaces the action at the specified node with one that calls both the + /// action (and undo action, if called) twice. + /// + /// See [`Dag::get_index()`] to get the node_id for a node. + fn inject_repeat( + &self, + node_id: NodeIndex, + repeat: RepeatInjected, + ) -> BoxFuture<'_, ()>; } impl SagaExecManager for SagaExecutor @@ -2019,6 +2079,14 @@ where fn inject_error(&self, node_id: NodeIndex) -> BoxFuture<'_, ()> { self.inject_error(node_id).boxed() } + + fn inject_repeat( + &self, + node_id: NodeIndex, + repeat: RepeatInjected, + ) -> BoxFuture<'_, ()> { + self.inject_repeat(node_id, repeat).boxed() + } } #[cfg(test)] diff --git a/src/sec.rs b/src/sec.rs index aea4575..82aaf7e 100644 --- a/src/sec.rs +++ b/src/sec.rs @@ -240,7 +240,33 @@ impl SecClient { let (ack_tx, ack_rx) = oneshot::channel(); self.sec_cmd( ack_rx, - SecClientMsg::SagaInjectError { ack_tx, saga_id, node_id }, + SecClientMsg::SagaInjectError { + ack_tx, + saga_id, + node_id, + error_type: ErrorInjected::Fail, + }, + ) + .await + } + + /// Inject a node "repetition" into the saga, forcing the do + undo actions + /// to be called multiple times. + pub async fn saga_inject_repeat( + &self, + saga_id: SagaId, + node_id: NodeIndex, + repeat: RepeatInjected, + ) -> Result<(), anyhow::Error> { + let (ack_tx, ack_rx) = oneshot::channel(); + self.sec_cmd( + ack_rx, + SecClientMsg::SagaInjectError { + ack_tx, + saga_id, + node_id, + error_type: ErrorInjected::Repeat(repeat), + }, ) .await } @@ -395,6 +421,25 @@ impl SagaStateView { // SEC Client/Server interface +/// Arguments which can be passed to the SEC instructing it to change +/// the number of times a node is executed. +/// +/// Providing the counts of "1" for action and undo acts as a no-op, since +/// that's the default. +/// +/// Should only be used for saga testing. +#[derive(Debug, Copy, Clone)] +pub struct RepeatInjected { + pub action: NonZeroU32, + pub undo: NonZeroU32, +} + +#[derive(Debug)] +enum ErrorInjected { + Fail, + Repeat(RepeatInjected), +} + /// Message passed from the [`SecClient`] to the [`Sec`] // TODO-cleanup This might be cleaner using separate named structs for the // enums, similar to what we do for SecStep. @@ -472,6 +517,7 @@ enum SecClientMsg { /// id of the node to inject the error (see /// [`SagaTemplateMetadata::node_for_name`]) node_id: NodeIndex, + error_type: ErrorInjected, }, /// Shut down the SEC @@ -502,10 +548,16 @@ impl fmt::Debug for SecClientMsg { SecClientMsg::SagaGet { saga_id, .. } => { f.debug_struct("SagaGet").field("saga_id", saga_id).finish() } - SecClientMsg::SagaInjectError { saga_id, node_id, .. } => f + SecClientMsg::SagaInjectError { + saga_id, + node_id, + error_type, + .. + } => f .debug_struct("SagaInjectError") .field("saga_id", saga_id) .field("node_Id", node_id) + .field("error_type", error_type) .finish(), SecClientMsg::Shutdown { .. } => f.write_str("Shutdown"), SecClientMsg::SagaStart { saga_id, .. } => { @@ -973,8 +1025,15 @@ impl Sec { SecClientMsg::SagaGet { ack_tx, saga_id } => { self.cmd_saga_get(ack_tx, saga_id); } - SecClientMsg::SagaInjectError { ack_tx, saga_id, node_id } => { - self.cmd_saga_inject_error(ack_tx, saga_id, node_id); + SecClientMsg::SagaInjectError { + ack_tx, + saga_id, + node_id, + error_type, + } => { + self.cmd_saga_inject_error( + ack_tx, saga_id, node_id, error_type, + ); } SecClientMsg::Shutdown => self.cmd_shutdown(), } @@ -1155,6 +1214,7 @@ impl Sec { ack_tx: oneshot::Sender>, saga_id: SagaId, node_id: NodeIndex, + error_type: ErrorInjected, ) { trace!( &self.log, @@ -1183,7 +1243,14 @@ impl Sec { }; let log = self.log.new(o!()); let fut = async move { - exec.inject_error(node_id).await; + match error_type { + ErrorInjected::Fail => { + exec.inject_error(node_id).await; + } + ErrorInjected::Repeat(repeat) => { + exec.inject_repeat(node_id, repeat).await; + } + } Sec::client_respond(&log, ack_tx, Ok(())); None } @@ -1426,8 +1493,7 @@ mod test { type ExecContextType = TestContext; } - fn make_test_one_node_saga() -> (Arc>, Arc) - { + fn make_test_saga() -> (Arc>, Arc) { async fn do_n1( ctx: ActionContext, ) -> Result { @@ -1441,12 +1507,28 @@ mod test { Ok(()) } + async fn do_n2( + ctx: ActionContext, + ) -> Result { + ctx.user_data().call("do_n2"); + Ok(2) + } + async fn undo_n2( + ctx: ActionContext, + ) -> Result<(), anyhow::Error> { + ctx.user_data().call("undo_n2"); + Ok(()) + } + let mut registry = ActionRegistry::new(); let action_n1 = ActionFunc::new_action("n1_out", do_n1, undo_n1); registry.register(Arc::clone(&action_n1)); + let action_n2 = ActionFunc::new_action("n2_out", do_n2, undo_n2); + registry.register(Arc::clone(&action_n2)); let mut builder = DagBuilder::new(SagaName::new("test-saga")); builder.append(Node::action("n1_out", "n1", &*action_n1)); + builder.append(Node::action("n2_out", "n2", &*action_n2)); ( Arc::new(registry), Arc::new(SagaDag::new( @@ -1456,13 +1538,28 @@ mod test { ) } - // Tests the "normal flow" for a newly created saga: create + start. - #[tokio::test] - async fn test_saga_create_and_start_executes_saga() { + struct TestArguments<'a> { + repeat: Option<(NodeIndex, RepeatInjected)>, + fail_node: Option, + counts: &'a [Counts], + } + + struct Counts { + action: u32, + undo: u32, + } + + // We have a lot of tests which attempt to: + // - Inject some repeats + // - Inject some failures + // - Observe the count of "which nodes were called" + // + // This helper intends to reduce some of that boilerplate. + async fn saga_runner_helper(arguments: TestArguments<'_>) { // Test setup let log = new_log(); let sec = new_sec(&log); - let (registry, dag) = make_test_one_node_saga(); + let (registry, dag) = make_test_saga(); // Saga Creation let saga_id = SagaId(Uuid::new_v4()); @@ -1472,39 +1569,157 @@ mod test { .await .expect("failed to create saga"); + // Only injects an error if one was requested + if let Some((repeat_node, repeat_operation)) = arguments.repeat { + sec.saga_inject_repeat(saga_id, repeat_node, repeat_operation) + .await + .expect("failed to inject repeat"); + } + + // Only injects a failure if one was requested + if let Some(fail_node) = arguments.fail_node { + sec.saga_inject_error(saga_id, fail_node) + .await + .expect("failed to inject error"); + } + sec.saga_start(saga_id).await.expect("failed to start saga running"); let result = saga_future.await; - let output = result.kind.unwrap(); - assert_eq!(output.lookup_node_output::("n1_out").unwrap(), 1); - assert_eq!(context.get_count("do_n1"), 1); - assert_eq!(context.get_count("undo_n1"), 0); + if arguments.fail_node.is_some() { + result.kind.expect_err("should have failed; we injected an error!"); + } else { + let output = result.kind.unwrap(); + assert_eq!(output.lookup_node_output::("n1_out").unwrap(), 1); + assert_eq!(output.lookup_node_output::("n2_out").unwrap(), 2); + } + let counts = &arguments.counts; + assert_eq!(context.get_count("do_n1"), counts[0].action); + assert_eq!(context.get_count("undo_n1"), counts[0].undo); + assert_eq!(context.get_count("do_n2"), counts[1].action); + assert_eq!(context.get_count("undo_n2"), counts[1].undo); } - // Tests error injection skips execution of the actions, and fails the saga. + // Tests the "normal flow" for a newly created saga: create + start. #[tokio::test] - async fn test_saga_fails_after_error_injection() { - // Test setup - let log = new_log(); - let sec = new_sec(&log); - let (registry, dag) = make_test_one_node_saga(); + async fn test_saga_create_and_start_executes_saga() { + saga_runner_helper(TestArguments { + repeat: None, + fail_node: None, + counts: &[ + Counts { action: 1, undo: 0 }, + Counts { action: 1, undo: 0 }, + ], + }) + .await; + } - // Saga Creation - let saga_id = SagaId(Uuid::new_v4()); - let context = Arc::new(TestContext::new()); - let saga_future = sec - .saga_create(saga_id, Arc::clone(&context), dag, registry) - .await - .expect("failed to create saga"); + #[tokio::test] + async fn test_saga_inject_repeat_and_then_succeed() { + saga_runner_helper(TestArguments { + repeat: Some(( + NodeIndex::new(0), + RepeatInjected { + action: NonZeroU32::new(2).unwrap(), + undo: NonZeroU32::new(1).unwrap(), + }, + )), + fail_node: None, + counts: &[ + Counts { action: 2, undo: 0 }, + Counts { action: 1, undo: 0 }, + ], + }) + .await; + } - sec.saga_inject_error(saga_id, NodeIndex::new(0)) - .await - .expect("failed to inject error"); + #[tokio::test] + async fn test_saga_inject_repeat_and_then_fail() { + saga_runner_helper(TestArguments { + repeat: Some(( + NodeIndex::new(0), + RepeatInjected { + action: NonZeroU32::new(2).unwrap(), + undo: NonZeroU32::new(1).unwrap(), + }, + )), + fail_node: Some(NodeIndex::new(1)), + counts: &[ + Counts { action: 2, undo: 1 }, + Counts { action: 0, undo: 0 }, + ], + }) + .await; + } - sec.saga_start(saga_id).await.expect("failed to start saga running"); - let result = saga_future.await; - result.kind.expect_err("should have failed; we injected an error!"); - assert_eq!(context.get_count("do_n1"), 0); - assert_eq!(context.get_count("undo_n1"), 0); + #[tokio::test] + async fn test_saga_inject_repeat_fail_and_repeat_undo() { + saga_runner_helper(TestArguments { + repeat: Some(( + NodeIndex::new(0), + RepeatInjected { + action: NonZeroU32::new(2).unwrap(), + undo: NonZeroU32::new(2).unwrap(), + }, + )), + fail_node: Some(NodeIndex::new(1)), + counts: &[ + Counts { action: 2, undo: 2 }, + Counts { action: 0, undo: 0 }, + ], + }) + .await; + } + + #[tokio::test] + async fn test_saga_inject_and_fail_repeat_undo_only() { + saga_runner_helper(TestArguments { + repeat: Some(( + NodeIndex::new(0), + RepeatInjected { + action: NonZeroU32::new(1).unwrap(), + undo: NonZeroU32::new(2).unwrap(), + }, + )), + fail_node: Some(NodeIndex::new(1)), + counts: &[ + Counts { action: 1, undo: 2 }, + Counts { action: 0, undo: 0 }, + ], + }) + .await; + } + + #[tokio::test] + async fn test_saga_inject_and_fail_repeat_many_times() { + saga_runner_helper(TestArguments { + repeat: Some(( + NodeIndex::new(0), + RepeatInjected { + action: NonZeroU32::new(3).unwrap(), + undo: NonZeroU32::new(5).unwrap(), + }, + )), + fail_node: Some(NodeIndex::new(1)), + counts: &[ + Counts { action: 3, undo: 5 }, + Counts { action: 0, undo: 0 }, + ], + }) + .await; + } + + // Tests error injection skips execution of the actions, and fails the saga. + #[tokio::test] + async fn test_saga_fails_after_error_injection() { + saga_runner_helper(TestArguments { + repeat: None, + fail_node: Some(NodeIndex::new(0)), + counts: &[ + Counts { action: 0, undo: 0 }, + Counts { action: 0, undo: 0 }, + ], + }) + .await; } // Tests that omitting "start" after creation doesn't execute the saga. @@ -1513,7 +1728,7 @@ mod test { // Test setup let log = new_log(); let sec = new_sec(&log); - let (registry, dag) = make_test_one_node_saga(); + let (registry, dag) = make_test_saga(); // Saga Creation let saga_id = SagaId(Uuid::new_v4()); @@ -1531,6 +1746,8 @@ mod test { } assert_eq!(context.get_count("do_n1"), 0); assert_eq!(context.get_count("undo_n1"), 0); + assert_eq!(context.get_count("do_n2"), 0); + assert_eq!(context.get_count("undo_n2"), 0); } // Tests that resume + start executes the saga. This is the normal flow @@ -1541,7 +1758,7 @@ mod test { // Test setup let log = new_log(); let sec = new_sec(&log); - let (registry, dag) = make_test_one_node_saga(); + let (registry, dag) = make_test_saga(); // Saga Creation let saga_id = SagaId(Uuid::new_v4()); @@ -1563,6 +1780,8 @@ mod test { assert_eq!(output.lookup_node_output::("n1_out").unwrap(), 1); assert_eq!(context.get_count("do_n1"), 1); assert_eq!(context.get_count("undo_n1"), 0); + assert_eq!(context.get_count("do_n2"), 1); + assert_eq!(context.get_count("undo_n2"), 0); } // Tests that at *most* one of create + resume should be used for a saga, @@ -1572,7 +1791,7 @@ mod test { // Test setup let log = new_log(); let sec = new_sec(&log); - let (registry, dag) = make_test_one_node_saga(); + let (registry, dag) = make_test_saga(); // Saga Creation let saga_id = SagaId(Uuid::new_v4()); @@ -1626,7 +1845,7 @@ mod test { // Test setup let log = new_log(); let sec = new_sec(&log); - let (registry, dag) = make_test_one_node_saga(); + let (registry, dag) = make_test_saga(); // Saga Creation let saga_id = SagaId(Uuid::new_v4());