Skip to content

Commit

Permalink
[nexus-db-queries] make saga_update more resilient, record_event idem…
Browse files Browse the repository at this point in the history
…potent (#6113)

See discussion in #2416
and #6090 (comment).

A summary of the changes here:

1. Made `saga_create_event` idempotent. Previously, recording an event
that duplicated one already in the log failed with a conflict -- now it
succeeds as a no-op. These events are meant to form an append-only,
idempotent log, so this is okay. Also added a test for this.
2. `saga_update_state` was already idempotent -- added a test which made
sure of this. Also added a comment about how idempotence may not be
enough in the future.
3. Added a retry loop around saga state updates, similar to the one
around recording events.
  • Loading branch information
sunshowers authored Jul 18, 2024
1 parent 2eb63b1 commit 708c288
Show file tree
Hide file tree
Showing 2 changed files with 268 additions and 88 deletions.
201 changes: 172 additions & 29 deletions nexus/db-queries/src/db/datastore/saga.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ impl DataStore {
// owning this saga.
diesel::insert_into(dsl::saga_node_event)
.values(event.clone())
// (saga_id, node_id, event_type) is the primary key, and this is
// expected to be idempotent.
//
// Consider the situation where a saga event gets recorded and
// committed, but there's a network reset which makes the client
// (us) believe that the event wasn't recorded. If we retry the
// event, we want to not fail with a conflict.
.on_conflict((dsl::saga_id, dsl::node_id, dsl::event_type))
.do_nothing()
.execute_async(&*self.pool_connection_unauthorized().await?)
.await
.map_err(|e| {
Expand All @@ -58,6 +67,28 @@ impl DataStore {
Ok(())
}

/// Update the state of a saga in the database.
///
/// This function is meant to be called in a loop, so that in the event of
/// network flakiness, the operation is retried until successful.
///
/// ## About conflicts
///
/// Currently, if the value of `saga_state` in the database is the same as
/// the value we're trying to set it to, the update will be a no-op. That
/// is okay, because at any time only one SEC will update the saga. (For
/// now, we're implementing saga adoption only in cases where the original
/// SEC/Nexus has been expunged.)
///
/// However, in the future, it may be possible for multiple SECs to try and
/// update the same saga, and overwrite each other's state. For example,
/// one SEC might try and update the state to Running while the other one
/// updates it to Done. That case would have to be carefully considered and
/// tested here, probably using the (currently unused)
/// `current_adopt_generation` field to enable optimistic concurrency.
///
/// To reiterate, we are *not* considering the case where several SECs try
/// to update the same saga. That will be a future enhancement.
pub async fn saga_update_state(
&self,
saga_id: steno::SagaId,
Expand Down Expand Up @@ -182,6 +213,7 @@ impl DataStore {
mod test {
use super::*;
use crate::db::datastore::test_utils::datastore_test;
use nexus_db_model::{SagaNodeEvent, SecId};
use nexus_test_utils::db::test_setup_database;
use omicron_test_utils::dev;
use rand::seq::SliceRandom;
Expand All @@ -195,20 +227,8 @@ mod test {
let mut db = test_setup_database(&logctx.log).await;
let (opctx, datastore) = datastore_test(&logctx, &db).await;
let sec_id = db::SecId(uuid::Uuid::new_v4());

// Create a couple batches of sagas.
let new_running_db_saga = || {
let params = steno::SagaCreateParams {
id: steno::SagaId(Uuid::new_v4()),
name: steno::SagaName::new("test saga"),
dag: serde_json::value::Value::Null,
state: steno::SagaCachedState::Running,
};

db::model::saga_types::Saga::new(sec_id, params)
};
let mut inserted_sagas = (0..SQL_BATCH_SIZE.get() * 2)
.map(|_| new_running_db_saga())
.map(|_| SagaTestContext::new(sec_id).new_running_db_saga())
.collect::<Vec<_>>();

// Shuffle these sagas into a random order to check that the pagination
Expand Down Expand Up @@ -263,30 +283,19 @@ mod test {
let logctx = dev::test_setup_log("test_list_unfinished_nodes");
let mut db = test_setup_database(&logctx.log).await;
let (opctx, datastore) = datastore_test(&logctx, &db).await;
let sec_id = db::SecId(uuid::Uuid::new_v4());
let saga_id = steno::SagaId(Uuid::new_v4());
let node_cx = SagaTestContext::new(SecId(Uuid::new_v4()));

// Create a couple batches of saga events
let new_db_saga_nodes =
|node_id: u32, event_type: steno::SagaNodeEventType| {
let event = steno::SagaNodeEvent {
saga_id,
node_id: steno::SagaNodeId::from(node_id),
event_type,
};

db::model::saga_types::SagaNodeEvent::new(event, sec_id)
};
let mut inserted_nodes = (0..SQL_BATCH_SIZE.get() * 2)
.flat_map(|i| {
// This isn't an exhaustive list of event types, but gives us a
// few options to pick from. Since this is a pagination key,
// it's important to include a variety here.
use steno::SagaNodeEventType::*;
[
new_db_saga_nodes(i, Started),
new_db_saga_nodes(i, UndoStarted),
new_db_saga_nodes(i, UndoFinished),
node_cx.new_db_event(i, Started),
node_cx.new_db_event(i, UndoStarted),
node_cx.new_db_event(i, UndoFinished),
]
})
.collect::<Vec<_>>();
Expand All @@ -311,7 +320,7 @@ mod test {
let observed_nodes = datastore
.saga_fetch_log_batched(
&opctx,
nexus_db_model::saga_types::SagaId::from(saga_id),
nexus_db_model::saga_types::SagaId::from(node_cx.saga_id),
)
.await
.expect("Failed to list nodes of unfinished saga");
Expand Down Expand Up @@ -366,4 +375,138 @@ mod test {
db.cleanup().await.unwrap();
logctx.cleanup_successful();
}

/// Records the same set of saga node events twice and expects both passes
/// to succeed, i.e. a duplicate `saga_create_event` must not surface a
/// conflict error.
#[tokio::test]
async fn test_create_event_idempotent() {
    // Test setup
    let logctx = dev::test_setup_log("test_create_event_idempotent");
    let mut db = test_setup_database(&logctx.log).await;
    let (_, datastore) = datastore_test(&logctx, &db).await;
    let node_cx = SagaTestContext::new(SecId(Uuid::new_v4()));

    // Build three events (Started, UndoStarted, UndoFinished) for each of
    // two nodes, all belonging to the same saga.
    let mut events = Vec::new();
    for node_id in 0..2 {
        for event_type in [
            steno::SagaNodeEventType::Started,
            steno::SagaNodeEventType::UndoStarted,
            steno::SagaNodeEventType::UndoFinished,
        ] {
            events.push(node_cx.new_db_event(node_id, event_type));
        }
    }

    // First pass: insert every event. Second pass: insert the very same
    // events again and make sure that we don't get a conflict.
    for failure_msg in
        ["inserting first node events", "inserting duplicate node events"]
    {
        for event in &events {
            datastore.saga_create_event(event).await.expect(failure_msg);
        }
    }

    // Test cleanup
    db.cleanup().await.unwrap();
    logctx.cleanup_successful();
}

/// Checks that `saga_update_state` can be repeated safely: setting a saga to
/// the state it is already in must succeed rather than fail.
///
/// The sequence exercised is Running (as created) -> Running -> Done -> Done.
#[tokio::test]
async fn test_update_state_idempotent() {
    // Test setup. The log context carries this test's own name so that its
    // log output is attributed to the right test.
    let logctx = dev::test_setup_log("test_update_state_idempotent");
    let mut db = test_setup_database(&logctx.log).await;
    let (_, datastore) = datastore_test(&logctx, &db).await;
    let node_cx = SagaTestContext::new(SecId(Uuid::new_v4()));

    // Create a saga in the running state.
    let params = node_cx.new_running_db_saga();
    datastore
        .saga_create(&params)
        .await
        .expect("creating saga in Running state");

    // Attempt to update its state to Running, which is a no-op -- this
    // should be idempotent, so expect success.
    datastore
        .saga_update_state(
            node_cx.saga_id,
            steno::SagaCachedState::Running,
            node_cx.sec_id,
            db::model::Generation::new(),
        )
        .await
        .expect("updating state to Running again");

    // Update the state to Done. This is the only call in the test that
    // actually changes the stored state.
    datastore
        .saga_update_state(
            node_cx.saga_id,
            steno::SagaCachedState::Done,
            node_cx.sec_id,
            db::model::Generation::new(),
        )
        .await
        .expect("updating state to Done");

    // Attempt to update its state to Done again, which is a no-op -- this
    // should be idempotent, so expect success.
    datastore
        .saga_update_state(
            node_cx.saga_id,
            steno::SagaCachedState::Done,
            node_cx.sec_id,
            db::model::Generation::new(),
        )
        .await
        .expect("updating state to Done again");

    // Test cleanup
    db.cleanup().await.unwrap();
    logctx.cleanup_successful();
}

/// Helpers to create sagas.
///
/// Bundles the identifiers the tests need so that every saga record and
/// saga node event built from one context refers to the same saga and the
/// same SEC.
struct SagaTestContext {
/// Identifier of the saga under test; `new` fills this with a freshly
/// generated v4 UUID.
saga_id: steno::SagaId,
/// Identifier of the SEC passed to `new`; attached to every saga and
/// event this context builds.
sec_id: SecId,
}

impl SagaTestContext {
    /// Creates a context for the given SEC with a freshly generated random
    /// saga ID.
    fn new(sec_id: SecId) -> Self {
        let saga_id = steno::SagaId(Uuid::new_v4());
        Self { saga_id, sec_id }
    }

    /// Builds the database model of this context's saga in the `Running`
    /// state, named "test saga" and carrying a null DAG.
    fn new_running_db_saga(&self) -> db::model::saga_types::Saga {
        db::model::saga_types::Saga::new(
            self.sec_id,
            steno::SagaCreateParams {
                id: self.saga_id,
                name: steno::SagaName::new("test saga"),
                dag: serde_json::value::Value::Null,
                state: steno::SagaCachedState::Running,
            },
        )
    }

    /// Builds the database model of a node event of type `event_type` for
    /// node `node_id` of this context's saga, attributed to this context's
    /// SEC.
    fn new_db_event(
        &self,
        node_id: u32,
        event_type: steno::SagaNodeEventType,
    ) -> SagaNodeEvent {
        let node_id = steno::SagaNodeId::from(node_id);
        SagaNodeEvent::new(
            steno::SagaNodeEvent { saga_id: self.saga_id, node_id, event_type },
            self.sec_id,
        )
    }
}
}
Loading

0 comments on commit 708c288

Please sign in to comment.