Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[nexus] Make most transactions automatically retry #4487

Merged
merged 29 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
66100a5
WIP: First attempt at txn retry
smklein Nov 11, 2023
10a021d
Add transaction retry helper
smklein Nov 15, 2023
d77b58c
More conversions (tests passing)
smklein Nov 15, 2023
64d46a4
More conversions (collections, network, switch_port)
smklein Nov 15, 2023
10865b1
Clippy lints (tests still passing)
smklein Nov 15, 2023
49b4aa4
address_lot, saml_identity_provider, rack_set_initialized, snapshot, …
smklein Nov 15, 2023
04aa337
Rack setup not ready for retry (nested transactions)
smklein Nov 16, 2023
5b19bc5
Fix error propagation
smklein Nov 16, 2023
ad3bc08
bgp, db_metadata, volume converted
smklein Nov 16, 2023
a209651
switch_interface
smklein Nov 16, 2023
dbec9aa
Point to branch
smklein Nov 16, 2023
650af84
Merge branch 'main' into txn-retry
smklein Nov 16, 2023
b319f1c
Catch one more possibly retryable error
smklein Nov 16, 2023
aaeb8ab
Not sure how it worked without that, but added my rand dep
smklein Nov 16, 2023
4625d8a
review feedback
smklein Nov 17, 2023
761e046
Add tests for transaction_retry producing samples
smklein Nov 20, 2023
1e763dc
Merge branch 'main' into txn-retry
smklein Nov 29, 2023
180c872
Propagate more retry errors
smklein Dec 1, 2023
9ea030c
Merge branch 'main' into txn-retry
smklein Dec 1, 2023
da33017
Start refactoring RetryHelper to help callsites become simpler
smklein Dec 1, 2023
eb30977
RetryHelper refactor
smklein Dec 1, 2023
246ac8a
use retry wrapper everywhere, even for schema changes
smklein Dec 1, 2023
d6f7692
one more error cleanup
smklein Dec 1, 2023
7fff381
Remove config.xml from tests
smklein Dec 1, 2023
d362a9e
Merge branch 'main' into txn-retry
smklein Dec 2, 2023
a54925e
Make tests happier
smklein Dec 2, 2023
ec9f450
Add more automated retries
smklein Dec 4, 2023
538d48c
review feedback
smklein Dec 4, 2023
2058492
update async-bb8-diesel version
smklein Dec 6, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ api_identity = { path = "api_identity" }
approx = "0.5.1"
assert_matches = "1.5.0"
assert_cmd = "2.0.12"
async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "1446f7e0c1f05f33a0581abd51fa873c7652ab61" }
# TODO: Patch before merging
# async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", rev = "1446f7e0c1f05f33a0581abd51fa873c7652ab61" }
smklein marked this conversation as resolved.
Show resolved Hide resolved
async-bb8-diesel = { git = "https://github.com/oxidecomputer/async-bb8-diesel", branch = "txn-retry" }
async-trait = "0.1.74"
atomicwrites = "0.4.2"
authz-macros = { path = "nexus/authz-macros" }
Expand Down
2 changes: 1 addition & 1 deletion nexus/db-model/src/sled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl DatastoreCollectionConfig<super::Service> for Sled {
}

