re-assign sagas from expunged Nexus instances (#6215)
davepacheco authored Aug 16, 2024
1 parent c28455a commit 8e4ac4c
Showing 8 changed files with 376 additions and 64 deletions.
249 changes: 225 additions & 24 deletions nexus/db-queries/src/db/datastore/saga.rs
@@ -9,18 +9,19 @@ use super::SQL_BATCH_SIZE;
use crate::db;
use crate::db::error::public_error_from_diesel;
use crate::db::error::ErrorHandler;
use crate::db::model::Generation;
use crate::db::pagination::paginated;
use crate::db::pagination::paginated_multicolumn;
use crate::db::pagination::Paginator;
use crate::db::update_and_check::UpdateAndCheck;
use crate::db::update_and_check::UpdateStatus;
use async_bb8_diesel::AsyncRunQueryDsl;
use diesel::prelude::*;
use nexus_auth::authz;
use nexus_auth::context::OpContext;
use omicron_common::api::external::Error;
use omicron_common::api::external::LookupType;
use omicron_common::api::external::ResourceType;
use std::ops::Add;

impl DataStore {
pub async fn saga_create(
@@ -80,29 +81,22 @@ impl DataStore {
/// now, we're implementing saga adoption only in cases where the original
/// SEC/Nexus has been expunged.)
///
/// However, in the future, it may be possible for multiple SECs to try and
/// update the same saga, and overwrite each other's state. For example,
/// one SEC might try and update the state to Running while the other one
/// updates it to Done. That case would have to be carefully considered and
/// tested here, probably using the (currently unused)
/// `current_adopt_generation` field to enable optimistic concurrency.
///
/// To reiterate, we are *not* considering the case where several SECs try
/// to update the same saga. That will be a future enhancement.
/// It's conceivable that multiple SECs could try to update the same saga
/// concurrently. That would be a bug. This is noticed and prevented by
/// making this query conditional on current_sec and failing with a
/// conflict if the current SEC has changed.
pub async fn saga_update_state(
&self,
saga_id: steno::SagaId,
new_state: steno::SagaCachedState,
current_sec: db::saga_types::SecId,
current_adopt_generation: Generation,
) -> Result<(), Error> {
use db::schema::saga::dsl;

let saga_id: db::saga_types::SagaId = saga_id.into();
let result = diesel::update(dsl::saga)
.filter(dsl::id.eq(saga_id))
.filter(dsl::current_sec.eq(current_sec))
.filter(dsl::adopt_generation.eq(current_adopt_generation))
.set(dsl::saga_state.eq(db::saga_types::SagaCachedState(new_state)))
.check_if_exists::<db::saga_types::Saga>(saga_id)
.execute_and_check(&*self.pool_connection_unauthorized().await?)
@@ -119,20 +113,19 @@ impl DataStore {

match result.status {
UpdateStatus::Updated => Ok(()),
UpdateStatus::NotUpdatedButExists => Err(Error::invalid_request(
format!(
"failed to update saga {:?} with state {:?}: preconditions not met: \
expected current_sec = {:?}, adopt_generation = {:?}, \
but found current_sec = {:?}, adopt_generation = {:?}, state = {:?}",
UpdateStatus::NotUpdatedButExists => {
Err(Error::invalid_request(format!(
"failed to update saga {:?} with state {:?}:\
preconditions not met: \
expected current_sec = {:?}, \
but found current_sec = {:?}, state = {:?}",
saga_id,
new_state,
current_sec,
current_adopt_generation,
result.found.current_sec,
result.found.adopt_generation,
result.found.saga_state,
)
)),
)))
}
}
}
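
(For reference: the conditional update in saga_update_state corresponds roughly to the SQL below. This is a sketch, not the literal query; the check_if_exists / execute_and_check combination shown above additionally wraps the update so the caller can distinguish a missing row from a row whose preconditions failed, and can report the row it actually found.)

    UPDATE saga
    SET saga_state = $1
    WHERE id = $2 AND current_sec = $3;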

@@ -207,16 +200,75 @@ impl DataStore {

Ok(events)
}

/// Updates all sagas that are currently assigned to any of the SEC ids in
/// `sec_ids`, assigning them to `new_sec_id` instead.
///
/// Generally, an SEC id corresponds to a Nexus id. This change causes the
/// Nexus instance `new_sec_id` to discover these sagas and resume executing
/// them the next time it performs saga recovery (which is normally on
startup and periodically). Typically, `new_sec_id` is the _current_
/// Nexus instance and the caller should activate the saga recovery
/// background task after calling this function to immediately resume the
/// newly-assigned sagas.
///
/// **Warning:** This operation is only safe if the SECs in `sec_ids` are
/// not currently running. If those SECs are still running, then two (or
/// more) SECs may wind up running the same saga concurrently. This would
/// likely violate implicit assumptions made by various saga actions,
/// leading to hard-to-debug errors and state corruption.
pub async fn sagas_reassign_sec(
&self,
opctx: &OpContext,
sec_ids: &[db::saga_types::SecId],
new_sec_id: db::saga_types::SecId,
) -> Result<usize, Error> {
opctx.authorize(authz::Action::Modify, &authz::FLEET).await?;

let now = chrono::Utc::now();
let conn = self.pool_connection_authorized(opctx).await?;

// It would be more robust to do this in batches. However, Diesel does
// not appear to support the UPDATE ... LIMIT syntax using the normal
// builder. In practice, it's extremely unlikely we'd have so many
// in-progress sagas that this would be a problem.
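// (If batching did become necessary, one option -- a sketch, not
// something this change implements -- would be to issue raw SQL like:
//     UPDATE saga SET ... WHERE id IN
//         (SELECT id FROM saga WHERE current_sec = $1 LIMIT $2)
// in a loop until no rows are updated.)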
use db::schema::saga::dsl;
diesel::update(
dsl::saga
.filter(dsl::current_sec.is_not_null())
.filter(
dsl::current_sec.eq_any(
sec_ids.into_iter().cloned().collect::<Vec<_>>(),
),
)
.filter(dsl::saga_state.ne(db::saga_types::SagaCachedState(
steno::SagaCachedState::Done,
))),
)
.set((
dsl::current_sec.eq(Some(new_sec_id)),
dsl::adopt_generation.eq(dsl::adopt_generation.add(1)),
dsl::adopt_time.eq(now),
))
.execute_async(&*conn)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
}
}
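
(For reference: the reassignment above amounts to a single conditional UPDATE, sketched below. This is illustrative SQL only, assuming a textual "done" encoding for saga_state; the query Diesel generates differs in parameter binding and type details.)

    UPDATE saga
    SET current_sec = $1,
        adopt_generation = adopt_generation + 1,
        adopt_time = now()
    WHERE current_sec IS NOT NULL
      AND current_sec IN ($2, $3, ...)
      AND saga_state != 'done';

(Bumping adopt_generation on every adoption is what the test below relies on: reassigned sagas end up at generation 2 while untouched ones remain at generation 1.)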

#[cfg(test)]
mod test {
use super::*;
use crate::db::datastore::test_utils::datastore_test;
use async_bb8_diesel::AsyncConnection;
use async_bb8_diesel::AsyncSimpleConnection;
use db::queries::ALLOW_FULL_TABLE_SCAN_SQL;
use nexus_db_model::{SagaNodeEvent, SecId};
use nexus_test_utils::db::test_setup_database;
use omicron_common::api::external::Generation;
use omicron_test_utils::dev;
use rand::seq::SliceRandom;
use std::collections::BTreeSet;
use uuid::Uuid;

// Tests pagination in listing sagas that are candidates for recovery
@@ -440,7 +492,6 @@ mod test {
node_cx.saga_id,
steno::SagaCachedState::Running,
node_cx.sec_id,
db::model::Generation::new(),
)
.await
.expect("updating state to Running again");
@@ -451,7 +502,6 @@
node_cx.saga_id,
steno::SagaCachedState::Done,
node_cx.sec_id,
db::model::Generation::new(),
)
.await
.expect("updating state to Done");
@@ -463,7 +513,6 @@
node_cx.saga_id,
steno::SagaCachedState::Done,
node_cx.sec_id,
db::model::Generation::new(),
)
.await
.expect("updating state to Done again");
Expand Down Expand Up @@ -509,4 +558,156 @@ mod test {
SagaNodeEvent::new(event, self.sec_id)
}
}

#[tokio::test]
async fn test_saga_reassignment() {
// Test setup
let logctx = dev::test_setup_log("test_saga_reassignment");
let mut db = test_setup_database(&logctx.log).await;
let (_, datastore) = datastore_test(&logctx, &db).await;
let opctx = OpContext::for_tests(logctx.log.clone(), datastore.clone());

// Populate the database with a few different sagas:
//
// - assigned to SEC A: done, running, and unwinding
// - assigned to SEC B: done, running, and unwinding
// - assigned to SEC C: done, running, and unwinding
// - assigned to SEC D: done, running, and unwinding
//
// Then we'll reassign SEC B's and SEC C's sagas to SEC A and check exactly
// which sagas were changed by this. This exercises:
// - that we don't touch A's sagas (the one we're assigning *to*)
// - that we do touch both B's and C's sagas (the ones we're assigning
// *from*)
// - that we don't touch D's sagas (some other SEC)
// - that we don't touch any "done" sagas
// - that we do touch both running and unwinding sagas
let mut sagas_to_insert = Vec::new();
let sec_a = SecId(Uuid::new_v4());
let sec_b = SecId(Uuid::new_v4());
let sec_c = SecId(Uuid::new_v4());
let sec_d = SecId(Uuid::new_v4());

for sec_id in [sec_a, sec_b, sec_c, sec_d] {
for state in [
steno::SagaCachedState::Running,
steno::SagaCachedState::Unwinding,
steno::SagaCachedState::Done,
] {
let params = steno::SagaCreateParams {
id: steno::SagaId(Uuid::new_v4()),
name: steno::SagaName::new("test saga"),
dag: serde_json::value::Value::Null,
state,
};

sagas_to_insert
.push(db::model::saga_types::Saga::new(sec_id, params));
}
}
println!("sagas to insert: {:?}", sagas_to_insert);

// These two sets are complements, but we write out the conditions to
// double-check that we've got it right.
let sagas_affected: BTreeSet<_> = sagas_to_insert
.iter()
.filter_map(|saga| {
((saga.creator == sec_b || saga.creator == sec_c)
&& (saga.saga_state.0 == steno::SagaCachedState::Running
|| saga.saga_state.0
== steno::SagaCachedState::Unwinding))
.then(|| saga.id)
})
.collect();
let sagas_unaffected: BTreeSet<_> = sagas_to_insert
.iter()
.filter_map(|saga| {
(saga.creator == sec_a
|| saga.creator == sec_d
|| saga.saga_state.0 == steno::SagaCachedState::Done)
.then(|| saga.id)
})
.collect();
println!("sagas affected: {:?}", sagas_affected);
println!("sagas UNaffected: {:?}", sagas_unaffected);
assert_eq!(sagas_affected.intersection(&sagas_unaffected).count(), 0);
assert_eq!(
sagas_affected.len() + sagas_unaffected.len(),
sagas_to_insert.len()
);

// Insert the sagas.
let count = {
use db::schema::saga::dsl;
let conn = datastore.pool_connection_for_tests().await.unwrap();
diesel::insert_into(dsl::saga)
.values(sagas_to_insert)
.execute_async(&*conn)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
.expect("successful insertion")
};
assert_eq!(count, sagas_affected.len() + sagas_unaffected.len());

// Reassign uncompleted sagas from SECs B and C to SEC A.
let nreassigned = datastore
.sagas_reassign_sec(&opctx, &[sec_b, sec_c], sec_a)
.await
.expect("failed to re-assign sagas");

// Fetch all the sagas and check their states.
let all_sagas: Vec<_> = datastore
.pool_connection_for_tests()
.await
.unwrap()
.transaction_async(|conn| async move {
use db::schema::saga::dsl;
conn.batch_execute_async(ALLOW_FULL_TABLE_SCAN_SQL).await?;
dsl::saga
.select(nexus_db_model::Saga::as_select())
.load_async(&conn)
.await
})
.await
.unwrap();

for saga in all_sagas {
println!("checking saga: {:?}", saga);
let current_sec = saga.current_sec.unwrap();
if sagas_affected.contains(&saga.id) {
assert!(saga.creator == sec_b || saga.creator == sec_c);
assert_eq!(current_sec, sec_a);
assert_eq!(*saga.adopt_generation, Generation::from(2));
assert!(
saga.saga_state.0 == steno::SagaCachedState::Running
|| saga.saga_state.0
== steno::SagaCachedState::Unwinding
);
} else if sagas_unaffected.contains(&saga.id) {
assert_eq!(current_sec, saga.creator);
assert_eq!(*saga.adopt_generation, Generation::from(1));
// Its SEC and state could be anything since we've deliberately
// included sagas with various states and SECs that should not
// be affected by the reassignment.
} else {
println!(
"ignoring saga that was not created by this test: {:?}",
saga
);
}
}

assert_eq!(nreassigned, sagas_affected.len());

// If we do it again, we should make no changes.
let nreassigned = datastore
.sagas_reassign_sec(&opctx, &[sec_b, sec_c], sec_a)
.await
.expect("failed to re-assign sagas");
assert_eq!(nreassigned, 0);

// Test cleanup
db.cleanup().await.unwrap();
logctx.cleanup_successful();
}
}
9 changes: 2 additions & 7 deletions nexus/db-queries/src/db/sec_store.rs
@@ -4,7 +4,7 @@

//! Implementation of [`steno::SecStore`] backed by Omicron's database
use crate::db::{self, model::Generation};
use crate::db;
use anyhow::Context;
use async_trait::async_trait;
use dropshot::HttpError;
@@ -102,12 +102,7 @@ impl steno::SecStore for CockroachDbSecStore {
&log,
|| {
self.datastore
.saga_update_state(
id,
update,
self.sec_id,
Generation::new(),
)
.saga_update_state(id, update, self.sec_id)
.map_err(backoff::BackoffError::transient)
},
"updating saga state",
3 changes: 2 additions & 1 deletion nexus/reconfigurator/execution/src/cockroachdb.rs
@@ -39,6 +39,7 @@ mod test {
use nexus_test_utils_macros::nexus_test;
use nexus_types::deployment::CockroachDbClusterVersion;
use std::sync::Arc;
use uuid::Uuid;

type ControlPlaneTestContext =
nexus_test_utils::ControlPlaneTestContext<omicron_nexus::Server>;
@@ -101,7 +102,7 @@
datastore,
resolver,
&blueprint,
"test-suite",
Uuid::new_v4(),
&overrides,
)
.await
(Diffs for the remaining 5 changed files are not shown.)
