diff --git a/Cargo.lock b/Cargo.lock
index f70bcc2168..fd03909b1d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4588,6 +4588,7 @@ dependencies = [
  "pem",
  "petgraph",
  "pq-sys",
+ "progenitor-client",
  "propolis-client",
  "rand 0.8.5",
  "rcgen",
diff --git a/dpd-client/src/lib.rs b/dpd-client/src/lib.rs
index e1c52a180b..1186e1722f 100644
--- a/dpd-client/src/lib.rs
+++ b/dpd-client/src/lib.rs
@@ -33,6 +33,9 @@ pub struct ClientState {
 impl Client {
     /// Ensure that a NAT entry exists, overwriting a previous conflicting entry if
     /// applicable.
+    ///
+    /// nat_ipv[46]_create are not idempotent (see oxidecomputer/dendrite#343),
+    /// but this wrapper function is. Call this from sagas instead.
     #[allow(clippy::too_many_arguments)]
     pub async fn ensure_nat_entry(
         &self,
@@ -131,13 +134,136 @@ impl Client {
         Ok(())
     }
+
+    /// Ensure that a NAT entry is deleted.
+    ///
+    /// nat_ipv[46]_delete are not idempotent (see oxidecomputer/dendrite#343),
+    /// but this wrapper function is. Call this from sagas instead.
+    pub async fn ensure_nat_entry_deleted(
+        &self,
+        log: &Logger,
+        target_ip: ipnetwork::IpNetwork,
+        target_first_port: u16,
+    ) -> Result<(), progenitor_client::Error<types::Error>> {
+        let result = match target_ip {
+            ipnetwork::IpNetwork::V4(network) => {
+                self.nat_ipv4_delete(&network.ip(), target_first_port).await
+            }
+            ipnetwork::IpNetwork::V6(network) => {
+                self.nat_ipv6_delete(&network.ip(), target_first_port).await
+            }
+        };
+
+        match result {
+            Ok(_) => {
+                info!(log, "deleted old nat entry"; "target_ip" => ?target_ip);
+            }
+
+            Err(e) => {
+                if e.status() == Some(http::StatusCode::NOT_FOUND) {
+                    info!(log, "no nat entry found for: {target_ip:#?}");
+                } else {
+                    return Err(e);
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Ensure that a loopback address is created.
+    ///
+    /// loopback_ipv[46]_create are not idempotent (see
+    /// oxidecomputer/dendrite#343), but this wrapper function is. Call this
+    /// from sagas instead.
+    pub async fn ensure_loopback_created(
+        &self,
+        log: &Logger,
+        address: IpAddr,
+        tag: &str,
+    ) -> Result<(), progenitor_client::Error<types::Error>> {
+        let result = match &address {
+            IpAddr::V4(a) => {
+                self.loopback_ipv4_create(&types::Ipv4Entry {
+                    addr: *a,
+                    tag: tag.into(),
+                })
+                .await
+            }
+            IpAddr::V6(a) => {
+                self.loopback_ipv6_create(&types::Ipv6Entry {
+                    addr: *a,
+                    tag: tag.into(),
+                })
+                .await
+            }
+        };
+
+        match result {
+            Ok(_) => {
+                info!(log, "created loopback address"; "address" => ?address);
+                Ok(())
+            }
+
+            Err(progenitor_client::Error::ErrorResponse(er)) => {
+                match er.status() {
+                    http::StatusCode::CONFLICT => {
+                        info!(log, "loopback address already created"; "address" => ?address);
+
+                        Ok(())
+                    }
+
+                    _ => Err(progenitor_client::Error::ErrorResponse(er)),
+                }
+            }
+
+            Err(e) => Err(e),
+        }
+    }
+
+    /// Ensure that a loopback address is deleted.
+    ///
+    /// loopback_ipv[46]_delete are not idempotent (see
+    /// oxidecomputer/dendrite#343), but this wrapper function is. Call this
+    /// from sagas instead.
+    pub async fn ensure_loopback_deleted(
+        &self,
+        log: &Logger,
+        address: IpAddr,
+    ) -> Result<(), progenitor_client::Error<types::Error>> {
+        let result = match &address {
+            IpAddr::V4(a) => self.loopback_ipv4_delete(&a).await,
+            IpAddr::V6(a) => self.loopback_ipv6_delete(&a).await,
+        };
+
+        match result {
+            Ok(_) => {
+                info!(log, "deleted loopback address"; "address" => ?address);
+                Ok(())
+            }
+
+            Err(progenitor_client::Error::ErrorResponse(er)) => {
+                match er.status() {
+                    http::StatusCode::NOT_FOUND => {
+                        info!(log, "loopback address already deleted"; "address" => ?address);
+                        Ok(())
+                    }
+
+                    _ => Err(progenitor_client::Error::ErrorResponse(er)),
+                }
+            }
+
+            Err(e) => Err(e),
+        }
+    }
 }
 
+// XXX delete everything below once we use the real dpd-client crate.
 // https://github.com/oxidecomputer/omicron/issues/2775
 use std::convert::TryFrom;
 use std::fmt;
-use std::net::{Ipv4Addr, Ipv6Addr};
+use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
 use std::str::FromStr;
 
 use schemars::JsonSchema;
diff --git a/nexus/Cargo.toml b/nexus/Cargo.toml
index dfaadf10d6..f17bcc9396 100644
--- a/nexus/Cargo.toml
+++ b/nexus/Cargo.toml
@@ -51,6 +51,7 @@ parse-display.workspace = true
 paste.workspace = true
 # See omicron-rpaths for more about the "pq-sys" dependency.
 pq-sys = "*"
+progenitor-client.workspace = true
 propolis-client.workspace = true
 rand.workspace = true
 ref-cast.workspace = true
diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs
index ebb4884ac7..ee34749cd8 100644
--- a/nexus/src/app/instance.rs
+++ b/nexus/src/app/instance.rs
@@ -8,6 +8,7 @@ use super::MAX_DISKS_PER_INSTANCE;
 use super::MAX_EXTERNAL_IPS_PER_INSTANCE;
 use super::MAX_NICS_PER_INSTANCE;
 use crate::app::sagas;
+use crate::app::sagas::retry_until_known_result;
 use crate::authn;
 use crate::authz;
 use crate::cidata::InstanceCiData;
@@ -1241,22 +1242,27 @@ impl super::Nexus {
         })
         .map(|(_, ip)| ip)
         {
-            dpd_client
-                .ensure_nat_entry(
-                    &log,
-                    target_ip.ip,
-                    dpd_client::types::MacAddr { a: mac_address.into_array() },
-                    *target_ip.first_port,
-                    *target_ip.last_port,
-                    vni,
-                    sled_ip_address.ip(),
-                )
-                .await
-                .map_err(|e| {
-                    Error::internal_error(&format!(
-                        "failed to ensure dpd entry: {e}"
-                    ))
-                })?;
+            retry_until_known_result(log, || async {
+                dpd_client
+                    .ensure_nat_entry(
+                        &log,
+                        target_ip.ip,
+                        dpd_client::types::MacAddr {
+                            a: mac_address.into_array(),
+                        },
+                        *target_ip.first_port,
+                        *target_ip.last_port,
+                        vni,
+                        sled_ip_address.ip(),
+                    )
+                    .await
+            })
+            .await
+            .map_err(|e| {
+                Error::internal_error(&format!(
+                    "failed to ensure dpd entry: {e}"
+                ))
+            })?;
         }
 
         Ok(())
diff --git a/nexus/src/app/sagas/common_storage.rs b/nexus/src/app/sagas/common_storage.rs
index 4c5853d916..a9c5a0f265 100644
--- a/nexus/src/app/sagas/common_storage.rs
+++ b/nexus/src/app/sagas/common_storage.rs
@@ -6,6 +6,7 @@
 
 use super::*;
 
+use crate::app::sagas::retry_until_known_result;
 use crate::authz;
 use crate::db;
 use crate::db::identity::Asset;
@@ -281,73 +282,6 @@ pub async fn get_pantry_address(
 
 // Common Pantry operations
 
-#[macro_export]
-macro_rules! retry_until_known_result {
-    ( $log:ident, $func:block ) => {{
-        use omicron_common::backoff;
-
-        #[derive(Debug, thiserror::Error)]
-        enum InnerError {
-            #[error("Reqwest error: {0}")]
-            Reqwest(#[from] reqwest::Error),
-
-            #[error("Pantry client error: {0}")]
-            PantryClient(
-                #[from]
-                crucible_pantry_client::Error<
-                    crucible_pantry_client::types::Error,
-                >,
-            ),
-        }
-
-        backoff::retry_notify(
-            backoff::retry_policy_internal_service(),
-            || async {
-                match ($func).await {
-                    Err(crucible_pantry_client::Error::CommunicationError(
-                        e,
-                    )) => {
-                        warn!(
-                            $log,
-                            "saw transient communication error {}, retrying...",
-                            e,
-                        );
-
-                        Err(backoff::BackoffError::<InnerError>::transient(
-                            e.into(),
-                        ))
-                    }
-
-                    Err(e) => {
-                        warn!($log, "saw permanent error {}, aborting", e,);
-
-                        Err(backoff::BackoffError::<InnerError>::Permanent(
-                            e.into(),
-                        ))
-                    }
-
-                    Ok(v) => Ok(v),
-                }
-            },
-            |error: InnerError, delay| {
-                warn!(
-                    $log,
-                    "failed external call ({:?}), will retry in {:?}",
-                    error,
-                    delay,
-                );
-            },
-        )
-        .await
-        .map_err(|e| {
-            ActionError::action_failed(format!(
-                "gave up on external call due to {:?}",
-                e
-            ))
-        })
-    }};
-}
-
 pub async fn call_pantry_attach_for_disk(
     log: &slog::Logger,
     opctx: &OpContext,
@@ -392,8 +326,12 @@ pub async fn call_pantry_attach_for_disk(
         volume_construction_request,
     };
 
-    retry_until_known_result!(log, {
-        client.attach(&disk_id.to_string(), &attach_request)
+    retry_until_known_result(log, || async {
+        client.attach(&disk_id.to_string(), &attach_request).await
+    })
+    .await
+    .map_err(|e| {
+        ActionError::action_failed(format!("pantry attach failed with {:?}", e))
     })?;
 
     Ok(())
@@ -410,7 +348,13 @@ pub async fn call_pantry_detach_for_disk(
 
     let client = crucible_pantry_client::Client::new(&endpoint);
 
-    retry_until_known_result!(log, { client.detach(&disk_id.to_string()) })?;
+    retry_until_known_result(log, || async {
+        client.detach(&disk_id.to_string()).await
+    })
+    .await
+    .map_err(|e| {
+        ActionError::action_failed(format!("pantry detach failed with {:?}", e))
+    })?;
 
     Ok(())
 }
diff --git a/nexus/src/app/sagas/finalize_disk.rs b/nexus/src/app/sagas/finalize_disk.rs
index f8ddff87ba..a26806c6b5 100644
--- a/nexus/src/app/sagas/finalize_disk.rs
+++ b/nexus/src/app/sagas/finalize_disk.rs
@@ -10,10 +10,10 @@ use super::ActionRegistry;
 use super::NexusActionContext;
 use super::NexusSaga;
 use super::SagaInitError;
+use crate::app::sagas::common_storage::call_pantry_detach_for_disk;
 use crate::app::sagas::snapshot_create;
 use crate::db::lookup::LookupPath;
 use crate::external_api::params;
-use crate::retry_until_known_result;
 use crate::{authn, authz};
 use nexus_db_model::Generation;
 use omicron_common::api::external;
@@ -284,24 +284,9 @@ async fn sfd_call_pantry_detach_for_disk(
 ) -> Result<(), ActionError> {
     let log = sagactx.user_data().log();
     let params = sagactx.saga_params::<Params>()?;
-
     let pantry_address = sagactx.lookup::<SocketAddrV6>("pantry_address")?;
-    let endpoint = format!("http://{}", pantry_address);
-
-    info!(
-        log,
-        "sending detach request for disk {} to pantry endpoint {}",
-        params.disk_id,
-        endpoint,
-    );
-
-    let disk_id = params.disk_id.to_string();
-
-    let client = crucible_pantry_client::Client::new(&endpoint);
-
-    retry_until_known_result!(log, { client.detach(&disk_id) })?;
-
-    Ok(())
+    call_pantry_detach_for_disk(&log, params.disk_id, pantry_address).await
 }
 
 async fn sfd_clear_pantry_address(
diff --git a/nexus/src/app/sagas/import_blocks_from_url.rs b/nexus/src/app/sagas/import_blocks_from_url.rs
index a623a4636f..b9a1a01ee5 100644
--- a/nexus/src/app/sagas/import_blocks_from_url.rs
+++ b/nexus/src/app/sagas/import_blocks_from_url.rs
@@ -10,8 +10,8 @@ use super::ActionRegistry;
 use super::NexusActionContext;
 use super::NexusSaga;
 use super::SagaInitError;
+use crate::app::sagas::retry_until_known_result;
 use crate::db::lookup::LookupPath;
-use crate::retry_until_known_result;
 use crate::{authn, authz};
 use nexus_db_model::Generation;
 use nexus_types::external_api::params;
@@ -272,8 +272,15 @@ async fn sibfu_call_pantry_import_from_url_for_disk(
         },
     };
 
-    let response = retry_until_known_result!(log, {
-        client.import_from_url(&disk_id, &request)
+    let response = retry_until_known_result(log, || async {
+        client.import_from_url(&disk_id, &request).await
+    })
+    .await
+    .map_err(|e| {
+        ActionError::action_failed(format!(
+            "import from url failed with {:?}",
+            e
+        ))
     })?;
 
     Ok(response.job_id.clone())
@@ -301,7 +308,22 @@ async fn sibfu_wait_for_import_from_url(
         endpoint,
     );
 
-    while !client.is_job_finished(&job_id).await.unwrap().job_is_finished {
+    loop {
+        let result = retry_until_known_result(log, || async {
+            client.is_job_finished(&job_id).await
+        })
+        .await
+        .map_err(|e| {
+            ActionError::action_failed(format!(
+                "is_job_finished failed with {:?}",
+                e
+            ))
+        })?;
+
+        if result.job_is_finished {
+            break;
+        }
+
         tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
     }
 
@@ -313,10 +335,13 @@ async fn sibfu_wait_for_import_from_url(
         endpoint,
     );
 
-    let response = client
-        .job_result_ok(&job_id)
-        .await
-        .map_err(|e| ActionError::action_failed(e.to_string()))?;
+    let response = retry_until_known_result(log, || async {
+        client.job_result_ok(&job_id).await
+    })
+    .await
+    .map_err(|e| {
+        ActionError::action_failed(format!("job_result_ok failed with {:?}", e))
+    })?;
 
     if !response.job_result_ok {
         return Err(ActionError::action_failed(format!("Job {job_id} failed")));
diff --git a/nexus/src/app/sagas/instance_create.rs b/nexus/src/app/sagas/instance_create.rs
index b92dfed87b..e4e56d16eb 100644
--- a/nexus/src/app/sagas/instance_create.rs
+++ b/nexus/src/app/sagas/instance_create.rs
@@ -6,6 +6,7 @@ use super::{NexusActionContext, NexusSaga, SagaInitError, ACTION_GENERATE_ID};
 use crate::app::instance::WriteBackUpdatedInstance;
 use crate::app::sagas::declare_saga_actions;
 use crate::app::sagas::disk_create::{self, SagaDiskCreate};
+use crate::app::sagas::retry_until_known_result;
 use crate::app::{
     MAX_DISKS_PER_INSTANCE, MAX_EXTERNAL_IPS_PER_INSTANCE,
     MAX_NICS_PER_INSTANCE,
@@ -437,34 +438,23 @@ async fn sic_remove_network_config(
 
     debug!(log, "deleting nat mapping for entry: {target_ip:#?}");
 
-    let result = match target_ip.ip {
-        ipnetwork::IpNetwork::V4(network) => {
-            dpd_client
-                .nat_ipv4_delete(&network.ip(), *target_ip.first_port)
-                .await
-        }
-        ipnetwork::IpNetwork::V6(network) => {
-            dpd_client
-                .nat_ipv6_delete(&network.ip(), *target_ip.first_port)
-                .await
-        }
-    };
+    let result = retry_until_known_result(log, || async {
+        dpd_client
+            .ensure_nat_entry_deleted(log, target_ip.ip, *target_ip.first_port)
+            .await
+    })
+    .await;
+
     match result {
         Ok(_) => {
             debug!(log, "deletion of nat entry successful for: {target_ip:#?}");
             Ok(())
         }
-        Err(e) => {
-            if e.status() == Some(http::StatusCode::NOT_FOUND) {
-                debug!(log, "no nat entry found for: {target_ip:#?}");
-                Ok(())
-            } else {
-                Err(ActionError::action_failed(Error::internal_error(
-                    &format!("failed to delete nat entry via dpd: {e}"),
-                )))
-            }
-        }
+        Err(e) => Err(Error::internal_error(&format!(
+            "failed to delete nat entry via dpd: {e}"
+        ))),
     }?;
+
     Ok(())
 }
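
The NAT-deletion call sites in instance_create.rs (above) and instance_delete.rs (below) compose the same two layers: ensure_nat_entry_deleted makes a single attempt idempotent (dendrite's NOT_FOUND counts as success), and retry_until_known_result reissues that attempt until the outcome is known. A minimal sketch of the composition, for illustration only; the helper name example_delete_nat and its standalone framing are not part of this patch:

    // Illustrative sketch (not in this patch): at-least-once delivery via
    // retry_until_known_result(), made safe because each delivery is
    // idempotent (ensure_nat_entry_deleted() maps NOT_FOUND to Ok).
    async fn example_delete_nat(
        log: &slog::Logger,
        dpd_client: &dpd_client::Client,
        target_ip: ipnetwork::IpNetwork,
        first_port: u16,
    ) -> Result<(), steno::ActionError> {
        retry_until_known_result(log, || async {
            dpd_client
                .ensure_nat_entry_deleted(log, target_ip, first_port)
                .await
        })
        .await
        .map_err(|e| {
            steno::ActionError::action_failed(format!(
                "failed to delete nat entry via dpd: {e:?}"
            ))
        })
    }

Either layer alone is insufficient: retrying a non-idempotent delete can turn a success into a spurious NOT_FOUND error, and an idempotent call issued only once can still leave a saga node with an unknown outcome.
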
diff --git a/nexus/src/app/sagas/instance_delete.rs b/nexus/src/app/sagas/instance_delete.rs
index cd6dc86181..4e9654dcd9 100644
--- a/nexus/src/app/sagas/instance_delete.rs
+++ b/nexus/src/app/sagas/instance_delete.rs
@@ -6,6 +6,7 @@ use super::ActionRegistry;
 use super::NexusActionContext;
 use super::NexusSaga;
 use crate::app::sagas::declare_saga_actions;
+use crate::app::sagas::retry_until_known_result;
 use crate::db;
 use crate::db::lookup::LookupPath;
 use crate::{authn, authz};
@@ -165,34 +166,25 @@ async fn sid_delete_network_config(
     // bailing out before we've attempted deletion of all entries.
     for entry in external_ips {
         debug!(log, "deleting nat mapping for entry: {entry:#?}");
-        let result = match entry.ip {
-            ipnetwork::IpNetwork::V4(network) => {
-                dpd_client
-                    .nat_ipv4_delete(&network.ip(), *entry.first_port)
-                    .await
-            }
-            ipnetwork::IpNetwork::V6(network) => {
-                dpd_client
-                    .nat_ipv6_delete(&network.ip(), *entry.first_port)
-                    .await
-            }
-        };
+
+        let result = retry_until_known_result(log, || async {
+            dpd_client
+                .ensure_nat_entry_deleted(log, entry.ip, *entry.first_port)
+                .await
+        })
+        .await;
 
         match result {
             Ok(_) => {
                 debug!(log, "deletion of nat entry successful for: {entry:#?}");
             }
             Err(e) => {
-                if e.status() == Some(http::StatusCode::NOT_FOUND) {
-                    debug!(log, "no nat entry found for: {entry:#?}");
-                } else {
-                    let new_error =
-                        ActionError::action_failed(Error::internal_error(
-                            &format!("failed to delete nat entry via dpd: {e}"),
-                        ));
-                    error!(log, "{new_error:#?}");
-                    errors.push(new_error);
-                }
+                let new_error =
+                    ActionError::action_failed(Error::internal_error(
+                        &format!("failed to delete nat entry via dpd: {e}"),
+                    ));
+                error!(log, "{new_error:#?}");
+                errors.push(new_error);
             }
         }
     }
diff --git a/nexus/src/app/sagas/loopback_address_create.rs b/nexus/src/app/sagas/loopback_address_create.rs
index 7f698f3bda..26c5d0bf3a 100644
--- a/nexus/src/app/sagas/loopback_address_create.rs
+++ b/nexus/src/app/sagas/loopback_address_create.rs
@@ -3,6 +3,7 @@
 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
 
 use super::{NexusActionContext, NEXUS_DPD_TAG};
+use crate::app::sagas::retry_until_known_result;
 use crate::app::sagas::{
     declare_saga_actions, ActionRegistry, NexusSaga, SagaInitError,
 };
@@ -11,9 +12,7 @@ use crate::authz;
 use crate::db::model::LoopbackAddress;
 use crate::external_api::params;
 use anyhow::Error;
-use dpd_client::types::{Ipv4Entry, Ipv6Entry};
 use serde::{Deserialize, Serialize};
-use std::net::IpAddr;
 use std::sync::Arc;
 use steno::ActionError;
 
@@ -135,10 +134,10 @@ async fn slc_loopback_address_create(
 ) -> Result<(), ActionError> {
     let osagactx = sagactx.user_data();
     let params = sagactx.saga_params::<Params>()?;
+    let log = sagactx.user_data().log();
 
     // TODO: https://github.com/oxidecomputer/omicron/issues/2629
     if let Ok(_) = std::env::var("SKIP_ASIC_CONFIG") {
-        let log = sagactx.user_data().log();
         debug!(log, "SKIP_ASIC_CONFIG is set, disabling calls to dendrite");
         return Ok(());
     };
@@ -149,36 +148,15 @@ async fn slc_loopback_address_create(
     let dpd_client: Arc<dpd_client::Client> =
         Arc::clone(&osagactx.nexus().dpd_client);
 
-    let result = match &params.loopback_address.address {
-        IpAddr::V4(a) => {
-            dpd_client
-                .loopback_ipv4_create(&Ipv4Entry {
-                    addr: *a,
-                    tag: NEXUS_DPD_TAG.into(),
-                })
-                .await
-        }
-        IpAddr::V6(a) => {
-            dpd_client
-                .loopback_ipv6_create(&Ipv6Entry {
-                    addr: *a,
-                    tag: NEXUS_DPD_TAG.into(),
-                })
-                .await
-        }
-    };
-
-    if let Err(e) = result {
-        match e {
-            sled_agent_client::Error::ErrorResponse(ref er) => {
-                match er.status() {
-                    http::StatusCode::CONFLICT => Ok(()),
-                    _ => Err(ActionError::action_failed(e.to_string())),
-                }
-            }
-            _ => Err(ActionError::action_failed(e.to_string())),
-        }?;
-    }
-
-    Ok(())
+    retry_until_known_result(log, || async {
+        dpd_client
+            .ensure_loopback_created(
+                log,
+                params.loopback_address.address,
+                NEXUS_DPD_TAG,
+            )
+            .await
+    })
+    .await
+    .map_err(|e| ActionError::action_failed(e.to_string()))
 }
diff --git a/nexus/src/app/sagas/loopback_address_delete.rs b/nexus/src/app/sagas/loopback_address_delete.rs
index 4790401034..0419659324 100644
--- a/nexus/src/app/sagas/loopback_address_delete.rs
+++ b/nexus/src/app/sagas/loopback_address_delete.rs
@@ -3,6 +3,7 @@
 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
 
 use super::NexusActionContext;
+use crate::app::sagas::retry_until_known_result;
 use crate::app::sagas::{
     declare_saga_actions, ActionRegistry, NexusSaga, SagaInitError,
 };
@@ -162,10 +163,10 @@ async fn slc_loopback_address_delete(
 ) -> Result<(), ActionError> {
     let osagactx = sagactx.user_data();
     let params = sagactx.saga_params::<Params>()?;
+    let log = sagactx.user_data().log();
 
     // TODO: https://github.com/oxidecomputer/omicron/issues/2629
     if let Ok(_) = std::env::var("SKIP_ASIC_CONFIG") {
-        let log = sagactx.user_data().log();
         debug!(log, "SKIP_ASIC_CONFIG is set, disabling calls to dendrite");
         return Ok(());
     };
@@ -176,11 +177,9 @@ async fn slc_loopback_address_delete(
     let dpd_client: Arc<dpd_client::Client> =
         Arc::clone(&osagactx.nexus().dpd_client);
 
-    match &params.address {
-        IpNet::V4(a) => dpd_client.loopback_ipv4_delete(&a.ip()).await,
-        IpNet::V6(a) => dpd_client.loopback_ipv6_delete(&a.ip()).await,
-    }
-    .map_err(|e| ActionError::action_failed(e.to_string()))?;
-
-    Ok(())
+    retry_until_known_result(log, || async {
+        dpd_client.ensure_loopback_deleted(log, params.address.ip()).await
+    })
+    .await
+    .map_err(|e| ActionError::action_failed(e.to_string()))
 }
diff --git a/nexus/src/app/sagas/mod.rs b/nexus/src/app/sagas/mod.rs
index cd5cda8c27..1b7b85ae13 100644
--- a/nexus/src/app/sagas/mod.rs
+++ b/nexus/src/app/sagas/mod.rs
@@ -308,3 +308,85 @@ pub(crate) use __action_name;
 pub(crate) use __emit_action;
 pub(crate) use __stringify_ident;
 pub(crate) use declare_saga_actions;
+
+use futures::Future;
+
+/// Retry a progenitor client operation until a known result is returned.
+///
+/// Saga execution relies on the outcome of an external call being known:
+/// since these calls are idempotent, reissue the external call until a known
+/// result comes back. Retry if a communication error is seen, or if another
+/// retryable error is seen.
+///
+/// Note that retrying is only valid if the call itself is idempotent.
+pub(crate) async fn retry_until_known_result<F, T, E, Fut>(
+    log: &slog::Logger,
+    mut f: F,
+) -> Result<T, progenitor_client::Error<E>>
+where
+    F: FnMut() -> Fut,
+    Fut: Future<Output = Result<T, progenitor_client::Error<E>>>,
+    E: std::fmt::Debug,
+{
+    use omicron_common::backoff;
+
+    backoff::retry_notify(
+        backoff::retry_policy_internal_service(),
+        move || {
+            let fut = f();
+            async move {
+                match fut.await {
+                    Err(progenitor_client::Error::CommunicationError(e)) => {
+                        warn!(
+                            log,
+                            "saw transient communication error {}, retrying...",
+                            e,
+                        );
+
+                        Err(backoff::BackoffError::transient(
+                            progenitor_client::Error::CommunicationError(e),
+                        ))
+                    }
+
+                    Err(progenitor_client::Error::ErrorResponse(
+                        response_value,
+                    )) => {
+                        match response_value.status() {
+                            // Retry on 503 or 429
+                            http::StatusCode::SERVICE_UNAVAILABLE
+                            | http::StatusCode::TOO_MANY_REQUESTS => {
+                                Err(backoff::BackoffError::transient(
+                                    progenitor_client::Error::ErrorResponse(
+                                        response_value,
+                                    ),
+                                ))
+                            }
+
+                            // Anything else is a permanent error
+                            _ => Err(backoff::BackoffError::Permanent(
+                                progenitor_client::Error::ErrorResponse(
+                                    response_value,
+                                ),
+                            )),
+                        }
+                    }
+
+                    Err(e) => {
+                        warn!(log, "saw permanent error {}, aborting", e,);
+
+                        Err(backoff::BackoffError::Permanent(e))
+                    }
+
+                    Ok(v) => Ok(v),
+                }
+            }
+        },
+        |error: progenitor_client::Error<_>, delay| {
+            warn!(
+                log,
+                "failed external call ({:?}), will retry in {:?}", error, delay,
+            );
+        },
+    )
+    .await
+}
diff --git a/nexus/src/app/sagas/snapshot_create.rs b/nexus/src/app/sagas/snapshot_create.rs
index ddd32fad1b..cb9547217b 100644
--- a/nexus/src/app/sagas/snapshot_create.rs
+++ b/nexus/src/app/sagas/snapshot_create.rs
@@ -98,6 +98,7 @@ use super::{
     ACTION_GENERATE_ID,
 };
 use crate::app::sagas::declare_saga_actions;
+use crate::app::sagas::retry_until_known_result;
 use crate::db::identity::{Asset, Resource};
 use crate::db::lookup::LookupPath;
 use crate::external_api::params;
@@ -680,15 +681,18 @@ async fn ssc_send_snapshot_request_to_sled_agent(
     info!(log, "instance {} sled agent created ok", instance_id);
 
     // Send a snapshot request to propolis through sled agent
-    sled_agent_client
-        .instance_issue_disk_snapshot_request(
-            &instance.id(),
-            &disk.id(),
-            &InstanceIssueDiskSnapshotRequestBody { snapshot_id },
-        )
-        .await
-        .map_err(|e| e.to_string())
-        .map_err(ActionError::action_failed)?;
+    retry_until_known_result(log, || async {
+        sled_agent_client
+            .instance_issue_disk_snapshot_request(
+                &instance.id(),
+                &disk.id(),
+                &InstanceIssueDiskSnapshotRequestBody { snapshot_id },
+            )
+            .await
+    })
+    .await
+    .map_err(|e| e.to_string())
+    .map_err(ActionError::action_failed)?;
 
     Ok(())
 }
@@ -733,12 +737,15 @@ async fn ssc_send_snapshot_request_to_sled_agent_undo(
         let url = format!("http://{}", dataset.address());
         let client = CrucibleAgentClient::new(&url);
 
-        client
-            .region_delete_snapshot(
-                &RegionId(region.id().to_string()),
-                &snapshot_id.to_string(),
-            )
-            .await?;
+        retry_until_known_result(log, || async {
+            client
+                .region_delete_snapshot(
+                    &RegionId(region.id().to_string()),
+                    &snapshot_id.to_string(),
+                )
+                .await
+        })
+        .await?;
     }
     Ok(())
 }
@@ -1006,17 +1013,20 @@ async fn ssc_call_pantry_snapshot_for_disk(
 
     let client = crucible_pantry_client::Client::new(&endpoint);
 
-    client
-        .snapshot(
-            &params.disk_id.to_string(),
-            &crucible_pantry_client::types::SnapshotRequest {
-                snapshot_id: snapshot_id.to_string(),
-            },
-        )
-        .await
-        .map_err(|e| {
-            ActionError::action_failed(Error::internal_error(&e.to_string()))
-        })?;
+    retry_until_known_result(log, || async {
+        client
+            .snapshot(
+                &params.disk_id.to_string(),
+                &crucible_pantry_client::types::SnapshotRequest {
+                    snapshot_id: snapshot_id.to_string(),
+                },
+            )
+            .await
+    })
+    .await
+    .map_err(|e| {
+        ActionError::action_failed(Error::internal_error(&e.to_string()))
+    })?;
 
     Ok(())
 }
@@ -1049,12 +1059,15 @@ async fn ssc_call_pantry_snapshot_for_disk_undo(
         let url = format!("http://{}", dataset.address());
         let client = CrucibleAgentClient::new(&url);
 
-        client
-            .region_delete_snapshot(
-                &RegionId(region.id().to_string()),
-                &snapshot_id.to_string(),
-            )
-            .await?;
+        retry_until_known_result(log, || async {
+            client
+                .region_delete_snapshot(
+                    &RegionId(region.id().to_string()),
+                    &snapshot_id.to_string(),
+                )
+                .await
+        })
+        .await?;
     }
     Ok(())
 }
@@ -1196,31 +1209,39 @@ async fn ssc_start_running_snapshot(
     info!(log, "dataset {:?} region {:?} url {}", dataset, region, url);
 
     // Validate with the Crucible agent that the snapshot exists
-    let crucible_region = client
-        .region_get(&RegionId(region.id().to_string()))
-        .await
-        .map_err(|e| e.to_string())
-        .map_err(ActionError::action_failed)?;
+    let crucible_region = retry_until_known_result(log, || async {
+        client.region_get(&RegionId(region.id().to_string())).await
+    })
+    .await
+    .map_err(|e| e.to_string())
+    .map_err(ActionError::action_failed)?;
 
     info!(log, "crucible region {:?}", crucible_region);
 
-    let crucible_snapshot = client
-        .region_get_snapshot(
-            &RegionId(region.id().to_string()),
-            &snapshot_id.to_string(),
-        )
-        .await
-        .map_err(|e| e.to_string())
-        .map_err(ActionError::action_failed)?;
+    let crucible_snapshot = retry_until_known_result(log, || async {
+        client
+            .region_get_snapshot(
+                &RegionId(region.id().to_string()),
+                &snapshot_id.to_string(),
+            )
+            .await
+    })
+    .await
+    .map_err(|e| e.to_string())
+    .map_err(ActionError::action_failed)?;
 
     info!(log, "crucible snapshot {:?}", crucible_snapshot);
 
     // Start the snapshot running
-    let crucible_running_snapshot = client
-        .region_run_snapshot(
-            &RegionId(region.id().to_string()),
-            &snapshot_id.to_string(),
-        )
+    let crucible_running_snapshot =
+        retry_until_known_result(log, || async {
+            client
+                .region_run_snapshot(
+                    &RegionId(region.id().to_string()),
+                    &snapshot_id.to_string(),
+                )
+                .await
+        })
         .await
         .map_err(|e| e.to_string())
         .map_err(ActionError::action_failed)?;
@@ -1288,25 +1309,26 @@ async fn ssc_start_running_snapshot_undo(
     use crucible_agent_client::Error::ErrorResponse;
     use http::status::StatusCode;
 
-    client
-        .region_delete_running_snapshot(
-            &RegionId(region.id().to_string()),
-            &snapshot_id.to_string(),
-        )
-        .await
-        .map(|_| ())
-        // NOTE: If we later create a volume record and delete it, the
-        // running snapshot may be deleted (see:
-        // ssc_create_volume_record_undo).
-        //
-        // To cope, we treat "running snapshot not found" as "Ok", since it
-        // may just be the result of the volume deletion steps completing.
-        .or_else(|err| match err {
-            ErrorResponse(r) if r.status() == StatusCode::NOT_FOUND => {
-                Ok(())
-            }
-            _ => Err(err),
-        })?;
+    retry_until_known_result(log, || async {
+        client
+            .region_delete_running_snapshot(
+                &RegionId(region.id().to_string()),
+                &snapshot_id.to_string(),
+            )
+            .await
+    })
+    .await
+    .map(|_| ())
+    // NOTE: If we later create a volume record and delete it, the
+    // running snapshot may be deleted (see:
+    // ssc_create_volume_record_undo).
+    //
+    // To cope, we treat "running snapshot not found" as "Ok", since it
+    // may just be the result of the volume deletion steps completing.
+    .or_else(|err| match err {
+        ErrorResponse(r) if r.status() == StatusCode::NOT_FOUND => Ok(()),
+        _ => Err(err),
+    })?;
 
     osagactx
         .datastore()
         .region_snapshot_remove(dataset.id(), region.id(), snapshot_id)
diff --git a/nexus/src/app/sagas/switch_port_settings_apply.rs b/nexus/src/app/sagas/switch_port_settings_apply.rs
index 0e94618afc..887c1ee5cd 100644
--- a/nexus/src/app/sagas/switch_port_settings_apply.rs
+++ b/nexus/src/app/sagas/switch_port_settings_apply.rs
@@ -3,6 +3,7 @@
 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
 
 use super::{NexusActionContext, NEXUS_DPD_TAG};
+use crate::app::sagas::retry_until_known_result;
 use crate::app::sagas::{
     declare_saga_actions, ActionRegistry, NexusSaga, SagaInitError,
 };
@@ -200,6 +201,7 @@ async fn spa_ensure_switch_port_settings(
 ) -> Result<(), ActionError> {
     let osagactx = sagactx.user_data();
     let params = sagactx.saga_params::<Params>()?;
+    let log = sagactx.user_data().log();
 
     let settings = sagactx
         .lookup::<SwitchPortSettings>("switch_port_settings")?;
@@ -213,10 +215,11 @@ async fn spa_ensure_switch_port_settings(
     let dpd_port_settings = api_to_dpd_port_settings(&settings)
         .map_err(ActionError::action_failed)?;
 
-    dpd_client
-        .port_settings_apply(&port_id, &dpd_port_settings)
-        .await
-        .map_err(|e| ActionError::action_failed(e.to_string()))?;
+    retry_until_known_result(log, || async {
+        dpd_client.port_settings_apply(&port_id, &dpd_port_settings).await
+    })
+    .await
+    .map_err(|e| ActionError::action_failed(e.to_string()))?;
 
     Ok(())
 }
@@ -231,6 +234,7 @@ async fn spa_undo_ensure_switch_port_settings(
         &sagactx,
         &params.serialized_authn,
     );
+    let log = sagactx.user_data().log();
 
     let port_id: PortId = PortId::from_str(&params.switch_port_name)
         .map_err(|e| external::Error::internal_error(e))?;
@@ -245,10 +249,11 @@ async fn spa_undo_ensure_switch_port_settings(
     let id = match orig_port_settings_id {
         Some(id) => id,
         None => {
-            dpd_client
-                .port_settings_clear(&port_id)
-                .await
-                .map_err(|e| external::Error::internal_error(&e.to_string()))?;
+            retry_until_known_result(log, || async {
+                dpd_client.port_settings_clear(&port_id).await
+            })
+            .await
+            .map_err(|e| external::Error::internal_error(&e.to_string()))?;
 
             return Ok(());
         }
@@ -262,10 +267,11 @@ async fn spa_undo_ensure_switch_port_settings(
     let dpd_port_settings = api_to_dpd_port_settings(&settings)
         .map_err(ActionError::action_failed)?;
 
-    dpd_client
-        .port_settings_apply(&port_id, &dpd_port_settings)
-        .await
-        .map_err(|e| external::Error::internal_error(&e.to_string()))?;
+    retry_until_known_result(log, || async {
+        dpd_client.port_settings_apply(&port_id, &dpd_port_settings).await
+    })
+    .await
+    .map_err(|e| external::Error::internal_error(&e.to_string()))?;
 
     Ok(())
 }
diff --git a/nexus/src/app/sagas/switch_port_settings_clear.rs b/nexus/src/app/sagas/switch_port_settings_clear.rs
index 7462f8d33a..3dd6941f5b 100644
--- a/nexus/src/app/sagas/switch_port_settings_clear.rs
+++ b/nexus/src/app/sagas/switch_port_settings_clear.rs
@@ -3,6 +3,7 @@
 // file, You can obtain one at https://mozilla.org/MPL/2.0/.
 
 use super::NexusActionContext;
+use crate::app::sagas::retry_until_known_result;
 use crate::app::sagas::switch_port_settings_apply::api_to_dpd_port_settings;
 use crate::app::sagas::{
     declare_saga_actions, ActionRegistry, NexusSaga, SagaInitError,
 };
@@ -123,6 +124,7 @@ async fn spa_clear_switch_port_settings(
 ) -> Result<(), ActionError> {
     let osagactx = sagactx.user_data();
     let params = sagactx.saga_params::<Params>()?;
+    let log = sagactx.user_data().log();
 
     let port_id: PortId = PortId::from_str(&params.port_name)
         .map_err(|e| ActionError::action_failed(e.to_string()))?;
@@ -130,10 +132,11 @@ async fn spa_clear_switch_port_settings(
     let dpd_client: Arc<dpd_client::Client> =
         Arc::clone(&osagactx.nexus().dpd_client);
 
-    dpd_client
-        .port_settings_clear(&port_id)
-        .await
-        .map_err(|e| ActionError::action_failed(e.to_string()))?;
+    retry_until_known_result(log, || async {
+        dpd_client.port_settings_clear(&port_id).await
+    })
+    .await
+    .map_err(|e| ActionError::action_failed(e.to_string()))?;
 
     Ok(())
 }
@@ -148,6 +151,7 @@ async fn spa_undo_clear_switch_port_settings(
         &sagactx,
         &params.serialized_authn,
     );
+    let log = sagactx.user_data().log();
 
     let port_id: PortId = PortId::from_str(&params.port_name)
         .map_err(|e| external::Error::internal_error(e))?;
@@ -172,10 +176,11 @@ async fn spa_undo_clear_switch_port_settings(
     let dpd_port_settings = api_to_dpd_port_settings(&settings)
         .map_err(ActionError::action_failed)?;
 
-    dpd_client
-        .port_settings_apply(&port_id, &dpd_port_settings)
-        .await
-        .map_err(|e| external::Error::internal_error(&e.to_string()))?;
+    retry_until_known_result(log, || async {
+        dpd_client.port_settings_apply(&port_id, &dpd_port_settings).await
+    })
+    .await
+    .map_err(|e| external::Error::internal_error(&e.to_string()))?;
 
     Ok(())
 }
diff --git a/nexus/src/app/sled.rs b/nexus/src/app/sled.rs
index 37d07a6d67..2124760f6a 100644
--- a/nexus/src/app/sled.rs
+++ b/nexus/src/app/sled.rs
@@ -4,6 +4,7 @@
 
 //! Sleds, and the hardware and services within them.
 
+use crate::app::sagas::retry_until_known_result;
 use crate::authz;
 use crate::db;
 use crate::db::identity::Asset;
@@ -381,11 +382,16 @@ impl super::Nexus {
                     vni: nic.vni.clone(),
                 };
 
+                let log = self.log.clone();
+
                 // This function is idempotent: calling the set_v2p ioctl with
                 // the same information is a no-op.
                 join_handles.push(tokio::spawn(futures::future::lazy(
                     move |_ctx| async move {
-                        client.set_v2p(&nic_id, &mapping).await
+                        retry_until_known_result(&log, || async {
+                            client.set_v2p(&nic_id, &mapping).await
+                        })
+                        .await
                     },
                 )));
             }
@@ -484,11 +490,16 @@ impl super::Nexus {
                     vni: nic.vni.clone(),
                 };
 
+                let log = self.log.clone();
+
                 // This function is idempotent: calling the set_v2p ioctl with
                 // the same information is a no-op.
                 join_handles.push(tokio::spawn(futures::future::lazy(
                     move |_ctx| async move {
-                        client.del_v2p(&nic_id, &mapping).await
+                        retry_until_known_result(&log, || async {
+                            client.del_v2p(&nic_id, &mapping).await
+                        })
+                        .await
                    },
                )));
            }
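
All of the wrappers added in dpd-client/src/lib.rs follow one recipe: a create call becomes idempotent by treating CONFLICT as success, and a delete call by treating NOT_FOUND as success. A sketch of that recipe in miniature; the helper below is illustrative only and not part of this patch:

    // Illustrative sketch (not in this patch): the status-mapping recipe
    // used by ensure_loopback_created / ensure_loopback_deleted, generic
    // over any progenitor-generated client. `benign` is CONFLICT for a
    // create call and NOT_FOUND for a delete call: either status means a
    // previous attempt already took effect, so the call reports success.
    fn already_applied_is_ok<T, E>(
        result: Result<T, progenitor_client::Error<E>>,
        benign: http::StatusCode,
    ) -> Result<(), progenitor_client::Error<E>> {
        match result {
            Ok(_) => Ok(()),
            Err(progenitor_client::Error::ErrorResponse(er))
                if er.status() == benign =>
            {
                Ok(())
            }
            Err(e) => Err(e),
        }
    }

With retry_until_known_result layered on top, a saga action can issue these wrappers any number of times (first run, replay after a crash, or undo) and always converge on the same outcome.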