From 708c288345a4405d9573f0d8945c316057b64149 Mon Sep 17 00:00:00 2001
From: Rain
Date: Thu, 18 Jul 2024 15:14:08 -0700
Subject: [PATCH] [nexus-db-queries] make saga_update more resilient,
 record_event idempotent (#6113)

See discussion in https://github.com/oxidecomputer/omicron/issues/2416 and
https://github.com/oxidecomputer/omicron/issues/6090#issuecomment-2229509411.

A summary of the changes here:

1. Made `saga_create_event` idempotent. Previously, creating another event
   that duplicated the one which already existed would fail -- now it
   succeeds. These events are meant to be an append-only idempotent log, so
   this is okay. Also added a test for this.
2. `saga_update_state` was already idempotent -- added a test which made sure
   of this. Also added a comment about how idempotence may not be enough in
   the future.
3. Added a retry loop around saga state updates, similar to the one around
   recording events.
---
 nexus/db-queries/src/db/datastore/saga.rs | 201 ++++++++++++++++++----
 nexus/db-queries/src/db/sec_store.rs      | 155 ++++++++++-------
 2 files changed, 268 insertions(+), 88 deletions(-)

diff --git a/nexus/db-queries/src/db/datastore/saga.rs b/nexus/db-queries/src/db/datastore/saga.rs
index e632bce881..939929e665 100644
--- a/nexus/db-queries/src/db/datastore/saga.rs
+++ b/nexus/db-queries/src/db/datastore/saga.rs
@@ -47,6 +47,15 @@ impl DataStore {
         // owning this saga.
         diesel::insert_into(dsl::saga_node_event)
             .values(event.clone())
+            // (saga_id, node_id, event_type) is the primary key, and this is
+            // expected to be idempotent.
+            //
+            // Consider the situation where a saga event gets recorded and
+            // committed, but there's a network reset which makes the client
+            // (us) believe that the event wasn't recorded. If we retry the
+            // event, we want to not fail with a conflict.
+            .on_conflict((dsl::saga_id, dsl::node_id, dsl::event_type))
+            .do_nothing()
             .execute_async(&*self.pool_connection_unauthorized().await?)
             .await
             .map_err(|e| {
@@ -58,6 +67,28 @@ impl DataStore {
         Ok(())
     }
 
+    /// Update the state of a saga in the database.
+    ///
+    /// This function is meant to be called in a loop, so that in the event of
+    /// network flakiness, the operation is retried until successful.
+    ///
+    /// ## About conflicts
+    ///
+    /// Currently, if the value of `saga_state` in the database is the same as
+    /// the value we're trying to set it to, the update will be a no-op. That
+    /// is okay, because at any time only one SEC will update the saga. (For
+    /// now, we're implementing saga adoption only in cases where the original
+    /// SEC/Nexus has been expunged.)
+    ///
+    /// However, in the future, it may be possible for multiple SECs to try and
+    /// update the same saga, and overwrite each other's state. For example,
+    /// one SEC might try and update the state to Running while the other one
+    /// updates it to Done. That case would have to be carefully considered and
+    /// tested here, probably using the (currently unused)
+    /// `current_adopt_generation` field to enable optimistic concurrency.
+    ///
+    /// To reiterate, we are *not* considering the case where several SECs try
+    /// to update the same saga. That will be a future enhancement.
     pub async fn saga_update_state(
         &self,
         saga_id: steno::SagaId,
@@ -182,6 +213,7 @@ impl DataStore {
 mod test {
     use super::*;
     use crate::db::datastore::test_utils::datastore_test;
+    use nexus_db_model::{SagaNodeEvent, SecId};
     use nexus_test_utils::db::test_setup_database;
     use omicron_test_utils::dev;
     use rand::seq::SliceRandom;
@@ -195,20 +227,8 @@ mod test {
         let mut db = test_setup_database(&logctx.log).await;
         let (opctx, datastore) = datastore_test(&logctx, &db).await;
         let sec_id = db::SecId(uuid::Uuid::new_v4());
-
-        // Create a couple batches of sagas.
-        let new_running_db_saga = || {
-            let params = steno::SagaCreateParams {
-                id: steno::SagaId(Uuid::new_v4()),
-                name: steno::SagaName::new("test saga"),
-                dag: serde_json::value::Value::Null,
-                state: steno::SagaCachedState::Running,
-            };
-
-            db::model::saga_types::Saga::new(sec_id, params)
-        };
         let mut inserted_sagas = (0..SQL_BATCH_SIZE.get() * 2)
-            .map(|_| new_running_db_saga())
+            .map(|_| SagaTestContext::new(sec_id).new_running_db_saga())
             .collect::<Vec<_>>();
 
         // Shuffle these sagas into a random order to check that the pagination
@@ -263,20 +283,9 @@ mod test {
         let logctx = dev::test_setup_log("test_list_unfinished_nodes");
         let mut db = test_setup_database(&logctx.log).await;
         let (opctx, datastore) = datastore_test(&logctx, &db).await;
-        let sec_id = db::SecId(uuid::Uuid::new_v4());
-        let saga_id = steno::SagaId(Uuid::new_v4());
+        let node_cx = SagaTestContext::new(SecId(Uuid::new_v4()));
 
         // Create a couple batches of saga events
-        let new_db_saga_nodes =
-            |node_id: u32, event_type: steno::SagaNodeEventType| {
-                let event = steno::SagaNodeEvent {
-                    saga_id,
-                    node_id: steno::SagaNodeId::from(node_id),
-                    event_type,
-                };
-
-                db::model::saga_types::SagaNodeEvent::new(event, sec_id)
-            };
         let mut inserted_nodes = (0..SQL_BATCH_SIZE.get() * 2)
             .flat_map(|i| {
                 // This isn't an exhaustive list of event types, but gives us a
@@ -284,9 +293,9 @@ mod test {
                 // it's important to include a variety here.
                 use steno::SagaNodeEventType::*;
                 [
-                    new_db_saga_nodes(i, Started),
-                    new_db_saga_nodes(i, UndoStarted),
-                    new_db_saga_nodes(i, UndoFinished),
+                    node_cx.new_db_event(i, Started),
+                    node_cx.new_db_event(i, UndoStarted),
+                    node_cx.new_db_event(i, UndoFinished),
                 ]
             })
             .collect::<Vec<_>>();
@@ -311,7 +320,7 @@ mod test {
         let observed_nodes = datastore
             .saga_fetch_log_batched(
                 &opctx,
-                nexus_db_model::saga_types::SagaId::from(saga_id),
+                nexus_db_model::saga_types::SagaId::from(node_cx.saga_id),
             )
             .await
             .expect("Failed to list nodes of unfinished saga");
@@ -366,4 +375,138 @@ mod test {
         db.cleanup().await.unwrap();
         logctx.cleanup_successful();
     }
+
+    #[tokio::test]
+    async fn test_create_event_idempotent() {
+        // Test setup
+        let logctx = dev::test_setup_log("test_create_event_idempotent");
+        let mut db = test_setup_database(&logctx.log).await;
+        let (_, datastore) = datastore_test(&logctx, &db).await;
+        let node_cx = SagaTestContext::new(SecId(Uuid::new_v4()));
+
+        // Generate a bunch of events.
+        let inserted_nodes = (0..2)
+            .flat_map(|i| {
+                use steno::SagaNodeEventType::*;
+                [
+                    node_cx.new_db_event(i, Started),
+                    node_cx.new_db_event(i, UndoStarted),
+                    node_cx.new_db_event(i, UndoFinished),
+                ]
+            })
+            .collect::<Vec<_>>();
+
+        // Insert the events into the database.
+        for node in &inserted_nodes {
+            datastore
+                .saga_create_event(node)
+                .await
+                .expect("inserting first node events");
+        }
+
+        // Insert the events again into the database and ensure that we don't
+        // get a conflict.
+        for node in &inserted_nodes {
+            datastore
+                .saga_create_event(node)
+                .await
+                .expect("inserting duplicate node events");
+        }
+
+        // Test cleanup
+        db.cleanup().await.unwrap();
+        logctx.cleanup_successful();
+    }
+
+    #[tokio::test]
+    async fn test_update_state_idempotent() {
+        // Test setup
+        let logctx = dev::test_setup_log("test_update_state_idempotent");
+        let mut db = test_setup_database(&logctx.log).await;
+        let (_, datastore) = datastore_test(&logctx, &db).await;
+        let node_cx = SagaTestContext::new(SecId(Uuid::new_v4()));
+
+        // Create a saga in the running state.
+        let params = node_cx.new_running_db_saga();
+        datastore
+            .saga_create(&params)
+            .await
+            .expect("creating saga in Running state");
+
+        // Attempt to update its state to Running, which is a no-op -- this
+        // should be idempotent, so expect success.
+        datastore
+            .saga_update_state(
+                node_cx.saga_id,
+                steno::SagaCachedState::Running,
+                node_cx.sec_id,
+                db::model::Generation::new(),
+            )
+            .await
+            .expect("updating state to Running again");
+
+        // Update the state to Done.
+        datastore
+            .saga_update_state(
+                node_cx.saga_id,
+                steno::SagaCachedState::Done,
+                node_cx.sec_id,
+                db::model::Generation::new(),
+            )
+            .await
+            .expect("updating state to Done");
+
+        // Attempt to update its state to Done again, which is a no-op -- this
+        // should be idempotent, so expect success.
+        datastore
+            .saga_update_state(
+                node_cx.saga_id,
+                steno::SagaCachedState::Done,
+                node_cx.sec_id,
+                db::model::Generation::new(),
+            )
+            .await
+            .expect("updating state to Done again");
+
+        // Test cleanup
+        db.cleanup().await.unwrap();
+        logctx.cleanup_successful();
+    }
+
+    /// Helpers to create sagas.
+    struct SagaTestContext {
+        saga_id: steno::SagaId,
+        sec_id: SecId,
+    }
+
+    impl SagaTestContext {
+        fn new(sec_id: SecId) -> Self {
+            Self { saga_id: steno::SagaId(Uuid::new_v4()), sec_id }
+        }
+
+        fn new_running_db_saga(&self) -> db::model::saga_types::Saga {
+            let params = steno::SagaCreateParams {
+                id: self.saga_id,
+                name: steno::SagaName::new("test saga"),
+                dag: serde_json::value::Value::Null,
+                state: steno::SagaCachedState::Running,
+            };
+
+            db::model::saga_types::Saga::new(self.sec_id, params)
+        }
+
+        fn new_db_event(
+            &self,
+            node_id: u32,
+            event_type: steno::SagaNodeEventType,
+        ) -> SagaNodeEvent {
+            let event = steno::SagaNodeEvent {
+                saga_id: self.saga_id,
+                node_id: steno::SagaNodeId::from(node_id),
+                event_type,
+            };
+
+            SagaNodeEvent::new(event, self.sec_id)
+        }
+    }
 }
diff --git a/nexus/db-queries/src/db/sec_store.rs b/nexus/db-queries/src/db/sec_store.rs
index 72de02ff54..0dcc3aa717 100644
--- a/nexus/db-queries/src/db/sec_store.rs
+++ b/nexus/db-queries/src/db/sec_store.rs
@@ -8,7 +8,8 @@ use crate::db::{self, model::Generation};
 use anyhow::Context;
 use async_trait::async_trait;
 use dropshot::HttpError;
-use futures::TryFutureExt;
+use futures::{Future, TryFutureExt};
+use omicron_common::api::external;
 use omicron_common::backoff;
 use slog::Logger;
 use std::fmt;
@@ -66,78 +67,114 @@ impl steno::SecStore for CockroachDbSecStore {
         debug!(&log, "recording saga event");
         let our_event = db::saga_types::SagaNodeEvent::new(event, self.sec_id);
 
-        backoff::retry_notify_ext(
-            // This is an internal service query to CockroachDB.
-            backoff::retry_policy_internal_service(),
+        // Add retries for this operation. saga_create_event is internally
+        // idempotent, so we can retry indefinitely until the event has been
+        // durably recorded.
+        backoff_saga_operation(
+            &log,
             || {
-                // In general, there are some kinds of database errors that are
-                // temporary/server errors (e.g. network failures), and some
-                // that are permanent/client errors (e.g. conflict during
-                // insertion). The permanent ones would require operator
-                // intervention to fix.
-                //
-                // However, there is no way to bubble up errors here, and for
-                // good reason: it is inherent to the nature of sagas that
-                // progress is durably recorded. So within *this* code there is
-                // no option but to retry forever. (Below, however, we do mark
-                // errors that likely require operator intervention.)
-                //
-                // At a higher level, callers should plan for the fact that
-                // record_event (and, so, saga execution) could potentially loop
-                // indefinitely while the datastore (or other dependent
-                // services) are down.
                 self.datastore
                     .saga_create_event(&our_event)
                     .map_err(backoff::BackoffError::transient)
             },
-            move |error, call_count, total_duration| {
-                let http_error = HttpError::from(error.clone());
-                if http_error.status_code.is_client_error() {
-                    error!(
-                        &log,
-                        "client error while recording saga event (likely \
-                        requires operator intervention), retrying anyway";
-                        "error" => &error,
-                        "call_count" => call_count,
-                        "total_duration" => ?total_duration,
-                    );
-                } else if total_duration > Duration::from_secs(20) {
-                    warn!(
-                        &log,
-                        "server error while recording saga event, retrying";
-                        "error" => &error,
-                        "call_count" => call_count,
-                        "total_duration" => ?total_duration,
-                    );
-                } else {
-                    info!(
-                        &log,
-                        "server error while recording saga event, retrying";
-                        "error" => &error,
-                        "call_count" => call_count,
-                        "total_duration" => ?total_duration,
-                    );
-                }
-            },
+            "recording saga event",
         )
         .await
-        .expect("the above backoff retries forever")
     }
 
     async fn saga_update(&self, id: SagaId, update: steno::SagaCachedState) {
         // TODO-robustness We should track the current generation of the saga
         // and use it. We'll know this either from when it was created or when
         // it was recovered.
-        info!(&self.log, "updating state";
+
+        let log = self.log.new(o!(
             "saga_id" => id.to_string(),
-            "new_state" => update.to_string()
-        );
+            "new_state" => update.to_string(),
+        ));
 
-        // TODO-robustness This should be wrapped with a retry loop rather than
-        // unwrapping the result. See omicron#2416.
-        self.datastore
-            .saga_update_state(id, update, self.sec_id, Generation::new())
-            .await
-            .unwrap();
+        info!(&log, "updating state");
+
+        // Add retries for this operation. saga_update_state is internally
+        // idempotent, so we can retry indefinitely until the event has been
+        // durably recorded. (But see the note in saga_update_state about how
+        // idempotence is enough for now, but may not be in the future.)
+        backoff_saga_operation(
+            &log,
+            || {
+                self.datastore
+                    .saga_update_state(
+                        id,
+                        update,
+                        self.sec_id,
+                        Generation::new(),
+                    )
+                    .map_err(backoff::BackoffError::transient)
+            },
+            "updating saga state",
+        )
+        .await
     }
 }
+
+/// Implements backoff retry logic for saga operations.
+///
+/// In general, there are some kinds of database errors that are
+/// temporary/server errors (e.g. network failures), and some that are
+/// permanent/client errors (e.g. conflict during insertion). The permanent
+/// ones would require operator intervention to fix.
+///
+/// However, there is no way to bubble up errors from the SEC store, and for
+/// good reason: it is inherent to the nature of sagas that progress is durably
+/// recorded. So inside this code there is no option but to retry forever.
+/// (Below, however, we do mark errors that likely require operator
+/// intervention.)
+///
+/// At a higher level, callers should plan for the fact that saga execution
+/// could potentially loop indefinitely while the datastore (or other
+/// dependent services) are down.
+async fn backoff_saga_operation<F, Fut>(log: &Logger, op: F, description: &str)
+where
+    F: Fn() -> Fut,
+    Fut: Future<Output = Result<(), backoff::BackoffError<external::Error>>>,
+{
+    backoff::retry_notify_ext(
+        // This is an internal service query to CockroachDB.
+        backoff::retry_policy_internal_service(),
+        op,
+        move |error, call_count, total_duration| {
+            let http_error = HttpError::from(error.clone());
+            if http_error.status_code.is_client_error() {
+                error!(
+                    &log,
+                    "client error while {description} (likely \
+                    requires operator intervention), retrying anyway";
+                    "error" => &error,
+                    "call_count" => call_count,
+                    "total_duration" => ?total_duration,
+                );
+            } else if total_duration > WARN_DURATION {
+                warn!(
+                    &log,
+                    "server error while {description}, retrying";
+                    "error" => &error,
+                    "call_count" => call_count,
+                    "total_duration" => ?total_duration,
+                );
+            } else {
+                info!(
+                    &log,
+                    "server error while {description}, retrying";
+                    "error" => &error,
+                    "call_count" => call_count,
+                    "total_duration" => ?total_duration,
+                );
+            }
+        },
+    )
+    .await
+    .expect("the above backoff retries forever")
+}
+
+/// Threshold at which logs about server errors during retries switch from INFO
+/// to WARN.
+const WARN_DURATION: Duration = Duration::from_secs(20);
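
The two pieces of the patch compose: `saga_create_event` becomes idempotent by
targeting the `(saga_id, node_id, event_type)` primary key with
`on_conflict(...).do_nothing()`, and `backoff_saga_operation` retries such
idempotent writes until they are durably recorded. The sketch below is a
minimal, self-contained illustration of why that combination is safe; it is
not Omicron code -- the in-memory `EventLog` and every name in it are invented
for illustration only.

```rust
use std::collections::BTreeSet;

/// Toy stand-in for the saga_node_event table, keyed on its primary key.
#[derive(Default)]
struct EventLog {
    /// Rows keyed by (saga_id, node_id, event_type).
    rows: BTreeSet<(u64, u32, &'static str)>,
}

impl EventLog {
    /// Idempotent insert: recording a row that already exists is a no-op,
    /// mirroring `INSERT ... ON CONFLICT (pkey) DO NOTHING`.
    fn record_event(&mut self, saga_id: u64, node_id: u32, event: &'static str) {
        self.rows.insert((saga_id, node_id, event));
    }
}

fn main() {
    let mut log = EventLog::default();
    let mut attempts = 0;

    // Retry until the write is acknowledged, the way the SEC store loops on
    // saga_create_event / saga_update_state.
    loop {
        attempts += 1;
        // The write itself always lands (and replays harmlessly)...
        log.record_event(1, 0, "started");
        // ...but pretend the first acknowledgement is lost to a network reset.
        if attempts > 1 {
            break;
        }
    }

    // Replaying the event created neither a duplicate row nor a conflict.
    assert_eq!(log.rows.len(), 1);
    println!("recorded exactly once after {attempts} attempts");
}
```

Because a replay of an already-recorded event is a no-op, the retry loop never
has to distinguish "the write was lost" from "the acknowledgement was lost",
which is the property the `ON CONFLICT DO NOTHING` clause buys in the real
query.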