/// A set of constraints that can be placed on operations that select a sled.
#[derive(Debug)]
#[derive(Clone, Debug)]
smklein marked this conversation as resolved.
Show resolved Hide resolved
pub struct SledReservationConstraints {
must_select_from: Vec<Uuid>,
}
Expand Down
1 change: 1 addition & 0 deletions nexus/db-queries/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ oso.workspace = true
paste.workspace = true
# See omicron-rpaths for more about the "pq-sys" dependency.
pq-sys = "*"
rand.workspace = true
smklein marked this conversation as resolved.
Show resolved Hide resolved
ref-cast.workspace = true
samael.workspace = true
serde.workspace = true
Expand Down
27 changes: 7 additions & 20 deletions nexus/db-queries/src/db/collection_attach.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,12 +563,9 @@ where
#[cfg(test)]
mod test {
use super::*;
use crate::db::{
self, error::TransactionError, identity::Resource as IdentityResource,
};
use crate::db::{self, identity::Resource as IdentityResource};
use async_bb8_diesel::{
AsyncConnection, AsyncRunQueryDsl, AsyncSimpleConnection,
ConnectionManager,
AsyncRunQueryDsl, AsyncSimpleConnection, ConnectionManager,
};
use chrono::Utc;
use db_macros::Resource;
Expand Down Expand Up @@ -999,22 +996,12 @@ mod test {
.set(resource::dsl::collection_id.eq(collection_id)),
);

type TxnError =
smklein marked this conversation as resolved.
Show resolved Hide resolved
TransactionError<AttachError<Resource, Collection, DieselError>>;
let result = conn
.transaction_async(|conn| async move {
attach_query.attach_and_get_result_async(&conn).await.map_err(
|e| match e {
AttachError::DatabaseError(e) => TxnError::from(e),
e => TxnError::CustomError(e),
},
)
})
.await;

// "attach_and_get_result" should return the "attached" resource.
let (returned_collection, returned_resource) =
result.expect("Attach should have worked");
let (returned_collection, returned_resource) = attach_query
.attach_and_get_result_async(&conn)
.await
.expect("Attach should have worked");

assert_eq!(
returned_resource.collection_id.expect("Expected a collection ID"),
collection_id
Expand Down
26 changes: 7 additions & 19 deletions nexus/db-queries/src/db/collection_detach_many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,12 +479,9 @@ where
mod test {
use super::*;
use crate::db::collection_attach::DatastoreAttachTarget;
use crate::db::{
self, error::TransactionError, identity::Resource as IdentityResource,
};
use crate::db::{self, identity::Resource as IdentityResource};
use async_bb8_diesel::{
AsyncConnection, AsyncRunQueryDsl, AsyncSimpleConnection,
ConnectionManager,
AsyncRunQueryDsl, AsyncSimpleConnection, ConnectionManager,
};
use chrono::Utc;
use db_macros::Resource;
Expand Down Expand Up @@ -919,21 +916,12 @@ mod test {
.set(resource::dsl::collection_id.eq(Option::<Uuid>::None)),
);

type TxnError =
TransactionError<DetachManyError<Collection, DieselError>>;
let result = conn
.transaction_async(|conn| async move {
smklein marked this conversation as resolved.
Show resolved Hide resolved
detach_query.detach_and_get_result_async(&conn).await.map_err(
|e| match e {
DetachManyError::DatabaseError(e) => TxnError::from(e),
e => TxnError::CustomError(e),
},
)
})
.await;

// "detach_and_get_result" should return the "detached" resource.
let returned_collection = result.expect("Detach should have worked");
let returned_collection = detach_query
.detach_and_get_result_async(&conn)
.await
.expect("Detach should have worked");

// The returned values should be the latest value in the DB.
assert_eq!(
returned_collection,
Expand Down
158 changes: 92 additions & 66 deletions nexus/db-queries/src/db/datastore/address_lot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::db::error::TransactionError;
use crate::db::model::Name;
use crate::db::model::{AddressLot, AddressLotBlock, AddressLotReservedBlock};
use crate::db::pagination::paginated;
use crate::transaction_retry::RetryHelper;
use async_bb8_diesel::{AsyncConnection, AsyncRunQueryDsl, Connection};
use chrono::Utc;
use diesel::result::Error as DieselError;
Expand All @@ -28,6 +29,7 @@ use omicron_common::api::external::{
};
use ref_cast::RefCast;
use serde::{Deserialize, Serialize};
use std::sync::{Arc, OnceLock};
use uuid::Uuid;

#[derive(Clone, Debug, Deserialize, Serialize)]
Expand All @@ -45,51 +47,61 @@ impl DataStore {
use db::schema::address_lot::dsl as lot_dsl;
use db::schema::address_lot_block::dsl as block_dsl;

let retry_helper = RetryHelper::new(
&self.transaction_retry_producer,
"address_lot_create",
);
self.pool_connection_authorized(opctx)
.await?
// TODO https://github.com/oxidecomputer/omicron/issues/2811
// Audit external networking database transaction usage
.transaction_async(|conn| async move {
let lot = AddressLot::new(&params.identity, params.kind.into());

let db_lot: AddressLot =
diesel::insert_into(lot_dsl::address_lot)
.values(lot)
.returning(AddressLot::as_returning())
.get_result_async(&conn)
.await?;

let blocks: Vec<AddressLotBlock> = params
.blocks
.iter()
.map(|b| {
AddressLotBlock::new(
db_lot.id(),
b.first_address.into(),
b.last_address.into(),
)
.transaction_async_with_retry(
luqmana marked this conversation as resolved.
Show resolved Hide resolved
|conn| async move {
let lot =
AddressLot::new(&params.identity, params.kind.into());

let db_lot: AddressLot =
diesel::insert_into(lot_dsl::address_lot)
.values(lot)
.returning(AddressLot::as_returning())
.get_result_async(&conn)
.await?;

let blocks: Vec<AddressLotBlock> = params
.blocks
.iter()
.map(|b| {
AddressLotBlock::new(
db_lot.id(),
b.first_address.into(),
b.last_address.into(),
)
})
.collect();

let db_blocks =
diesel::insert_into(block_dsl::address_lot_block)
.values(blocks)
.returning(AddressLotBlock::as_returning())
.get_results_async(&conn)
.await?;

Ok(AddressLotCreateResult {
lot: db_lot,
blocks: db_blocks,
})
.collect();

let db_blocks =
diesel::insert_into(block_dsl::address_lot_block)
.values(blocks)
.returning(AddressLotBlock::as_returning())
.get_results_async(&conn)
.await?;

Ok(AddressLotCreateResult { lot: db_lot, blocks: db_blocks })
})
},
retry_helper.as_callback(),
)
.await
.map_err(|e| match e {
DieselError::DatabaseError(_, _) => public_error_from_diesel(
.map_err(|e| {
public_error_from_diesel(
e,
ErrorHandler::Conflict(
ResourceType::AddressLot,
&params.identity.name.as_str(),
),
),
_ => public_error_from_diesel(e, ErrorHandler::Server),
)
})
}

Expand All @@ -113,46 +125,60 @@ impl DataStore {
LotInUse,
}

type TxnError = TransactionError<AddressLotDeleteError>;
let err = Arc::new(OnceLock::new());
let retry_helper = RetryHelper::new(
&self.transaction_retry_producer,
"address_lot_delete",
);

// TODO https://github.com/oxidecomputer/omicron/issues/2811
// Audit external networking database transaction usage
conn.transaction_async(|conn| async move {
let rsvd: Vec<AddressLotReservedBlock> =
rsvd_block_dsl::address_lot_rsvd_block
.filter(rsvd_block_dsl::address_lot_id.eq(id))
.select(AddressLotReservedBlock::as_select())
.limit(1)
.load_async(&conn)
.await?;

if !rsvd.is_empty() {
Err(TxnError::CustomError(AddressLotDeleteError::LotInUse))?;
}

let now = Utc::now();
diesel::update(lot_dsl::address_lot)
.filter(lot_dsl::time_deleted.is_null())
.filter(lot_dsl::id.eq(id))
.set(lot_dsl::time_deleted.eq(now))
.execute_async(&conn)
.await?;
conn.transaction_async_with_retry(
|conn| {
let err = err.clone();
smklein marked this conversation as resolved.
Show resolved Hide resolved
async move {
let rsvd: Vec<AddressLotReservedBlock> =
rsvd_block_dsl::address_lot_rsvd_block
.filter(rsvd_block_dsl::address_lot_id.eq(id))
.select(AddressLotReservedBlock::as_select())
.limit(1)
.load_async(&conn)
.await?;

if !rsvd.is_empty() {
err.set(AddressLotDeleteError::LotInUse).unwrap();
return Err(DieselError::RollbackTransaction);
}

let now = Utc::now();
diesel::update(lot_dsl::address_lot)
.filter(lot_dsl::time_deleted.is_null())
.filter(lot_dsl::id.eq(id))
.set(lot_dsl::time_deleted.eq(now))
.execute_async(&conn)
.await?;

diesel::delete(block_dsl::address_lot_block)
.filter(block_dsl::address_lot_id.eq(id))
.execute_async(&conn)
.await?;
diesel::delete(block_dsl::address_lot_block)
.filter(block_dsl::address_lot_id.eq(id))
.execute_async(&conn)
.await?;

Ok(())
})
Ok(())
}
},
retry_helper.as_callback(),
)
.await
.map_err(|e| match e {
TxnError::Database(e) => {
.map_err(|e| {
if let Some(err) = err.get() {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... Following up from my aforementioned comment -- this changes the error handling, too. We'll always return a Diesel error from transaction_async_with_retry, but custom errors (if any are set) would be returned through err. That's why, if we use them, we'd check them here, instead of match-ing on a CustomError.

match err {
AddressLotDeleteError::LotInUse => {
Error::invalid_request("lot is in use")
}
}
} else {
public_error_from_diesel(e, ErrorHandler::Server)
}
TxnError::CustomError(AddressLotDeleteError::LotInUse) => {
Error::invalid_request("lot is in use")
}
})
}

Expand Down
Loading
Loading