From 666caa83050fb80dffd5e3640ad8b1fa978b3d96 Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Thu, 25 May 2023 13:37:30 -0400 Subject: [PATCH 1/6] Retry progenitor client calls in sagas Generalize the retry_until_known_result macro, and wrap progenitor client calls from saga nodes. This will retry in the face of transient errors, and reduce 1) the times that sagas fail due to network weather, and 2) the times that saga unwinds fail for the same reason. --- Cargo.lock | 1 + nexus/Cargo.toml | 1 + nexus/src/app/sagas/common_storage.rs | 82 +++------------- nexus/src/app/sagas/finalize_disk.rs | 19 +--- nexus/src/app/sagas/import_blocks_from_url.rs | 33 ++++++- nexus/src/app/sagas/instance_create.rs | 14 +-- nexus/src/app/sagas/instance_delete.rs | 13 +-- .../src/app/sagas/loopback_address_create.rs | 15 +-- .../src/app/sagas/loopback_address_delete.rs | 11 ++- nexus/src/app/sagas/mod.rs | 48 ++++++++++ nexus/src/app/sagas/snapshot_create.rs | 95 +++++++++---------- .../app/sagas/switch_port_settings_apply.rs | 27 +++--- .../app/sagas/switch_port_settings_clear.rs | 19 ++-- 13 files changed, 198 insertions(+), 180 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 54005f2493..3e9157cadd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4599,6 +4599,7 @@ dependencies = [ "pem", "petgraph", "pq-sys", + "progenitor-client", "propolis-client", "rand 0.8.5", "rcgen", diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml index 49de7cea8a..5b975b8df5 100644 --- a/nexus/Cargo.toml +++ b/nexus/Cargo.toml @@ -51,6 +51,7 @@ parse-display.workspace = true paste.workspace = true # See omicron-rpaths for more about the "pq-sys" dependency. 
pq-sys = "*" +progenitor-client.workspace = true propolis-client.workspace = true rand.workspace = true ref-cast.workspace = true diff --git a/nexus/src/app/sagas/common_storage.rs b/nexus/src/app/sagas/common_storage.rs index 4c5853d916..5698d6afce 100644 --- a/nexus/src/app/sagas/common_storage.rs +++ b/nexus/src/app/sagas/common_storage.rs @@ -10,6 +10,7 @@ use crate::authz; use crate::db; use crate::db::identity::Asset; use crate::db::lookup::LookupPath; +use crate::retry_until_known_result; use crate::Nexus; use anyhow::anyhow; use crucible_agent_client::{ @@ -281,73 +282,6 @@ pub async fn get_pantry_address( // Common Pantry operations -#[macro_export] -macro_rules! retry_until_known_result { - ( $log:ident, $func:block ) => {{ - use omicron_common::backoff; - - #[derive(Debug, thiserror::Error)] - enum InnerError { - #[error("Reqwest error: {0}")] - Reqwest(#[from] reqwest::Error), - - #[error("Pantry client error: {0}")] - PantryClient( - #[from] - crucible_pantry_client::Error< - crucible_pantry_client::types::Error, - >, - ), - } - - backoff::retry_notify( - backoff::retry_policy_internal_service(), - || async { - match ($func).await { - Err(crucible_pantry_client::Error::CommunicationError( - e, - )) => { - warn!( - $log, - "saw transient communication error {}, retrying...", - e, - ); - - Err(backoff::BackoffError::::transient( - e.into(), - )) - } - - Err(e) => { - warn!($log, "saw permanent error {}, aborting", e,); - - Err(backoff::BackoffError::::Permanent( - e.into(), - )) - } - - Ok(v) => Ok(v), - } - }, - |error: InnerError, delay| { - warn!( - $log, - "failed external call ({:?}), will retry in {:?}", - error, - delay, - ); - }, - ) - .await - .map_err(|e| { - ActionError::action_failed(format!( - "gave up on external call due to {:?}", - e - )) - }) - }}; -} - pub async fn call_pantry_attach_for_disk( log: &slog::Logger, opctx: &OpContext, @@ -394,6 +328,12 @@ pub async fn call_pantry_attach_for_disk( retry_until_known_result!(log, { 
client.attach(&disk_id.to_string(), &attach_request) + }) + .map_err(|e| { + ActionError::action_failed(format!( + "pantry attach failed with {:?}", + e + )) })?; Ok(()) @@ -410,7 +350,13 @@ pub async fn call_pantry_detach_for_disk( let client = crucible_pantry_client::Client::new(&endpoint); - retry_until_known_result!(log, { client.detach(&disk_id.to_string()) })?; + retry_until_known_result!(log, { client.detach(&disk_id.to_string()) }) + .map_err(|e| { + ActionError::action_failed(format!( + "pantry detach failed with {:?}", + e + )) + })?; Ok(()) } diff --git a/nexus/src/app/sagas/finalize_disk.rs b/nexus/src/app/sagas/finalize_disk.rs index f8ddff87ba..a26806c6b5 100644 --- a/nexus/src/app/sagas/finalize_disk.rs +++ b/nexus/src/app/sagas/finalize_disk.rs @@ -10,10 +10,10 @@ use super::ActionRegistry; use super::NexusActionContext; use super::NexusSaga; use super::SagaInitError; +use crate::app::sagas::common_storage::call_pantry_detach_for_disk; use crate::app::sagas::snapshot_create; use crate::db::lookup::LookupPath; use crate::external_api::params; -use crate::retry_until_known_result; use crate::{authn, authz}; use nexus_db_model::Generation; use omicron_common::api::external; @@ -284,24 +284,9 @@ async fn sfd_call_pantry_detach_for_disk( ) -> Result<(), ActionError> { let log = sagactx.user_data().log(); let params = sagactx.saga_params::()?; - let pantry_address = sagactx.lookup::("pantry_address")?; - let endpoint = format!("http://{}", pantry_address); - - info!( - log, - "sending detach request for disk {} to pantry endpoint {}", - params.disk_id, - endpoint, - ); - - let disk_id = params.disk_id.to_string(); - let client = crucible_pantry_client::Client::new(&endpoint); - - retry_until_known_result!(log, { client.detach(&disk_id) })?; - - Ok(()) + call_pantry_detach_for_disk(&log, params.disk_id, pantry_address).await } async fn sfd_clear_pantry_address( diff --git a/nexus/src/app/sagas/import_blocks_from_url.rs 
b/nexus/src/app/sagas/import_blocks_from_url.rs index a623a4636f..47360a0975 100644 --- a/nexus/src/app/sagas/import_blocks_from_url.rs +++ b/nexus/src/app/sagas/import_blocks_from_url.rs @@ -274,6 +274,12 @@ async fn sibfu_call_pantry_import_from_url_for_disk( let response = retry_until_known_result!(log, { client.import_from_url(&disk_id, &request) + }) + .map_err(|e| { + ActionError::action_failed(format!( + "import from url failed with {:?}", + e + )) })?; Ok(response.job_id.clone()) @@ -301,7 +307,20 @@ async fn sibfu_wait_for_import_from_url( endpoint, ); - while !client.is_job_finished(&job_id).await.unwrap().job_is_finished { + loop { + let result = + retry_until_known_result!(log, { client.is_job_finished(&job_id) }) + .map_err(|e| { + ActionError::action_failed(format!( + "is_job_finished failed with {:?}", + e + )) + })?; + + if result.job_is_finished { + break; + } + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } @@ -313,10 +332,14 @@ async fn sibfu_wait_for_import_from_url( endpoint, ); - let response = client - .job_result_ok(&job_id) - .await - .map_err(|e| ActionError::action_failed(e.to_string()))?; + let response = + retry_until_known_result!(log, { client.job_result_ok(&job_id) }) + .map_err(|e| { + ActionError::action_failed(format!( + "job_result_ok failed with {:?}", + e + )) + })?; if !response.job_result_ok { return Err(ActionError::action_failed(format!("Job {job_id} failed"))); diff --git a/nexus/src/app/sagas/instance_create.rs b/nexus/src/app/sagas/instance_create.rs index e160218a5f..f137823bfe 100644 --- a/nexus/src/app/sagas/instance_create.rs +++ b/nexus/src/app/sagas/instance_create.rs @@ -15,6 +15,7 @@ use crate::db::lookup::LookupPath; use crate::db::model::ByteCount as DbByteCount; use crate::db::queries::network_interface::InsertError as InsertNicError; use crate::external_api::params; +use crate::retry_until_known_result; use crate::{authn, authz, db}; use chrono::Utc; use nexus_db_model::NetworkInterfaceKind; 
@@ -437,16 +438,17 @@ async fn sic_remove_network_config( let result = match target_ip.ip { ipnetwork::IpNetwork::V4(network) => { - dpd_client - .nat_ipv4_delete(&network.ip(), *target_ip.first_port) - .await + retry_until_known_result!(log, { + dpd_client.nat_ipv4_delete(&network.ip(), *target_ip.first_port) + }) } ipnetwork::IpNetwork::V6(network) => { - dpd_client - .nat_ipv6_delete(&network.ip(), *target_ip.first_port) - .await + retry_until_known_result!(log, { + dpd_client.nat_ipv6_delete(&network.ip(), *target_ip.first_port) + }) } }; + match result { Ok(_) => { debug!(log, "deletion of nat entry successful for: {target_ip:#?}"); diff --git a/nexus/src/app/sagas/instance_delete.rs b/nexus/src/app/sagas/instance_delete.rs index cd6dc86181..ec4b9ece36 100644 --- a/nexus/src/app/sagas/instance_delete.rs +++ b/nexus/src/app/sagas/instance_delete.rs @@ -8,6 +8,7 @@ use super::NexusSaga; use crate::app::sagas::declare_saga_actions; use crate::db; use crate::db::lookup::LookupPath; +use crate::retry_until_known_result; use crate::{authn, authz}; use nexus_types::identity::Resource; use omicron_common::api::external::{Error, ResourceType}; @@ -167,14 +168,14 @@ async fn sid_delete_network_config( debug!(log, "deleting nat mapping for entry: {entry:#?}"); let result = match entry.ip { ipnetwork::IpNetwork::V4(network) => { - dpd_client - .nat_ipv4_delete(&network.ip(), *entry.first_port) - .await + retry_until_known_result!(log, { + dpd_client.nat_ipv4_delete(&network.ip(), *entry.first_port) + }) } ipnetwork::IpNetwork::V6(network) => { - dpd_client - .nat_ipv6_delete(&network.ip(), *entry.first_port) - .await + retry_until_known_result!(log, { + dpd_client.nat_ipv6_delete(&network.ip(), *entry.first_port) + }) } }; diff --git a/nexus/src/app/sagas/loopback_address_create.rs b/nexus/src/app/sagas/loopback_address_create.rs index 7f698f3bda..2d4e1a6792 100644 --- a/nexus/src/app/sagas/loopback_address_create.rs +++ b/nexus/src/app/sagas/loopback_address_create.rs @@ 
-10,6 +10,7 @@ use crate::authn; use crate::authz; use crate::db::model::LoopbackAddress; use crate::external_api::params; +use crate::retry_until_known_result; use anyhow::Error; use dpd_client::types::{Ipv4Entry, Ipv6Entry}; use serde::{Deserialize, Serialize}; @@ -135,10 +136,10 @@ async fn slc_loopback_address_create( ) -> Result<(), ActionError> { let osagactx = sagactx.user_data(); let params = sagactx.saga_params::()?; + let log = sagactx.user_data().log(); // TODO: https://github.com/oxidecomputer/omicron/issues/2629 if let Ok(_) = std::env::var("SKIP_ASIC_CONFIG") { - let log = sagactx.user_data().log(); debug!(log, "SKIP_ASIC_CONFIG is set, disabling calls to dendrite"); return Ok(()); }; @@ -151,20 +152,20 @@ async fn slc_loopback_address_create( let result = match &params.loopback_address.address { IpAddr::V4(a) => { - dpd_client - .loopback_ipv4_create(&Ipv4Entry { + retry_until_known_result!(log, { + dpd_client.loopback_ipv4_create(&Ipv4Entry { addr: *a, tag: NEXUS_DPD_TAG.into(), }) - .await + }) } IpAddr::V6(a) => { - dpd_client - .loopback_ipv6_create(&Ipv6Entry { + retry_until_known_result!(log, { + dpd_client.loopback_ipv6_create(&Ipv6Entry { addr: *a, tag: NEXUS_DPD_TAG.into(), }) - .await + }) } }; diff --git a/nexus/src/app/sagas/loopback_address_delete.rs b/nexus/src/app/sagas/loopback_address_delete.rs index 4790401034..3d0ef2099d 100644 --- a/nexus/src/app/sagas/loopback_address_delete.rs +++ b/nexus/src/app/sagas/loopback_address_delete.rs @@ -10,6 +10,7 @@ use crate::authn; use crate::authz; use crate::db::model::{LoopbackAddress, Name}; use crate::external_api::params; +use crate::retry_until_known_result; use anyhow::{anyhow, Error}; use nexus_types::identity::Asset; use omicron_common::api::external::{IpNet, NameOrId}; @@ -162,10 +163,10 @@ async fn slc_loopback_address_delete( ) -> Result<(), ActionError> { let osagactx = sagactx.user_data(); let params = sagactx.saga_params::()?; + let log = sagactx.user_data().log(); // TODO: 
https://github.com/oxidecomputer/omicron/issues/2629 if let Ok(_) = std::env::var("SKIP_ASIC_CONFIG") { - let log = sagactx.user_data().log(); debug!(log, "SKIP_ASIC_CONFIG is set, disabling calls to dendrite"); return Ok(()); }; @@ -177,8 +178,12 @@ async fn slc_loopback_address_delete( Arc::clone(&osagactx.nexus().dpd_client); match &params.address { - IpNet::V4(a) => dpd_client.loopback_ipv4_delete(&a.ip()).await, - IpNet::V6(a) => dpd_client.loopback_ipv6_delete(&a.ip()).await, + IpNet::V4(a) => retry_until_known_result!(log, { + dpd_client.loopback_ipv4_delete(&a.ip()) + }), + IpNet::V6(a) => retry_until_known_result!(log, { + dpd_client.loopback_ipv6_delete(&a.ip()) + }), } .map_err(|e| ActionError::action_failed(e.to_string()))?; diff --git a/nexus/src/app/sagas/mod.rs b/nexus/src/app/sagas/mod.rs index 84c20e5196..7dce4a6c36 100644 --- a/nexus/src/app/sagas/mod.rs +++ b/nexus/src/app/sagas/mod.rs @@ -304,3 +304,51 @@ pub(crate) use __action_name; pub(crate) use __emit_action; pub(crate) use __stringify_ident; pub(crate) use declare_saga_actions; + +/// Retry a progenitor client operation until a known result is returned. +/// +/// Saga execution relies on the outcome of an external call being known: since +/// they are idempotent, reissue the external call until a known result comes +/// back. Retry if a communication error is seen. +#[macro_export] +macro_rules! 
retry_until_known_result { + ( $log:ident, $func:block ) => {{ + use omicron_common::backoff; + + backoff::retry_notify( + backoff::retry_policy_internal_service(), + || async { + match ($func).await { + Err(progenitor_client::Error::CommunicationError(e)) => { + warn!( + $log, + "saw transient communication error {}, retrying...", + e, + ); + + Err(backoff::BackoffError::transient( + progenitor_client::Error::CommunicationError(e), + )) + } + + Err(e) => { + warn!($log, "saw permanent error {}, aborting", e,); + + Err(backoff::BackoffError::Permanent(e)) + } + + Ok(v) => Ok(v), + } + }, + |error: progenitor_client::Error<_>, delay| { + warn!( + $log, + "failed external call ({:?}), will retry in {:?}", + error, + delay, + ); + }, + ) + .await + }}; +} diff --git a/nexus/src/app/sagas/snapshot_create.rs b/nexus/src/app/sagas/snapshot_create.rs index ddd32fad1b..c92edc487a 100644 --- a/nexus/src/app/sagas/snapshot_create.rs +++ b/nexus/src/app/sagas/snapshot_create.rs @@ -101,6 +101,7 @@ use crate::app::sagas::declare_saga_actions; use crate::db::identity::{Asset, Resource}; use crate::db::lookup::LookupPath; use crate::external_api::params; +use crate::retry_until_known_result; use crate::{authn, authz, db}; use anyhow::anyhow; use crucible_agent_client::{types::RegionId, Client as CrucibleAgentClient}; @@ -680,15 +681,15 @@ async fn ssc_send_snapshot_request_to_sled_agent( info!(log, "instance {} sled agent created ok", instance_id); // Send a snapshot request to propolis through sled agent - sled_agent_client - .instance_issue_disk_snapshot_request( + retry_until_known_result!(log, { + sled_agent_client.instance_issue_disk_snapshot_request( &instance.id(), &disk.id(), &InstanceIssueDiskSnapshotRequestBody { snapshot_id }, ) - .await - .map_err(|e| e.to_string()) - .map_err(ActionError::action_failed)?; + }) + .map_err(|e| e.to_string()) + .map_err(ActionError::action_failed)?; Ok(()) } @@ -733,12 +734,12 @@ async fn ssc_send_snapshot_request_to_sled_agent_undo( 
let url = format!("http://{}", dataset.address()); let client = CrucibleAgentClient::new(&url); - client - .region_delete_snapshot( + retry_until_known_result!(log, { + client.region_delete_snapshot( &RegionId(region.id().to_string()), &snapshot_id.to_string(), ) - .await?; + })?; } Ok(()) } @@ -1006,17 +1007,17 @@ async fn ssc_call_pantry_snapshot_for_disk( let client = crucible_pantry_client::Client::new(&endpoint); - client - .snapshot( + retry_until_known_result!(log, { + client.snapshot( &params.disk_id.to_string(), &crucible_pantry_client::types::SnapshotRequest { snapshot_id: snapshot_id.to_string(), }, ) - .await - .map_err(|e| { - ActionError::action_failed(Error::internal_error(&e.to_string())) - })?; + }) + .map_err(|e| { + ActionError::action_failed(Error::internal_error(&e.to_string())) + })?; Ok(()) } @@ -1049,12 +1050,12 @@ async fn ssc_call_pantry_snapshot_for_disk_undo( let url = format!("http://{}", dataset.address()); let client = CrucibleAgentClient::new(&url); - client - .region_delete_snapshot( + retry_until_known_result!(log, { + client.region_delete_snapshot( &RegionId(region.id().to_string()), &snapshot_id.to_string(), ) - .await?; + })?; } Ok(()) } @@ -1196,34 +1197,34 @@ async fn ssc_start_running_snapshot( info!(log, "dataset {:?} region {:?} url {}", dataset, region, url); // Validate with the Crucible agent that the snapshot exists - let crucible_region = client - .region_get(&RegionId(region.id().to_string())) - .await - .map_err(|e| e.to_string()) - .map_err(ActionError::action_failed)?; + let crucible_region = retry_until_known_result!(log, { + client.region_get(&RegionId(region.id().to_string())) + }) + .map_err(|e| e.to_string()) + .map_err(ActionError::action_failed)?; info!(log, "crucible region {:?}", crucible_region); - let crucible_snapshot = client - .region_get_snapshot( + let crucible_snapshot = retry_until_known_result!(log, { + client.region_get_snapshot( &RegionId(region.id().to_string()), &snapshot_id.to_string(), ) - 
.await - .map_err(|e| e.to_string()) - .map_err(ActionError::action_failed)?; + }) + .map_err(|e| e.to_string()) + .map_err(ActionError::action_failed)?; info!(log, "crucible snapshot {:?}", crucible_snapshot); // Start the snapshot running - let crucible_running_snapshot = client - .region_run_snapshot( + let crucible_running_snapshot = retry_until_known_result!(log, { + client.region_run_snapshot( &RegionId(region.id().to_string()), &snapshot_id.to_string(), ) - .await - .map_err(|e| e.to_string()) - .map_err(ActionError::action_failed)?; + }) + .map_err(|e| e.to_string()) + .map_err(ActionError::action_failed)?; info!(log, "crucible running snapshot {:?}", crucible_running_snapshot); @@ -1288,25 +1289,23 @@ async fn ssc_start_running_snapshot_undo( use crucible_agent_client::Error::ErrorResponse; use http::status::StatusCode; - client - .region_delete_running_snapshot( + retry_until_known_result!(log, { + client.region_delete_running_snapshot( &RegionId(region.id().to_string()), &snapshot_id.to_string(), ) - .await - .map(|_| ()) - // NOTE: If we later create a volume record and delete it, the - // running snapshot may be deleted (see: - // ssc_create_volume_record_undo). - // - // To cope, we treat "running snapshot not found" as "Ok", since it - // may just be the result of the volume deletion steps completing. - .or_else(|err| match err { - ErrorResponse(r) if r.status() == StatusCode::NOT_FOUND => { - Ok(()) - } - _ => Err(err), - })?; + }) + .map(|_| ()) + // NOTE: If we later create a volume record and delete it, the + // running snapshot may be deleted (see: + // ssc_create_volume_record_undo). + // + // To cope, we treat "running snapshot not found" as "Ok", since it + // may just be the result of the volume deletion steps completing. 
+ .or_else(|err| match err { + ErrorResponse(r) if r.status() == StatusCode::NOT_FOUND => Ok(()), + _ => Err(err), + })?; osagactx .datastore() .region_snapshot_remove(dataset.id(), region.id(), snapshot_id) diff --git a/nexus/src/app/sagas/switch_port_settings_apply.rs b/nexus/src/app/sagas/switch_port_settings_apply.rs index 0e94618afc..b3b82f6e1e 100644 --- a/nexus/src/app/sagas/switch_port_settings_apply.rs +++ b/nexus/src/app/sagas/switch_port_settings_apply.rs @@ -7,6 +7,7 @@ use crate::app::sagas::{ declare_saga_actions, ActionRegistry, NexusSaga, SagaInitError, }; use crate::db::datastore::UpdatePrecondition; +use crate::retry_until_known_result; use crate::{authn, db}; use anyhow::Error; use db::datastore::SwitchPortSettingsCombinedResult; @@ -200,6 +201,7 @@ async fn spa_ensure_switch_port_settings( ) -> Result<(), ActionError> { let osagactx = sagactx.user_data(); let params = sagactx.saga_params::()?; + let log = sagactx.user_data().log(); let settings = sagactx .lookup::("switch_port_settings")?; @@ -213,10 +215,10 @@ async fn spa_ensure_switch_port_settings( let dpd_port_settings = api_to_dpd_port_settings(&settings) .map_err(ActionError::action_failed)?; - dpd_client - .port_settings_apply(&port_id, &dpd_port_settings) - .await - .map_err(|e| ActionError::action_failed(e.to_string()))?; + retry_until_known_result!(log, { + dpd_client.port_settings_apply(&port_id, &dpd_port_settings) + }) + .map_err(|e| ActionError::action_failed(e.to_string()))?; Ok(()) } @@ -231,6 +233,7 @@ async fn spa_undo_ensure_switch_port_settings( &sagactx, &params.serialized_authn, ); + let log = sagactx.user_data().log(); let port_id: PortId = PortId::from_str(&params.switch_port_name) .map_err(|e| external::Error::internal_error(e))?; @@ -245,10 +248,10 @@ async fn spa_undo_ensure_switch_port_settings( let id = match orig_port_settings_id { Some(id) => id, None => { - dpd_client - .port_settings_clear(&port_id) - .await - .map_err(|e| 
external::Error::internal_error(&e.to_string()))?; + retry_until_known_result!(log, { + dpd_client.port_settings_clear(&port_id) + }) + .map_err(|e| external::Error::internal_error(&e.to_string()))?; return Ok(()); } @@ -262,10 +265,10 @@ async fn spa_undo_ensure_switch_port_settings( let dpd_port_settings = api_to_dpd_port_settings(&settings) .map_err(ActionError::action_failed)?; - dpd_client - .port_settings_apply(&port_id, &dpd_port_settings) - .await - .map_err(|e| external::Error::internal_error(&e.to_string()))?; + retry_until_known_result!(log, { + dpd_client.port_settings_apply(&port_id, &dpd_port_settings) + }) + .map_err(|e| external::Error::internal_error(&e.to_string()))?; Ok(()) } diff --git a/nexus/src/app/sagas/switch_port_settings_clear.rs b/nexus/src/app/sagas/switch_port_settings_clear.rs index 7462f8d33a..54952345f9 100644 --- a/nexus/src/app/sagas/switch_port_settings_clear.rs +++ b/nexus/src/app/sagas/switch_port_settings_clear.rs @@ -9,6 +9,7 @@ use crate::app::sagas::{ }; use crate::authn; use crate::db::datastore::UpdatePrecondition; +use crate::retry_until_known_result; use anyhow::Error; use dpd_client::types::PortId; use omicron_common::api::external::{self, NameOrId}; @@ -123,6 +124,7 @@ async fn spa_clear_switch_port_settings( ) -> Result<(), ActionError> { let osagactx = sagactx.user_data(); let params = sagactx.saga_params::()?; + let log = sagactx.user_data().log(); let port_id: PortId = PortId::from_str(&params.port_name) .map_err(|e| ActionError::action_failed(e.to_string()))?; @@ -130,10 +132,10 @@ async fn spa_clear_switch_port_settings( let dpd_client: Arc = Arc::clone(&osagactx.nexus().dpd_client); - dpd_client - .port_settings_clear(&port_id) - .await - .map_err(|e| ActionError::action_failed(e.to_string()))?; + retry_until_known_result!(log, { + dpd_client.port_settings_clear(&port_id) + }) + .map_err(|e| ActionError::action_failed(e.to_string()))?; Ok(()) } @@ -148,6 +150,7 @@ async fn spa_undo_clear_switch_port_settings( 
&sagactx, &params.serialized_authn, ); + let log = sagactx.user_data().log(); let port_id: PortId = PortId::from_str(&params.port_name) .map_err(|e| external::Error::internal_error(e))?; @@ -172,10 +175,10 @@ async fn spa_undo_clear_switch_port_settings( let dpd_port_settings = api_to_dpd_port_settings(&settings) .map_err(ActionError::action_failed)?; - dpd_client - .port_settings_apply(&port_id, &dpd_port_settings) - .await - .map_err(|e| external::Error::internal_error(&e.to_string()))?; + retry_until_known_result!(log, { + dpd_client.port_settings_apply(&port_id, &dpd_port_settings) + }) + .map_err(|e| external::Error::internal_error(&e.to_string()))?; Ok(()) } From cbd6853e00d316ac66c88e35b755aad4e412cdff Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Thu, 25 May 2023 17:09:15 -0400 Subject: [PATCH 2/6] fmt --- nexus/src/app/sagas/common_storage.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/nexus/src/app/sagas/common_storage.rs b/nexus/src/app/sagas/common_storage.rs index 5698d6afce..a0f3074071 100644 --- a/nexus/src/app/sagas/common_storage.rs +++ b/nexus/src/app/sagas/common_storage.rs @@ -330,10 +330,7 @@ pub async fn call_pantry_attach_for_disk( client.attach(&disk_id.to_string(), &attach_request) }) .map_err(|e| { - ActionError::action_failed(format!( - "pantry attach failed with {:?}", - e - )) + ActionError::action_failed(format!("pantry attach failed with {:?}", e)) })?; Ok(()) From 47102f82df3ef6cac7f598e861964866ff192ffc Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Thu, 25 May 2023 17:45:04 -0400 Subject: [PATCH 3/6] v2p mapping functions are called from sagas --- nexus/src/app/sled.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/nexus/src/app/sled.rs b/nexus/src/app/sled.rs index 1e2da91f65..64f6c3e90c 100644 --- a/nexus/src/app/sled.rs +++ b/nexus/src/app/sled.rs @@ -14,6 +14,7 @@ use crate::internal_api::params::{ PhysicalDiskDeleteRequest, PhysicalDiskPutRequest, 
SledAgentStartupInfo, SledRole, ZpoolPutRequest, }; +use crate::retry_until_known_result; use nexus_db_queries::context::OpContext; use nexus_db_queries::db::lookup; use omicron_common::api::external::DataPageParams; @@ -380,11 +381,15 @@ impl super::Nexus { vni: nic.vni.clone(), }; + let log = self.log.clone(); + // This function is idempotent: calling the set_v2p ioctl with // the same information is a no-op. join_handles.push(tokio::spawn(futures::future::lazy( move |_ctx| async move { - client.set_v2p(&nic_id, &mapping).await + retry_until_known_result!(log, { + client.set_v2p(&nic_id, &mapping) + }) }, ))); } @@ -483,11 +488,15 @@ impl super::Nexus { vni: nic.vni.clone(), }; + let log = self.log.clone(); + // This function is idempotent: calling the set_v2p ioctl with // the same information is a no-op. join_handles.push(tokio::spawn(futures::future::lazy( move |_ctx| async move { - client.del_v2p(&nic_id, &mapping).await + retry_until_known_result!(log, { + client.del_v2p(&nic_id, &mapping) + }) }, ))); } From 472f77a99a09db761ba75b18b5b1ab88f958938b Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Fri, 26 May 2023 16:11:19 -0400 Subject: [PATCH 4/6] retry on 503 or 429 add comment about requiring that calls are idempotent --- nexus/src/app/sagas/mod.rs | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/nexus/src/app/sagas/mod.rs b/nexus/src/app/sagas/mod.rs index 7dce4a6c36..9d816dae9c 100644 --- a/nexus/src/app/sagas/mod.rs +++ b/nexus/src/app/sagas/mod.rs @@ -309,7 +309,10 @@ pub(crate) use declare_saga_actions; /// /// Saga execution relies on the outcome of an external call being known: since /// they are idempotent, reissue the external call until a known result comes -/// back. Retry if a communication error is seen. +/// back. Retry if a communication error is seen, or if another retryable error +/// is seen. +/// +/// Note that retrying is only valid if the call itself is idempotent. 
#[macro_export] macro_rules! retry_until_known_result { ( $log:ident, $func:block ) => {{ @@ -331,6 +334,29 @@ macro_rules! retry_until_known_result { )) } + Err(progenitor_client::Error::ErrorResponse( + response_value, + )) => { + match response_value.status() { + // Retry on 503 or 429 + http::StatusCode::SERVICE_UNAVAILABLE + | http::StatusCode::TOO_MANY_REQUESTS => { + Err(backoff::BackoffError::transient( + progenitor_client::Error::ErrorResponse( + response_value, + ), + )) + } + + // Anything else is a permanent error + _ => Err(backoff::BackoffError::Permanent( + progenitor_client::Error::ErrorResponse( + response_value, + ), + )), + } + } + Err(e) => { warn!($log, "saw permanent error {}, aborting", e,); From 8608dfa7c6cd34a3f00805e1ff5f22da44ee888c Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Fri, 26 May 2023 17:49:51 -0400 Subject: [PATCH 5/6] Create more dpd ensure functions dpd api server endpoints are not idempotent, so ensure functions are required when calling them for sagas. --- dpd-client/src/lib.rs | 128 +++++++++++++++++- nexus/src/app/instance.rs | 17 +-- nexus/src/app/sagas/instance_create.rs | 33 ++--- nexus/src/app/sagas/instance_delete.rs | 36 ++--- .../src/app/sagas/loopback_address_create.rs | 42 ++---- .../src/app/sagas/loopback_address_delete.rs | 15 +- 6 files changed, 173 insertions(+), 98 deletions(-) diff --git a/dpd-client/src/lib.rs b/dpd-client/src/lib.rs index e1c52a180b..1186e1722f 100644 --- a/dpd-client/src/lib.rs +++ b/dpd-client/src/lib.rs @@ -33,6 +33,9 @@ pub struct ClientState { impl Client { /// Ensure that a NAT entry exists, overwriting a previous conflicting entry if /// applicable. + /// + /// nat_ipv[46]_create are not idempotent (see oxidecomputer/dendrite#343), + /// but this wrapper function is. Call this from sagas instead. #[allow(clippy::too_many_arguments)] pub async fn ensure_nat_entry( &self, @@ -131,13 +134,136 @@ impl Client { Ok(()) } + + /// Ensure that a NAT entry is deleted. 
+ /// + /// nat_ipv[46]_delete are not idempotent (see oxidecomputer/dendrite#343), + /// but this wrapper function is. Call this from sagas instead. + pub async fn ensure_nat_entry_deleted( + &self, + log: &Logger, + target_ip: ipnetwork::IpNetwork, + target_first_port: u16, + ) -> Result<(), progenitor_client::Error> { + let result = match target_ip { + ipnetwork::IpNetwork::V4(network) => { + self.nat_ipv4_delete(&network.ip(), target_first_port).await + } + ipnetwork::IpNetwork::V6(network) => { + self.nat_ipv6_delete(&network.ip(), target_first_port).await + } + }; + + match result { + Ok(_) => { + info!(log, "deleted old nat entry"; "target_ip" => ?target_ip); + } + + Err(e) => { + if e.status() == Some(http::StatusCode::NOT_FOUND) { + info!(log, "no nat entry found for: {target_ip:#?}"); + } else { + return Err(e); + } + } + } + + Ok(()) + } + + /// Ensure that a loopback address is created. + /// + /// loopback_ipv[46]_create are not idempotent (see + /// oxidecomputer/dendrite#343), but this wrapper function is. Call this + /// from sagas instead. + pub async fn ensure_loopback_created( + &self, + log: &Logger, + address: IpAddr, + tag: &str, + ) -> Result<(), progenitor_client::Error> { + let result = match &address { + IpAddr::V4(a) => { + self.loopback_ipv4_create(&types::Ipv4Entry { + addr: *a, + tag: tag.into(), + }) + .await + } + IpAddr::V6(a) => { + self.loopback_ipv6_create(&types::Ipv6Entry { + addr: *a, + tag: tag.into(), + }) + .await + } + }; + + match result { + Ok(_) => { + info!(log, "created loopback address"; "address" => ?address); + Ok(()) + } + + Err(progenitor_client::Error::ErrorResponse(er)) => { + match er.status() { + http::StatusCode::CONFLICT => { + info!(log, "loopback address already created"; "address" => ?address); + + Ok(()) + } + + _ => Err(progenitor_client::Error::ErrorResponse(er)), + } + } + + Err(e) => Err(e), + } + } + + /// Ensure that a loopback address is deleted. 
+ /// + /// loopback_ipv[46]_delete are not idempotent (see + /// oxidecomputer/dendrite#343), but this wrapper function is. Call this + /// from sagas instead. + pub async fn ensure_loopback_deleted( + &self, + log: &Logger, + address: IpAddr, + ) -> Result<(), progenitor_client::Error> { + let result = match &address { + IpAddr::V4(a) => self.loopback_ipv4_delete(&a).await, + IpAddr::V6(a) => self.loopback_ipv6_delete(&a).await, + }; + + match result { + Ok(_) => { + info!(log, "deleted loopback address"; "address" => ?address); + Ok(()) + } + + Err(progenitor_client::Error::ErrorResponse(er)) => { + match er.status() { + http::StatusCode::NOT_FOUND => { + info!(log, "loopback address already deleted"; "address" => ?address); + Ok(()) + } + + _ => Err(progenitor_client::Error::ErrorResponse(er)), + } + } + + Err(e) => Err(e), + } + } } + // XXX delete everything below once we use the real dpd-client crate. // https://github.com/oxidecomputer/omicron/issues/2775 use std::convert::TryFrom; use std::fmt; -use std::net::{Ipv4Addr, Ipv6Addr}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::str::FromStr; use schemars::JsonSchema; diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs index 79f973b10e..44dc09cfe6 100644 --- a/nexus/src/app/instance.rs +++ b/nexus/src/app/instance.rs @@ -16,6 +16,7 @@ use crate::db::identity::Resource; use crate::db::lookup; use crate::db::lookup::LookupPath; use crate::external_api::params; +use crate::retry_until_known_result; use futures::future::Fuse; use futures::{FutureExt, SinkExt, StreamExt}; use nexus_db_model::IpKind; @@ -1240,8 +1241,8 @@ impl super::Nexus { }) .map(|(_, ip)| ip) { - dpd_client - .ensure_nat_entry( + retry_until_known_result!(log, { + dpd_client.ensure_nat_entry( &log, target_ip.ip, dpd_client::types::MacAddr { a: mac_address.into_array() }, @@ -1250,12 +1251,12 @@ impl super::Nexus { vni, sled_ip_address.ip(), ) - .await - .map_err(|e| { - Error::internal_error(&format!( - "failed to ensure 
dpd entry: {e}" - )) - })?; + }) + .map_err(|e| { + Error::internal_error(&format!( + "failed to ensure dpd entry: {e}" + )) + })?; } Ok(()) diff --git a/nexus/src/app/sagas/instance_create.rs b/nexus/src/app/sagas/instance_create.rs index f137823bfe..e88d11957d 100644 --- a/nexus/src/app/sagas/instance_create.rs +++ b/nexus/src/app/sagas/instance_create.rs @@ -436,35 +436,24 @@ async fn sic_remove_network_config( debug!(log, "deleting nat mapping for entry: {target_ip:#?}"); - let result = match target_ip.ip { - ipnetwork::IpNetwork::V4(network) => { - retry_until_known_result!(log, { - dpd_client.nat_ipv4_delete(&network.ip(), *target_ip.first_port) - }) - } - ipnetwork::IpNetwork::V6(network) => { - retry_until_known_result!(log, { - dpd_client.nat_ipv6_delete(&network.ip(), *target_ip.first_port) - }) - } - }; + let result = retry_until_known_result!(log, { + dpd_client.ensure_nat_entry_deleted( + log, + target_ip.ip, + *target_ip.first_port, + ) + }); match result { Ok(_) => { debug!(log, "deletion of nat entry successful for: {target_ip:#?}"); Ok(()) } - Err(e) => { - if e.status() == Some(http::StatusCode::NOT_FOUND) { - debug!(log, "no nat entry found for: {target_ip:#?}"); - Ok(()) - } else { - Err(ActionError::action_failed(Error::internal_error( - &format!("failed to delete nat entry via dpd: {e}"), - ))) - } - } + Err(e) => Err(Error::internal_error(&format!( + "failed to delete nat entry via dpd: {e}" + ))), }?; + Ok(()) } diff --git a/nexus/src/app/sagas/instance_delete.rs b/nexus/src/app/sagas/instance_delete.rs index ec4b9ece36..c139332a45 100644 --- a/nexus/src/app/sagas/instance_delete.rs +++ b/nexus/src/app/sagas/instance_delete.rs @@ -166,34 +166,26 @@ async fn sid_delete_network_config( // bailing out before we've attempted deletion of all entries. 
for entry in external_ips { debug!(log, "deleting nat mapping for entry: {entry:#?}"); - let result = match entry.ip { - ipnetwork::IpNetwork::V4(network) => { - retry_until_known_result!(log, { - dpd_client.nat_ipv4_delete(&network.ip(), *entry.first_port) - }) - } - ipnetwork::IpNetwork::V6(network) => { - retry_until_known_result!(log, { - dpd_client.nat_ipv6_delete(&network.ip(), *entry.first_port) - }) - } - }; + + let result = retry_until_known_result!(log, { + dpd_client.ensure_nat_entry_deleted( + log, + entry.ip, + *entry.first_port, + ) + }); match result { Ok(_) => { debug!(log, "deletion of nat entry successful for: {entry:#?}"); } Err(e) => { - if e.status() == Some(http::StatusCode::NOT_FOUND) { - debug!(log, "no nat entry found for: {entry:#?}"); - } else { - let new_error = - ActionError::action_failed(Error::internal_error( - &format!("failed to delete nat entry via dpd: {e}"), - )); - error!(log, "{new_error:#?}"); - errors.push(new_error); - } + let new_error = + ActionError::action_failed(Error::internal_error( + &format!("failed to delete nat entry via dpd: {e}"), + )); + error!(log, "{new_error:#?}"); + errors.push(new_error); } } } diff --git a/nexus/src/app/sagas/loopback_address_create.rs b/nexus/src/app/sagas/loopback_address_create.rs index 2d4e1a6792..dfdc5e70c9 100644 --- a/nexus/src/app/sagas/loopback_address_create.rs +++ b/nexus/src/app/sagas/loopback_address_create.rs @@ -12,9 +12,7 @@ use crate::db::model::LoopbackAddress; use crate::external_api::params; use crate::retry_until_known_result; use anyhow::Error; -use dpd_client::types::{Ipv4Entry, Ipv6Entry}; use serde::{Deserialize, Serialize}; -use std::net::IpAddr; use std::sync::Arc; use steno::ActionError; @@ -150,36 +148,12 @@ async fn slc_loopback_address_create( let dpd_client: Arc = Arc::clone(&osagactx.nexus().dpd_client); - let result = match &params.loopback_address.address { - IpAddr::V4(a) => { - retry_until_known_result!(log, { - dpd_client.loopback_ipv4_create(&Ipv4Entry
{ - addr: *a, - tag: NEXUS_DPD_TAG.into(), - }) - }) - } - IpAddr::V6(a) => { - retry_until_known_result!(log, { - dpd_client.loopback_ipv6_create(&Ipv6Entry { - addr: *a, - tag: NEXUS_DPD_TAG.into(), - }) - }) - } - }; - - if let Err(e) = result { - match e { - sled_agent_client::Error::ErrorResponse(ref er) => { - match er.status() { - http::StatusCode::CONFLICT => Ok(()), - _ => Err(ActionError::action_failed(e.to_string())), - } - } - _ => Err(ActionError::action_failed(e.to_string())), - }?; - } - - Ok(()) + retry_until_known_result!(log, { + dpd_client.ensure_loopback_created( + log, + params.loopback_address.address, + NEXUS_DPD_TAG, + ) + }) + .map_err(|e| ActionError::action_failed(e.to_string())) } diff --git a/nexus/src/app/sagas/loopback_address_delete.rs b/nexus/src/app/sagas/loopback_address_delete.rs index 3d0ef2099d..12c22595ff 100644 --- a/nexus/src/app/sagas/loopback_address_delete.rs +++ b/nexus/src/app/sagas/loopback_address_delete.rs @@ -177,15 +177,8 @@ async fn slc_loopback_address_delete( let dpd_client: Arc = Arc::clone(&osagactx.nexus().dpd_client); - match &params.address { - IpNet::V4(a) => retry_until_known_result!(log, { - dpd_client.loopback_ipv4_delete(&a.ip()) - }), - IpNet::V6(a) => retry_until_known_result!(log, { - dpd_client.loopback_ipv6_delete(&a.ip()) - }), - } - .map_err(|e| ActionError::action_failed(e.to_string()))?; - - Ok(()) + retry_until_known_result!(log, { + dpd_client.ensure_loopback_deleted(log, params.address.ip()) + }) + .map_err(|e| ActionError::action_failed(e.to_string())) } From b80679efce5d8fb8157677d4fdcf407053d45566 Mon Sep 17 00:00:00 2001 From: James MacMahon Date: Fri, 2 Jun 2023 16:42:47 -0400 Subject: [PATCH 6/6] Turn retry_until_known_result into a function Credit goes to jgallagher, I just copied what he wrote --- nexus/src/app/instance.rs | 27 ++-- nexus/src/app/sagas/common_storage.rs | 21 ++-- nexus/src/app/sagas/import_blocks_from_url.rs | 40 +++--- nexus/src/app/sagas/instance_create.rs | 15 ++-
nexus/src/app/sagas/instance_delete.rs | 15 ++- .../src/app/sagas/loopback_address_create.rs | 17 +-- .../src/app/sagas/loopback_address_delete.rs | 7 +- nexus/src/app/sagas/mod.rs | 54 ++++---- nexus/src/app/sagas/snapshot_create.rs | 115 +++++++++++------- .../app/sagas/switch_port_settings_apply.rs | 17 +-- .../app/sagas/switch_port_settings_clear.rs | 12 +- nexus/src/app/sled.rs | 12 +- 12 files changed, 200 insertions(+), 152 deletions(-) diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs index ae898b6d4e..27bd029d12 100644 --- a/nexus/src/app/instance.rs +++ b/nexus/src/app/instance.rs @@ -8,6 +8,7 @@ use super::MAX_DISKS_PER_INSTANCE; use super::MAX_EXTERNAL_IPS_PER_INSTANCE; use super::MAX_NICS_PER_INSTANCE; use crate::app::sagas; +use crate::app::sagas::retry_until_known_result; use crate::authn; use crate::authz; use crate::cidata::InstanceCiData; @@ -16,7 +17,6 @@ use crate::db::identity::Resource; use crate::db::lookup; use crate::db::lookup::LookupPath; use crate::external_api::params; -use crate::retry_until_known_result; use futures::future::Fuse; use futures::{FutureExt, SinkExt, StreamExt}; use nexus_db_model::IpKind; @@ -1241,17 +1241,22 @@ impl super::Nexus { }) .map(|(_, ip)| ip) { - retry_until_known_result!(log, { - dpd_client.ensure_nat_entry( - &log, - target_ip.ip, - dpd_client::types::MacAddr { a: mac_address.into_array() }, - *target_ip.first_port, - *target_ip.last_port, - vni, - sled_ip_address.ip(), - ) + retry_until_known_result(log, || async { + dpd_client + .ensure_nat_entry( + &log, + target_ip.ip, + dpd_client::types::MacAddr { + a: mac_address.into_array(), + }, + *target_ip.first_port, + *target_ip.last_port, + vni, + sled_ip_address.ip(), + ) + .await }) + .await .map_err(|e| { Error::internal_error(&format!( "failed to ensure dpd entry: {e}" diff --git a/nexus/src/app/sagas/common_storage.rs b/nexus/src/app/sagas/common_storage.rs index a0f3074071..a9c5a0f265 100644 --- a/nexus/src/app/sagas/common_storage.rs 
+++ b/nexus/src/app/sagas/common_storage.rs @@ -6,11 +6,11 @@ use super::*; +use crate::app::sagas::retry_until_known_result; use crate::authz; use crate::db; use crate::db::identity::Asset; use crate::db::lookup::LookupPath; -use crate::retry_until_known_result; use crate::Nexus; use anyhow::anyhow; use crucible_agent_client::{ @@ -326,9 +326,10 @@ pub async fn call_pantry_attach_for_disk( volume_construction_request, }; - retry_until_known_result!(log, { - client.attach(&disk_id.to_string(), &attach_request) + retry_until_known_result(log, || async { + client.attach(&disk_id.to_string(), &attach_request).await }) + .await .map_err(|e| { ActionError::action_failed(format!("pantry attach failed with {:?}", e)) })?; @@ -347,13 +348,13 @@ pub async fn call_pantry_detach_for_disk( let client = crucible_pantry_client::Client::new(&endpoint); - retry_until_known_result!(log, { client.detach(&disk_id.to_string()) }) - .map_err(|e| { - ActionError::action_failed(format!( - "pantry detach failed with {:?}", - e - )) - })?; + retry_until_known_result(log, || async { + client.detach(&disk_id.to_string()).await + }) + .await + .map_err(|e| { + ActionError::action_failed(format!("pantry detach failed with {:?}", e)) + })?; Ok(()) } diff --git a/nexus/src/app/sagas/import_blocks_from_url.rs b/nexus/src/app/sagas/import_blocks_from_url.rs index 47360a0975..b9a1a01ee5 100644 --- a/nexus/src/app/sagas/import_blocks_from_url.rs +++ b/nexus/src/app/sagas/import_blocks_from_url.rs @@ -10,8 +10,8 @@ use super::ActionRegistry; use super::NexusActionContext; use super::NexusSaga; use super::SagaInitError; +use crate::app::sagas::retry_until_known_result; use crate::db::lookup::LookupPath; -use crate::retry_until_known_result; use crate::{authn, authz}; use nexus_db_model::Generation; use nexus_types::external_api::params; @@ -272,9 +272,10 @@ async fn sibfu_call_pantry_import_from_url_for_disk( }, }; - let response = retry_until_known_result!(log, { - client.import_from_url(&disk_id, 
&request) + let response = retry_until_known_result(log, || async { + client.import_from_url(&disk_id, &request).await }) + .await .map_err(|e| { ActionError::action_failed(format!( "import from url failed with {:?}", @@ -308,14 +309,16 @@ async fn sibfu_wait_for_import_from_url( ); loop { - let result = - retry_until_known_result!(log, { client.is_job_finished(&job_id) }) - .map_err(|e| { - ActionError::action_failed(format!( - "is_job_finished failed with {:?}", - e - )) - })?; + let result = retry_until_known_result(log, || async { + client.is_job_finished(&job_id).await + }) + .await + .map_err(|e| { + ActionError::action_failed(format!( + "is_job_finished failed with {:?}", + e + )) + })?; if result.job_is_finished { break; @@ -332,14 +335,13 @@ async fn sibfu_wait_for_import_from_url( endpoint, ); - let response = - retry_until_known_result!(log, { client.job_result_ok(&job_id) }) - .map_err(|e| { - ActionError::action_failed(format!( - "job_result_ok failed with {:?}", - e - )) - })?; + let response = retry_until_known_result(log, || async { + client.job_result_ok(&job_id).await + }) + .await + .map_err(|e| { + ActionError::action_failed(format!("job_result_ok failed with {:?}", e)) + })?; if !response.job_result_ok { return Err(ActionError::action_failed(format!("Job {job_id} failed"))); diff --git a/nexus/src/app/sagas/instance_create.rs b/nexus/src/app/sagas/instance_create.rs index e88d11957d..9841ef6a04 100644 --- a/nexus/src/app/sagas/instance_create.rs +++ b/nexus/src/app/sagas/instance_create.rs @@ -6,6 +6,7 @@ use super::{NexusActionContext, NexusSaga, SagaInitError, ACTION_GENERATE_ID}; use crate::app::instance::WriteBackUpdatedInstance; use crate::app::sagas::declare_saga_actions; use crate::app::sagas::disk_create::{self, SagaDiskCreate}; +use crate::app::sagas::retry_until_known_result; use crate::app::{ MAX_DISKS_PER_INSTANCE, MAX_EXTERNAL_IPS_PER_INSTANCE, MAX_NICS_PER_INSTANCE, @@ -15,7 +16,6 @@ use crate::db::lookup::LookupPath; use 
crate::db::model::ByteCount as DbByteCount; use crate::db::queries::network_interface::InsertError as InsertNicError; use crate::external_api::params; -use crate::retry_until_known_result; use crate::{authn, authz, db}; use chrono::Utc; use nexus_db_model::NetworkInterfaceKind; @@ -436,13 +436,12 @@ async fn sic_remove_network_config( debug!(log, "deleting nat mapping for entry: {target_ip:#?}"); - let result = retry_until_known_result!(log, { - dpd_client.ensure_nat_entry_deleted( - log, - target_ip.ip, - *target_ip.first_port, - ) - }); + let result = retry_until_known_result(log, || async { + dpd_client + .ensure_nat_entry_deleted(log, target_ip.ip, *target_ip.first_port) + .await + }) + .await; match result { Ok(_) => { diff --git a/nexus/src/app/sagas/instance_delete.rs b/nexus/src/app/sagas/instance_delete.rs index c139332a45..4e9654dcd9 100644 --- a/nexus/src/app/sagas/instance_delete.rs +++ b/nexus/src/app/sagas/instance_delete.rs @@ -6,9 +6,9 @@ use super::ActionRegistry; use super::NexusActionContext; use super::NexusSaga; use crate::app::sagas::declare_saga_actions; +use crate::app::sagas::retry_until_known_result; use crate::db; use crate::db::lookup::LookupPath; -use crate::retry_until_known_result; use crate::{authn, authz}; use nexus_types::identity::Resource; use omicron_common::api::external::{Error, ResourceType}; @@ -167,13 +167,12 @@ async fn sid_delete_network_config( for entry in external_ips { debug!(log, "deleting nat mapping for entry: {entry:#?}"); - let result = retry_until_known_result!(log, { - dpd_client.ensure_nat_entry_deleted( - log, - entry.ip, - *entry.first_port, - ) - }); + let result = retry_until_known_result(log, || async { + dpd_client + .ensure_nat_entry_deleted(log, entry.ip, *entry.first_port) + .await + }) + .await; match result { Ok(_) => { diff --git a/nexus/src/app/sagas/loopback_address_create.rs b/nexus/src/app/sagas/loopback_address_create.rs index dfdc5e70c9..26c5d0bf3a 100644 --- 
a/nexus/src/app/sagas/loopback_address_create.rs +++ b/nexus/src/app/sagas/loopback_address_create.rs @@ -3,6 +3,7 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use super::{NexusActionContext, NEXUS_DPD_TAG}; +use crate::app::sagas::retry_until_known_result; use crate::app::sagas::{ declare_saga_actions, ActionRegistry, NexusSaga, SagaInitError, }; @@ -10,7 +11,6 @@ use crate::authn; use crate::authz; use crate::db::model::LoopbackAddress; use crate::external_api::params; -use crate::retry_until_known_result; use anyhow::Error; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -148,12 +148,15 @@ async fn slc_loopback_address_create( let dpd_client: Arc = Arc::clone(&osagactx.nexus().dpd_client); - retry_until_known_result!(log, { - dpd_client.ensure_loopback_created( - log, - params.loopback_address.address, - NEXUS_DPD_TAG, - ) + retry_until_known_result(log, || async { + dpd_client + .ensure_loopback_created( + log, + params.loopback_address.address, + NEXUS_DPD_TAG, + ) + .await }) + .await .map_err(|e| ActionError::action_failed(e.to_string())) } diff --git a/nexus/src/app/sagas/loopback_address_delete.rs b/nexus/src/app/sagas/loopback_address_delete.rs index 12c22595ff..0419659324 100644 --- a/nexus/src/app/sagas/loopback_address_delete.rs +++ b/nexus/src/app/sagas/loopback_address_delete.rs @@ -3,6 +3,7 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. 
use super::NexusActionContext; +use crate::app::sagas::retry_until_known_result; use crate::app::sagas::{ declare_saga_actions, ActionRegistry, NexusSaga, SagaInitError, }; @@ -10,7 +11,6 @@ use crate::authn; use crate::authz; use crate::db::model::{LoopbackAddress, Name}; use crate::external_api::params; -use crate::retry_until_known_result; use anyhow::{anyhow, Error}; use nexus_types::identity::Asset; use omicron_common::api::external::{IpNet, NameOrId}; @@ -177,8 +177,9 @@ async fn slc_loopback_address_delete( let dpd_client: Arc = Arc::clone(&osagactx.nexus().dpd_client); - retry_until_known_result!(log, { - dpd_client.ensure_loopback_deleted(log, params.address.ip()) + retry_until_known_result(log, || async { + dpd_client.ensure_loopback_deleted(log, params.address.ip()).await }) + .await .map_err(|e| ActionError::action_failed(e.to_string())) } diff --git a/nexus/src/app/sagas/mod.rs b/nexus/src/app/sagas/mod.rs index 9a34eba866..1b7b85ae13 100644 --- a/nexus/src/app/sagas/mod.rs +++ b/nexus/src/app/sagas/mod.rs @@ -309,6 +309,8 @@ pub(crate) use __emit_action; pub(crate) use __stringify_ident; pub(crate) use declare_saga_actions; +use futures::Future; + /// Retry a progenitor client operation until a known result is returned. /// /// Saga execution relies on the outcome of an external call being known: since @@ -317,18 +319,26 @@ pub(crate) use declare_saga_actions; /// is seen. /// /// Note that retrying is only valid if the call itself is idempotent. -#[macro_export] -macro_rules! 
retry_until_known_result { - ( $log:ident, $func:block ) => {{ - use omicron_common::backoff; - - backoff::retry_notify( - backoff::retry_policy_internal_service(), - || async { - match ($func).await { +pub(crate) async fn retry_until_known_result<F, T, E, Fut>( + log: &slog::Logger, + mut f: F, +) -> Result<T, progenitor_client::Error<E>> +where + F: FnMut() -> Fut, + Fut: Future<Output = Result<T, progenitor_client::Error<E>>>, + E: std::fmt::Debug, +{ + use omicron_common::backoff; + + backoff::retry_notify( + backoff::retry_policy_internal_service(), + move || { + let fut = f(); + async move { + match fut.await { Err(progenitor_client::Error::CommunicationError(e)) => { warn!( - $log, + log, "saw transient communication error {}, retrying...", e, ); @@ -362,23 +372,21 @@ macro_rules! retry_until_known_result { } Err(e) => { - warn!($log, "saw permanent error {}, aborting", e,); + warn!(log, "saw permanent error {}, aborting", e,); Err(backoff::BackoffError::Permanent(e)) } Ok(v) => Ok(v), } - }, - |error: progenitor_client::Error<_>, delay| { - warn!( - $log, - "failed external call ({:?}), will retry in {:?}", - error, - delay, - ); - }, - ) - .await - }}; + } + }, + |error: progenitor_client::Error<_>, delay| { + warn!( + log, + "failed external call ({:?}), will retry in {:?}", error, delay, + ); + }, + ) + .await } diff --git a/nexus/src/app/sagas/snapshot_create.rs b/nexus/src/app/sagas/snapshot_create.rs index c92edc487a..cb9547217b 100644 --- a/nexus/src/app/sagas/snapshot_create.rs +++ b/nexus/src/app/sagas/snapshot_create.rs @@ -98,10 +98,10 @@ use super::{ ACTION_GENERATE_ID, }; use crate::app::sagas::declare_saga_actions; +use crate::app::sagas::retry_until_known_result; use crate::db::identity::{Asset, Resource}; use crate::db::lookup::LookupPath; use crate::external_api::params; -use crate::retry_until_known_result; use crate::{authn, authz, db}; use anyhow::anyhow; use crucible_agent_client::{types::RegionId, Client as CrucibleAgentClient}; @@ -681,13 +681,16 @@ async fn ssc_send_snapshot_request_to_sled_agent( info!(log, "instance {}
sled agent created ok", instance_id); // Send a snapshot request to propolis through sled agent - retry_until_known_result!(log, { - sled_agent_client.instance_issue_disk_snapshot_request( - &instance.id(), - &disk.id(), - &InstanceIssueDiskSnapshotRequestBody { snapshot_id }, - ) + retry_until_known_result(log, || async { + sled_agent_client + .instance_issue_disk_snapshot_request( + &instance.id(), + &disk.id(), + &InstanceIssueDiskSnapshotRequestBody { snapshot_id }, + ) + .await }) + .await .map_err(|e| e.to_string()) .map_err(ActionError::action_failed)?; Ok(()) @@ -734,12 +737,15 @@ async fn ssc_send_snapshot_request_to_sled_agent_undo( let url = format!("http://{}", dataset.address()); let client = CrucibleAgentClient::new(&url); - retry_until_known_result!(log, { - client.region_delete_snapshot( - &RegionId(region.id().to_string()), - &snapshot_id.to_string(), - ) - })?; + retry_until_known_result(log, || async { + client + .region_delete_snapshot( + &RegionId(region.id().to_string()), + &snapshot_id.to_string(), + ) + .await + }) + .await?; } Ok(()) } @@ -1007,14 +1013,17 @@ async fn ssc_call_pantry_snapshot_for_disk( let client = crucible_pantry_client::Client::new(&endpoint); - retry_until_known_result!(log, { - client.snapshot( - &params.disk_id.to_string(), - &crucible_pantry_client::types::SnapshotRequest { - snapshot_id: snapshot_id.to_string(), - }, - ) + retry_until_known_result(log, || async { + client + .snapshot( + &params.disk_id.to_string(), + &crucible_pantry_client::types::SnapshotRequest { + snapshot_id: snapshot_id.to_string(), + }, + ) + .await }) + .await .map_err(|e| { ActionError::action_failed(Error::internal_error(&e.to_string())) })?; @@ -1050,12 +1059,15 @@ async fn ssc_call_pantry_snapshot_for_disk_undo( let url = format!("http://{}", dataset.address()); let client = CrucibleAgentClient::new(&url); - retry_until_known_result!(log, { - client.region_delete_snapshot( - &RegionId(region.id().to_string()), - &snapshot_id.to_string(), - ) -
})?; + retry_until_known_result(log, || async { + client + .region_delete_snapshot( + &RegionId(region.id().to_string()), + &snapshot_id.to_string(), + ) + .await + }) + .await?; } Ok(()) } @@ -1197,34 +1209,42 @@ async fn ssc_start_running_snapshot( info!(log, "dataset {:?} region {:?} url {}", dataset, region, url); // Validate with the Crucible agent that the snapshot exists - let crucible_region = retry_until_known_result!(log, { - client.region_get(&RegionId(region.id().to_string())) + let crucible_region = retry_until_known_result(log, || async { + client.region_get(&RegionId(region.id().to_string())).await }) + .await .map_err(|e| e.to_string()) .map_err(ActionError::action_failed)?; info!(log, "crucible region {:?}", crucible_region); - let crucible_snapshot = retry_until_known_result!(log, { - client.region_get_snapshot( - &RegionId(region.id().to_string()), - &snapshot_id.to_string(), - ) + let crucible_snapshot = retry_until_known_result(log, || async { + client + .region_get_snapshot( + &RegionId(region.id().to_string()), + &snapshot_id.to_string(), + ) + .await }) + .await .map_err(|e| e.to_string()) .map_err(ActionError::action_failed)?; info!(log, "crucible snapshot {:?}", crucible_snapshot); // Start the snapshot running - let crucible_running_snapshot = retry_until_known_result!(log, { - client.region_run_snapshot( - &RegionId(region.id().to_string()), - &snapshot_id.to_string(), - ) - }) - .map_err(|e| e.to_string()) - .map_err(ActionError::action_failed)?; + let crucible_running_snapshot = + retry_until_known_result(log, || async { + client + .region_run_snapshot( + &RegionId(region.id().to_string()), + &snapshot_id.to_string(), + ) + .await + }) + .await + .map_err(|e| e.to_string()) + .map_err(ActionError::action_failed)?; info!(log, "crucible running snapshot {:?}", crucible_running_snapshot); @@ -1289,12 +1309,15 @@ async fn ssc_start_running_snapshot_undo( use crucible_agent_client::Error::ErrorResponse; use http::status::StatusCode; - 
retry_until_known_result!(log, { - client.region_delete_running_snapshot( - &RegionId(region.id().to_string()), - &snapshot_id.to_string(), - ) + retry_until_known_result(log, || async { + client + .region_delete_running_snapshot( + &RegionId(region.id().to_string()), + &snapshot_id.to_string(), + ) + .await }) + .await .map(|_| ()) // NOTE: If we later create a volume record and delete it, the // running snapshot may be deleted (see: diff --git a/nexus/src/app/sagas/switch_port_settings_apply.rs b/nexus/src/app/sagas/switch_port_settings_apply.rs index b3b82f6e1e..887c1ee5cd 100644 --- a/nexus/src/app/sagas/switch_port_settings_apply.rs +++ b/nexus/src/app/sagas/switch_port_settings_apply.rs @@ -3,11 +3,11 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use super::{NexusActionContext, NEXUS_DPD_TAG}; +use crate::app::sagas::retry_until_known_result; use crate::app::sagas::{ declare_saga_actions, ActionRegistry, NexusSaga, SagaInitError, }; use crate::db::datastore::UpdatePrecondition; -use crate::retry_until_known_result; use crate::{authn, db}; use anyhow::Error; use db::datastore::SwitchPortSettingsCombinedResult; @@ -215,9 +215,10 @@ async fn spa_ensure_switch_port_settings( let dpd_port_settings = api_to_dpd_port_settings(&settings) .map_err(ActionError::action_failed)?; - retry_until_known_result!(log, { - dpd_client.port_settings_apply(&port_id, &dpd_port_settings) + retry_until_known_result(log, || async { + dpd_client.port_settings_apply(&port_id, &dpd_port_settings).await }) + .await .map_err(|e| ActionError::action_failed(e.to_string()))?; Ok(()) @@ -248,9 +249,10 @@ async fn spa_undo_ensure_switch_port_settings( let id = match orig_port_settings_id { Some(id) => id, None => { - retry_until_known_result!(log, { - dpd_client.port_settings_clear(&port_id) + retry_until_known_result(log, || async { + dpd_client.port_settings_clear(&port_id).await }) + .await .map_err(|e| external::Error::internal_error(&e.to_string()))?; return Ok(()); @@ 
-265,9 +267,10 @@ async fn spa_undo_ensure_switch_port_settings( let dpd_port_settings = api_to_dpd_port_settings(&settings) .map_err(ActionError::action_failed)?; - retry_until_known_result!(log, { - dpd_client.port_settings_apply(&port_id, &dpd_port_settings) + retry_until_known_result(log, || async { + dpd_client.port_settings_apply(&port_id, &dpd_port_settings).await }) + .await .map_err(|e| external::Error::internal_error(&e.to_string()))?; Ok(()) diff --git a/nexus/src/app/sagas/switch_port_settings_clear.rs b/nexus/src/app/sagas/switch_port_settings_clear.rs index 54952345f9..3dd6941f5b 100644 --- a/nexus/src/app/sagas/switch_port_settings_clear.rs +++ b/nexus/src/app/sagas/switch_port_settings_clear.rs @@ -3,13 +3,13 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use super::NexusActionContext; +use crate::app::sagas::retry_until_known_result; use crate::app::sagas::switch_port_settings_apply::api_to_dpd_port_settings; use crate::app::sagas::{ declare_saga_actions, ActionRegistry, NexusSaga, SagaInitError, }; use crate::authn; use crate::db::datastore::UpdatePrecondition; -use crate::retry_until_known_result; use anyhow::Error; use dpd_client::types::PortId; use omicron_common::api::external::{self, NameOrId}; @@ -132,9 +132,10 @@ async fn spa_clear_switch_port_settings( let dpd_client: Arc = Arc::clone(&osagactx.nexus().dpd_client); - retry_until_known_result!(log, { - dpd_client.port_settings_clear(&port_id) + retry_until_known_result(log, || async { + dpd_client.port_settings_clear(&port_id).await }) + .await .map_err(|e| ActionError::action_failed(e.to_string()))?; Ok(()) @@ -175,9 +176,10 @@ async fn spa_undo_clear_switch_port_settings( let dpd_port_settings = api_to_dpd_port_settings(&settings) .map_err(ActionError::action_failed)?; - retry_until_known_result!(log, { - dpd_client.port_settings_apply(&port_id, &dpd_port_settings) + retry_until_known_result(log, || async { + dpd_client.port_settings_apply(&port_id, 
&dpd_port_settings).await }) + .await .map_err(|e| external::Error::internal_error(&e.to_string()))?; Ok(()) diff --git a/nexus/src/app/sled.rs b/nexus/src/app/sled.rs index 725a7507f6..2124760f6a 100644 --- a/nexus/src/app/sled.rs +++ b/nexus/src/app/sled.rs @@ -4,6 +4,7 @@ //! Sleds, and the hardware and services within them. +use crate::app::sagas::retry_until_known_result; use crate::authz; use crate::db; use crate::db::identity::Asset; @@ -14,7 +15,6 @@ use crate::internal_api::params::{ PhysicalDiskDeleteRequest, PhysicalDiskPutRequest, SledAgentStartupInfo, SledRole, ZpoolPutRequest, }; -use crate::retry_until_known_result; use nexus_db_queries::context::OpContext; use nexus_db_queries::db::lookup; use omicron_common::api::external::DataPageParams; @@ -388,9 +388,10 @@ impl super::Nexus { // the same information is a no-op. join_handles.push(tokio::spawn(futures::future::lazy( move |_ctx| async move { - retry_until_known_result!(log, { - client.set_v2p(&nic_id, &mapping) + retry_until_known_result(&log, || async { + client.set_v2p(&nic_id, &mapping).await }) + .await }, ))); } @@ -495,9 +496,10 @@ impl super::Nexus { // the same information is a no-op. join_handles.push(tokio::spawn(futures::future::lazy( move |_ctx| async move { - retry_until_known_result!(log, { - client.del_v2p(&nic_id, &mapping) + retry_until_known_result(&log, || async { + client.del_v2p(&nic_id, &mapping).await }) + .await }, ))); }