Skip to content

Commit

Permalink
[nexus] Add StartSaga::saga_run to await completion (#6574)
Browse files Browse the repository at this point in the history
Nexus API handlers (and other sagas) that start sagas have the ability
to wait for the completion of the saga by `await`ing the `RunningSaga`
future returned by `RunnableSaga::start`. Background tasks, on the other
hand, cannot currently do this, as their only interface to the saga
executor is an `Arc<dyn StartSaga>`, which provides only the
[`saga_start` method][1]. This method throws away the `RunningSaga`
returned by `RunnableSaga::start`, so the completion of the saga cannot
be awaited. In some cases, it's desirable for background tasks to be
able to await a saga running to completion. I described some motivations
for this in #6569.

This commit adds a new `StartSaga::saga_run` method to the `StartSaga`
trait, which starts a saga and returns a second future that can be
awaited to wait for the saga to finish. Since many tests use a
`NoopStartSaga` type which doesn't actually start sagas, this interface
still throws away most of the saga *outputs* provided by `StoppedSaga`,
to make it easier for the noop test implementation to implement this
method. If that's an issue in the future, we can revisit the interface,
and maybe make `NoopStartSaga` return fake saga results or something.

I've updated the `instance_watcher` and `instance_updater` background
tasks to use the `saga_run` method, because they were written with the
intent to spawn tasks that run sagas to completion --- this is necessary
for how the number of concurrent update sagas is *supposed* to be
limited by `instance_watcher`. I left the region-replacement tasks using
`StartSaga::saga_start`, because --- per @jmpesp --- the "fire and forget"
behavior is desirable there.

Closes #6569

[1]: https://github.com/oxidecomputer/omicron/blob/8be99b0c0dd18495d4a98187145961eafdb17d8f/nexus/src/app/saga.rs#L96-L108
  • Loading branch information
hawkw authored Sep 16, 2024
1 parent a7d942f commit d2e3f34
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 32 deletions.
27 changes: 25 additions & 2 deletions nexus/src/app/background/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,7 @@ fn init_dns(

#[cfg(test)]
pub mod test {
use crate::app::saga::SagaCompletionFuture;
use crate::app::saga::StartSaga;
use dropshot::HandlerTaskMode;
use futures::FutureExt;
Expand All @@ -920,12 +921,14 @@ pub mod test {
use nexus_db_queries::db::DataStore;
use nexus_test_utils_macros::nexus_test;
use nexus_types::internal_api::params as nexus_params;
use omicron_common::api::external::Error;
use omicron_test_utils::dev::poll;
use std::net::SocketAddr;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tempfile::TempDir;
use uuid::Uuid;

/// Used by various tests of tasks that kick off sagas
pub(crate) struct NoopStartSaga {
Expand All @@ -946,12 +949,32 @@ pub mod test {
fn saga_start(
&self,
_: steno::SagaDag,
) -> futures::prelude::future::BoxFuture<'_, Result<steno::SagaId, Error>>
{
let _ = self.count.fetch_add(1, Ordering::SeqCst);
async {
// We've not actually started a real saga, so just make
// something up.
Ok(steno::SagaId(Uuid::new_v4()))
}
.boxed()
}

fn saga_run(
&self,
_: steno::SagaDag,
) -> futures::prelude::future::BoxFuture<
'_,
Result<(), omicron_common::api::external::Error>,
Result<(steno::SagaId, SagaCompletionFuture), Error>,
> {
let _ = self.count.fetch_add(1, Ordering::SeqCst);
async { Ok(()) }.boxed()
async {
let id = steno::SagaId(Uuid::new_v4());
// No-op sagas complete immediately.
let completed = async { Ok(()) }.boxed();
Ok((id, completed))
}
.boxed()
}
}

Expand Down
31 changes: 15 additions & 16 deletions nexus/src/app/background/tasks/instance_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use omicron_common::api::external::ListResultVec;
use serde_json::json;
use std::future::Future;
use std::sync::Arc;
use steno::SagaId;
use tokio::task::JoinSet;
use uuid::Uuid;

Expand Down Expand Up @@ -138,17 +139,18 @@ impl InstanceUpdater {
.saga_errors
.push((None, format!("unexpected JoinError: {err}")));
}
Ok(Err((instance_id, err))) => {
const MSG: &'static str = "update saga failed";
Ok(Err((instance_id, saga_id, err))) => {
warn!(
opctx.log,
"{MSG}!";
"update saga failed!";
"instance_id" => %instance_id,
"saga_id" => %saga_id,
"error" => &err,
);
status
.saga_errors
.push((Some(instance_id), err.to_string()));
status.saga_errors.push((
Some(instance_id),
format!("update saga {saga_id} failed: {err}"),
));
}
Ok(Ok(())) => status.sagas_completed += 1,
}
Expand All @@ -159,7 +161,7 @@ impl InstanceUpdater {
&self,
opctx: &OpContext,
status: &mut InstanceUpdaterStatus,
sagas: &mut JoinSet<Result<(), (Uuid, Error)>>,
sagas: &mut JoinSet<Result<(), (Uuid, SagaId, Error)>>,
instances: impl IntoIterator<Item = Instance>,
) {
let serialized_authn = authn::saga::Serialized::for_opctx(opctx);
Expand All @@ -171,24 +173,21 @@ impl InstanceUpdater {
.instance_id(instance_id)
.lookup_for(authz::Action::Modify)
.await?;
instance_update::SagaInstanceUpdate::prepare(
let dag = instance_update::SagaInstanceUpdate::prepare(
&instance_update::Params {
serialized_authn: serialized_authn.clone(),
authz_instance,
},
)
)?;
self.sagas.saga_run(dag).await
}
.await;
match saga {
Ok(saga) => {
let start_saga = self.sagas.clone();
Ok((saga_id, completed)) => {
status.sagas_started += 1;
sagas.spawn(async move {
start_saga
.saga_start(saga)
.await
.map_err(|e| (instance_id, e))
completed.await.map_err(|e| (instance_id, saga_id, e))
});
status.sagas_started += 1;
}
Err(err) => {
const ERR_MSG: &str = "failed to start update saga";
Expand Down
25 changes: 20 additions & 5 deletions nexus/src/app/background/tasks/instance_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,28 @@ impl InstanceWatcher {
_ => Err(Incomplete::UpdateFailed),
};
}
Ok(Some((_, saga))) => {
check.update_saga_queued = true;
if let Err(e) = sagas.saga_start(saga).await {
warn!(opctx.log, "update saga failed"; "error" => ?e);
Ok(Some((_, saga))) => match sagas.saga_run(saga).await {
Ok((saga_id, completed)) => {
check.update_saga_queued = true;
if let Err(e) = completed.await {
warn!(
opctx.log,
"update saga failed";
"saga_id" => %saga_id,
"error" => e,
);
check.result = Err(Incomplete::UpdateFailed);
}
}
Err(e) => {
warn!(
opctx.log,
"update saga could not be started";
"error" => e,
);
check.result = Err(Incomplete::UpdateFailed);
}
}
},
Ok(None) => {}
};

Expand Down
5 changes: 4 additions & 1 deletion nexus/src/app/background/tasks/region_replacement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ impl RegionReplacementDetector {
};

let saga_dag = SagaRegionReplacementStart::prepare(&params)?;
self.sagas.saga_start(saga_dag).await
// We only care that the saga was started, and don't wish to wait for it
// to complete, so use `StartSaga::saga_start`, rather than `saga_run`.
self.sagas.saga_start(saga_dag).await?;
Ok(())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ impl RegionSnapshotReplacementGarbageCollect {

let saga_dag =
SagaRegionSnapshotReplacementGarbageCollect::prepare(&params)?;
self.sagas.saga_start(saga_dag).await
// We only care that the saga was started, and don't wish to wait for it
// to complete, so use `StartSaga::saga_start`, rather than `saga_run`.
self.sagas.saga_start(saga_dag).await?;
Ok(())
}

async fn clean_up_region_snapshot_replacement_volumes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ impl RegionSnapshotReplacementDetector {
};

let saga_dag = SagaRegionSnapshotReplacementStart::prepare(&params)?;
self.sagas.saga_start(saga_dag).await
// We only care that the saga was started, and don't wish to wait for it
// to complete, so use `StartSaga::saga_start`, rather than `saga_run`.
self.sagas.saga_start(saga_dag).await?;
Ok(())
}

/// Find region snapshots on expunged physical disks and create region
Expand Down
12 changes: 10 additions & 2 deletions nexus/src/app/background/tasks/region_snapshot_replacement_step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ impl RegionSnapshotReplacementFindAffected {
};

let saga_dag = SagaRegionSnapshotReplacementStep::prepare(&params)?;
self.sagas.saga_start(saga_dag).await
// We only care that the saga was started, and don't wish to wait for it
// to complete, so use `StartSaga::saga_start`, rather than `saga_run`.
self.sagas.saga_start(saga_dag).await?;
Ok(())
}

async fn send_garbage_collect_request(
Expand Down Expand Up @@ -89,7 +92,12 @@ impl RegionSnapshotReplacementFindAffected {

let saga_dag =
SagaRegionSnapshotReplacementStepGarbageCollect::prepare(&params)?;
self.sagas.saga_start(saga_dag).await

// We only care that the saga was started, and don't wish to wait for it
// to complete, so throwing out the future returned by `saga_start` is
// fine. Dropping it will *not* cancel the saga itself.
self.sagas.saga_start(saga_dag).await?;
Ok(())
}

async fn clean_up_region_snapshot_replacement_step_volumes(
Expand Down
51 changes: 47 additions & 4 deletions nexus/src/app/saga.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,61 @@ pub(crate) fn create_saga_dag<N: NexusSaga>(
pub(crate) trait StartSaga: Send + Sync {
/// Create a new saga (of type `N` with parameters `params`), start it
/// running, but do not wait for it to finish.
fn saga_start(&self, dag: SagaDag) -> BoxFuture<'_, Result<(), Error>>;
///
/// This method returns the ID of the running saga.
fn saga_start(&self, dag: SagaDag) -> BoxFuture<'_, Result<SagaId, Error>>;

/// Create a new saga (of type `N` with parameters `params`), start it
/// running, and return a future that can be used to wait for it to finish
/// (along with the saga's ID).
///
/// Callers who do not need to wait for the saga's completion should use
/// `StartSaga::saga_start`, instead, as it avoids allocating the second
/// `BoxFuture` for the completion future.
fn saga_run(
&self,
dag: SagaDag,
) -> BoxFuture<'_, Result<(SagaId, SagaCompletionFuture), Error>>;
}

pub type SagaCompletionFuture = BoxFuture<'static, Result<(), Error>>;

impl StartSaga for SagaExecutor {
fn saga_start(&self, dag: SagaDag) -> BoxFuture<'_, Result<(), Error>> {
fn saga_start(&self, dag: SagaDag) -> BoxFuture<'_, Result<SagaId, Error>> {
async move {
let runnable_saga = self.saga_prepare(dag).await?;
// start() returns a future that can be used to wait for the saga to
// complete. We don't need that here. (Cancelling this has no
// effect on the running saga.)
let _ = runnable_saga.start().await?;
Ok(())
let running_saga = runnable_saga.start().await?;

Ok(running_saga.id)
}
.boxed()
}

fn saga_run(
&self,
dag: SagaDag,
) -> BoxFuture<'_, Result<(SagaId, SagaCompletionFuture), Error>> {
async move {
let runnable_saga = self.saga_prepare(dag).await?;
let running_saga = runnable_saga.start().await?;
let id = running_saga.id;
let completed = async move {
running_saga
.wait_until_stopped()
.await
// Eat the saga's outputs, saga log, etc., and just return
// whether it succeeded or failed. This is necessary because
// some tests rely on a `NoopStartSaga` implementation that
// doesn't actually run sagas and therefore cannot produce a
// real saga log or outputs.
.into_omicron_result()
.map(|_| ())
}
.boxed();
Ok((id, completed))
}
.boxed()
}
Expand Down

0 comments on commit d2e3f34

Please sign in to comment.