diff --git a/nexus/src/app/background/init.rs b/nexus/src/app/background/init.rs index fedb74b81b..b839af2bb3 100644 --- a/nexus/src/app/background/init.rs +++ b/nexus/src/app/background/init.rs @@ -911,6 +911,7 @@ fn init_dns( #[cfg(test)] pub mod test { + use crate::app::saga::SagaCompletionFuture; use crate::app::saga::StartSaga; use dropshot::HandlerTaskMode; use futures::FutureExt; @@ -920,12 +921,14 @@ pub mod test { use nexus_db_queries::db::DataStore; use nexus_test_utils_macros::nexus_test; use nexus_types::internal_api::params as nexus_params; + use omicron_common::api::external::Error; use omicron_test_utils::dev::poll; use std::net::SocketAddr; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::time::Duration; use tempfile::TempDir; + use uuid::Uuid; /// Used by various tests of tasks that kick off sagas pub(crate) struct NoopStartSaga { @@ -946,12 +949,32 @@ pub mod test { fn saga_start( &self, _: steno::SagaDag, + ) -> futures::prelude::future::BoxFuture<'_, Result<steno::SagaId, Error>> + { + let _ = self.count.fetch_add(1, Ordering::SeqCst); + async { + // We've not actually started a real saga, so just make + // something up. + Ok(steno::SagaId(Uuid::new_v4())) + } + .boxed() + } + + fn saga_run( + &self, + _: steno::SagaDag, ) -> futures::prelude::future::BoxFuture< '_, - Result<(), omicron_common::api::external::Error>, + Result<(steno::SagaId, SagaCompletionFuture), Error>, > { let _ = self.count.fetch_add(1, Ordering::SeqCst); - async { Ok(()) }.boxed() + async { + let id = steno::SagaId(Uuid::new_v4()); + // No-op sagas complete immediately. 
+ let completed = async { Ok(()) }.boxed(); + Ok((id, completed)) + } + .boxed() } } diff --git a/nexus/src/app/background/tasks/instance_updater.rs b/nexus/src/app/background/tasks/instance_updater.rs index 49d0adb006..b4951b2484 100644 --- a/nexus/src/app/background/tasks/instance_updater.rs +++ b/nexus/src/app/background/tasks/instance_updater.rs @@ -23,6 +23,7 @@ use omicron_common::api::external::ListResultVec; use serde_json::json; use std::future::Future; use std::sync::Arc; +use steno::SagaId; use tokio::task::JoinSet; use uuid::Uuid; @@ -138,17 +139,18 @@ impl InstanceUpdater { .saga_errors .push((None, format!("unexpected JoinError: {err}"))); } - Ok(Err((instance_id, err))) => { - const MSG: &'static str = "update saga failed"; + Ok(Err((instance_id, saga_id, err))) => { warn!( opctx.log, - "{MSG}!"; + "update saga failed!"; "instance_id" => %instance_id, + "saga_id" => %saga_id, "error" => &err, ); - status - .saga_errors - .push((Some(instance_id), err.to_string())); + status.saga_errors.push(( + Some(instance_id), + format!("update saga {saga_id} failed: {err}"), + )); } Ok(Ok(())) => status.sagas_completed += 1, } @@ -159,7 +161,7 @@ impl InstanceUpdater { &self, opctx: &OpContext, status: &mut InstanceUpdaterStatus, - sagas: &mut JoinSet>, + sagas: &mut JoinSet>, instances: impl IntoIterator, ) { let serialized_authn = authn::saga::Serialized::for_opctx(opctx); @@ -171,24 +173,21 @@ impl InstanceUpdater { .instance_id(instance_id) .lookup_for(authz::Action::Modify) .await?; - instance_update::SagaInstanceUpdate::prepare( + let dag = instance_update::SagaInstanceUpdate::prepare( &instance_update::Params { serialized_authn: serialized_authn.clone(), authz_instance, }, - ) + )?; + self.sagas.saga_run(dag).await } .await; match saga { - Ok(saga) => { - let start_saga = self.sagas.clone(); + Ok((saga_id, completed)) => { + status.sagas_started += 1; sagas.spawn(async move { - start_saga - .saga_start(saga) - .await - .map_err(|e| (instance_id, e)) + 
completed.await.map_err(|e| (instance_id, saga_id, e)) }); - status.sagas_started += 1; } Err(err) => { const ERR_MSG: &str = "failed to start update saga"; diff --git a/nexus/src/app/background/tasks/instance_watcher.rs b/nexus/src/app/background/tasks/instance_watcher.rs index c01997ffba..6bad1d7406 100644 --- a/nexus/src/app/background/tasks/instance_watcher.rs +++ b/nexus/src/app/background/tasks/instance_watcher.rs @@ -249,13 +249,28 @@ impl InstanceWatcher { _ => Err(Incomplete::UpdateFailed), }; } - Ok(Some((_, saga))) => { - check.update_saga_queued = true; - if let Err(e) = sagas.saga_start(saga).await { - warn!(opctx.log, "update saga failed"; "error" => ?e); + Ok(Some((_, saga))) => match sagas.saga_run(saga).await { + Ok((saga_id, completed)) => { + check.update_saga_queued = true; + if let Err(e) = completed.await { + warn!( + opctx.log, + "update saga failed"; + "saga_id" => %saga_id, + "error" => e, + ); + check.result = Err(Incomplete::UpdateFailed); + } + } + Err(e) => { + warn!( + opctx.log, + "update saga could not be started"; + "error" => e, + ); check.result = Err(Incomplete::UpdateFailed); } - } + }, Ok(None) => {} }; diff --git a/nexus/src/app/background/tasks/region_replacement.rs b/nexus/src/app/background/tasks/region_replacement.rs index dde34b017c..248fff6ac2 100644 --- a/nexus/src/app/background/tasks/region_replacement.rs +++ b/nexus/src/app/background/tasks/region_replacement.rs @@ -51,7 +51,10 @@ impl RegionReplacementDetector { }; let saga_dag = SagaRegionReplacementStart::prepare(&params)?; - self.sagas.saga_start(saga_dag).await + // We only care that the saga was started, and don't wish to wait for it + // to complete, so use `StartSaga::saga_start`, rather than `saga_run`. 
+ self.sagas.saga_start(saga_dag).await?; + Ok(()) } } diff --git a/nexus/src/app/background/tasks/region_snapshot_replacement_garbage_collect.rs b/nexus/src/app/background/tasks/region_snapshot_replacement_garbage_collect.rs index 77dc87c060..f3b1b68198 100644 --- a/nexus/src/app/background/tasks/region_snapshot_replacement_garbage_collect.rs +++ b/nexus/src/app/background/tasks/region_snapshot_replacement_garbage_collect.rs @@ -56,7 +56,10 @@ impl RegionSnapshotReplacementGarbageCollect { let saga_dag = SagaRegionSnapshotReplacementGarbageCollect::prepare(&params)?; - self.sagas.saga_start(saga_dag).await + // We only care that the saga was started, and don't wish to wait for it + // to complete, so use `StartSaga::saga_start`, rather than `saga_run`. + self.sagas.saga_start(saga_dag).await?; + Ok(()) } async fn clean_up_region_snapshot_replacement_volumes( diff --git a/nexus/src/app/background/tasks/region_snapshot_replacement_start.rs b/nexus/src/app/background/tasks/region_snapshot_replacement_start.rs index 1fdc17690d..bc739ecf27 100644 --- a/nexus/src/app/background/tasks/region_snapshot_replacement_start.rs +++ b/nexus/src/app/background/tasks/region_snapshot_replacement_start.rs @@ -49,7 +49,10 @@ impl RegionSnapshotReplacementDetector { }; let saga_dag = SagaRegionSnapshotReplacementStart::prepare(&params)?; - self.sagas.saga_start(saga_dag).await + // We only care that the saga was started, and don't wish to wait for it + // to complete, so use `StartSaga::saga_start`, rather than `saga_run`. 
+ self.sagas.saga_start(saga_dag).await?; + Ok(()) } /// Find region snapshots on expunged physical disks and create region diff --git a/nexus/src/app/background/tasks/region_snapshot_replacement_step.rs b/nexus/src/app/background/tasks/region_snapshot_replacement_step.rs index cd13a56642..522ce055c6 100644 --- a/nexus/src/app/background/tasks/region_snapshot_replacement_step.rs +++ b/nexus/src/app/background/tasks/region_snapshot_replacement_step.rs @@ -59,7 +59,10 @@ impl RegionSnapshotReplacementFindAffected { }; let saga_dag = SagaRegionSnapshotReplacementStep::prepare(&params)?; - self.sagas.saga_start(saga_dag).await + // We only care that the saga was started, and don't wish to wait for it + // to complete, so use `StartSaga::saga_start`, rather than `saga_run`. + self.sagas.saga_start(saga_dag).await?; + Ok(()) } async fn send_garbage_collect_request( @@ -89,7 +92,12 @@ impl RegionSnapshotReplacementFindAffected { let saga_dag = SagaRegionSnapshotReplacementStepGarbageCollect::prepare(&params)?; - self.sagas.saga_start(saga_dag).await + + // We only care that the saga was started, and don't wish to wait for it + // to complete, so throwing out the saga ID returned by `saga_start` is + // fine. Not waiting on the saga will *not* cancel the saga itself. + self.sagas.saga_start(saga_dag).await?; + Ok(()) } async fn clean_up_region_snapshot_replacement_step_volumes( diff --git a/nexus/src/app/saga.rs b/nexus/src/app/saga.rs index 975df7fc3b..93dc5a135a 100644 --- a/nexus/src/app/saga.rs +++ b/nexus/src/app/saga.rs @@ -90,18 +90,61 @@ pub(crate) fn create_saga_dag( pub(crate) trait StartSaga: Send + Sync { /// Create a new saga (of type `N` with parameters `params`), start it /// running, but do not wait for it to finish. - fn saga_start(&self, dag: SagaDag) -> BoxFuture<'_, Result<(), Error>>; + /// + /// This method returns the ID of the running saga. 
+ fn saga_start(&self, dag: SagaDag) -> BoxFuture<'_, Result<SagaId, Error>>; + + /// Create a new saga (of type `N` with parameters `params`), start it + /// running, and return a future that can be used to wait for it to finish + /// (along with the saga's ID). + /// + /// Callers who do not need to wait for the saga's completion should use + /// `StartSaga::saga_start`, instead, as it avoids allocating the second + /// `BoxFuture` for the completion future. + fn saga_run( + &self, + dag: SagaDag, + ) -> BoxFuture<'_, Result<(SagaId, SagaCompletionFuture), Error>>; } +pub type SagaCompletionFuture = BoxFuture<'static, Result<(), Error>>; + impl StartSaga for SagaExecutor { - fn saga_start(&self, dag: SagaDag) -> BoxFuture<'_, Result<(), Error>> { + fn saga_start(&self, dag: SagaDag) -> BoxFuture<'_, Result<SagaId, Error>> { async move { let runnable_saga = self.saga_prepare(dag).await?; // start() returns a future that can be used to wait for the saga to // complete. We don't need that here. (Cancelling this has no // effect on the running saga.) - let _ = runnable_saga.start().await?; - Ok(()) + let running_saga = runnable_saga.start().await?; + + Ok(running_saga.id) + } + .boxed() + } + + fn saga_run( + &self, + dag: SagaDag, + ) -> BoxFuture<'_, Result<(SagaId, SagaCompletionFuture), Error>> { + async move { + let runnable_saga = self.saga_prepare(dag).await?; + let running_saga = runnable_saga.start().await?; + let id = running_saga.id; + let completed = async move { + running_saga + .wait_until_stopped() + .await + // Eat the saga's outputs, saga log, etc., and just return + // whether it succeeded or failed. This is necessary because + // some tests rely on a `NoopStartSaga` implementation that + // doesn't actually run sagas and therefore cannot produce a + // real saga log or outputs. + .into_omicron_result() + .map(|_| ()) + } + .boxed(); + Ok((id, completed)) } .boxed() }