Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry progenitor client calls in sagas #3225

Merged
merged 7 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ parse-display.workspace = true
paste.workspace = true
# See omicron-rpaths for more about the "pq-sys" dependency.
pq-sys = "*"
progenitor-client.workspace = true
propolis-client.workspace = true
rand.workspace = true
ref-cast.workspace = true
Expand Down
79 changes: 11 additions & 68 deletions nexus/src/app/sagas/common_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::authz;
use crate::db;
use crate::db::identity::Asset;
use crate::db::lookup::LookupPath;
use crate::retry_until_known_result;
use crate::Nexus;
use anyhow::anyhow;
use crucible_agent_client::{
Expand Down Expand Up @@ -281,73 +282,6 @@ pub async fn get_pantry_address(

// Common Pantry operations

/// Repeatedly runs an async Pantry client call through
/// `omicron_common::backoff::retry_notify` until it produces either a success
/// value or an error that is classified as permanent.
///
/// Arguments:
/// - `$log`: identifier of the logger handed to `warn!`.
/// - `$func`: a block evaluating to a future; it is re-evaluated and awaited
///   on every attempt.
///
/// The expansion evaluates to a `Result` whose error has already been
/// converted with `ActionError::action_failed`.
///
/// The expansion names `warn!` and `ActionError` without a path, so both must
/// be resolvable wherever the macro is invoked.
#[macro_export]
macro_rules! retry_until_known_result {
( $log:ident, $func:block ) => {{
use omicron_common::backoff;

// Local error type unifying the two error shapes handed to the backoff
// machinery: the `reqwest::Error` carried by a communication failure, and
// the full Pantry client error for everything else.
#[derive(Debug, thiserror::Error)]
enum InnerError {
#[error("Reqwest error: {0}")]
Reqwest(#[from] reqwest::Error),

#[error("Pantry client error: {0}")]
PantryClient(
#[from]
crucible_pantry_client::Error<
crucible_pantry_client::types::Error,
>,
),
}

backoff::retry_notify(
// Retry schedule is whatever this policy helper returns; its parameters
// are not visible here.
backoff::retry_policy_internal_service(),
|| async {
match ($func).await {
// Communication errors are reported as transient, i.e. the outcome of
// the call is not known yet.
Err(crucible_pantry_client::Error::CommunicationError(
e,
)) => {
warn!(
$log,
"saw transient communication error {}, retrying...",
e,
);

Err(backoff::BackoffError::<InnerError>::transient(
e.into(),
))
}

// Any other client error is reported as permanent.
Err(e) => {
warn!($log, "saw permanent error {}, aborting", e,);

Err(backoff::BackoffError::<InnerError>::Permanent(
e.into(),
))
}

// Success value is passed through unchanged.
Ok(v) => Ok(v),
}
},
// Notification callback given to `retry_notify`; it only logs the error
// and the delay it receives.
|error: InnerError, delay| {
warn!(
$log,
"failed external call ({:?}), will retry in {:?}",
error,
delay,
);
},
)
.await
// Collapse whatever error `retry_notify` finally returns into a saga
// action failure, keeping only its Debug rendering.
.map_err(|e| {
ActionError::action_failed(format!(
"gave up on external call due to {:?}",
e
))
})
}};
}

pub async fn call_pantry_attach_for_disk(
log: &slog::Logger,
opctx: &OpContext,
Expand Down Expand Up @@ -394,6 +328,9 @@ pub async fn call_pantry_attach_for_disk(

retry_until_known_result!(log, {
client.attach(&disk_id.to_string(), &attach_request)
})
.map_err(|e| {
ActionError::action_failed(format!("pantry attach failed with {:?}", e))
})?;

Ok(())
Expand All @@ -410,7 +347,13 @@ pub async fn call_pantry_detach_for_disk(

let client = crucible_pantry_client::Client::new(&endpoint);

retry_until_known_result!(log, { client.detach(&disk_id.to_string()) })?;
retry_until_known_result!(log, { client.detach(&disk_id.to_string()) })
.map_err(|e| {
ActionError::action_failed(format!(
"pantry detach failed with {:?}",
e
))
})?;

Ok(())
}
19 changes: 2 additions & 17 deletions nexus/src/app/sagas/finalize_disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use super::ActionRegistry;
use super::NexusActionContext;
use super::NexusSaga;
use super::SagaInitError;
use crate::app::sagas::common_storage::call_pantry_detach_for_disk;
use crate::app::sagas::snapshot_create;
use crate::db::lookup::LookupPath;
use crate::external_api::params;
use crate::retry_until_known_result;
use crate::{authn, authz};
use nexus_db_model::Generation;
use omicron_common::api::external;
Expand Down Expand Up @@ -284,24 +284,9 @@ async fn sfd_call_pantry_detach_for_disk(
) -> Result<(), ActionError> {
let log = sagactx.user_data().log();
let params = sagactx.saga_params::<Params>()?;

let pantry_address = sagactx.lookup::<SocketAddrV6>("pantry_address")?;
let endpoint = format!("http://{}", pantry_address);

info!(
log,
"sending detach request for disk {} to pantry endpoint {}",
params.disk_id,
endpoint,
);

let disk_id = params.disk_id.to_string();

let client = crucible_pantry_client::Client::new(&endpoint);

retry_until_known_result!(log, { client.detach(&disk_id) })?;

Ok(())
call_pantry_detach_for_disk(&log, params.disk_id, pantry_address).await
}

async fn sfd_clear_pantry_address(
Expand Down
33 changes: 28 additions & 5 deletions nexus/src/app/sagas/import_blocks_from_url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,12 @@ async fn sibfu_call_pantry_import_from_url_for_disk(

let response = retry_until_known_result!(log, {
client.import_from_url(&disk_id, &request)
})
.map_err(|e| {
ActionError::action_failed(format!(
"import from url failed with {:?}",
e
))
})?;

Ok(response.job_id.clone())
Expand Down Expand Up @@ -301,7 +307,20 @@ async fn sibfu_wait_for_import_from_url(
endpoint,
);

while !client.is_job_finished(&job_id).await.unwrap().job_is_finished {
loop {
let result =
retry_until_known_result!(log, { client.is_job_finished(&job_id) })
.map_err(|e| {
ActionError::action_failed(format!(
"is_job_finished failed with {:?}",
e
))
})?;

if result.job_is_finished {
break;
}

tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}

Expand All @@ -313,10 +332,14 @@ async fn sibfu_wait_for_import_from_url(
endpoint,
);

let response = client
.job_result_ok(&job_id)
.await
.map_err(|e| ActionError::action_failed(e.to_string()))?;
let response =
retry_until_known_result!(log, { client.job_result_ok(&job_id) })
.map_err(|e| {
ActionError::action_failed(format!(
"job_result_ok failed with {:?}",
e
))
})?;

if !response.job_result_ok {
return Err(ActionError::action_failed(format!("Job {job_id} failed")));
Expand Down
14 changes: 8 additions & 6 deletions nexus/src/app/sagas/instance_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::db::lookup::LookupPath;
use crate::db::model::ByteCount as DbByteCount;
use crate::db::queries::network_interface::InsertError as InsertNicError;
use crate::external_api::params;
use crate::retry_until_known_result;
use crate::{authn, authz, db};
use chrono::Utc;
use nexus_db_model::NetworkInterfaceKind;
Expand Down Expand Up @@ -437,16 +438,17 @@ async fn sic_remove_network_config(

let result = match target_ip.ip {
ipnetwork::IpNetwork::V4(network) => {
dpd_client
.nat_ipv4_delete(&network.ip(), *target_ip.first_port)
.await
retry_until_known_result!(log, {
dpd_client.nat_ipv4_delete(&network.ip(), *target_ip.first_port)
})
}
ipnetwork::IpNetwork::V6(network) => {
dpd_client
.nat_ipv6_delete(&network.ip(), *target_ip.first_port)
.await
retry_until_known_result!(log, {
dpd_client.nat_ipv6_delete(&network.ip(), *target_ip.first_port)
})
}
};

match result {
Ok(_) => {
debug!(log, "deletion of nat entry successful for: {target_ip:#?}");
Expand Down
13 changes: 7 additions & 6 deletions nexus/src/app/sagas/instance_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::NexusSaga;
use crate::app::sagas::declare_saga_actions;
use crate::db;
use crate::db::lookup::LookupPath;
use crate::retry_until_known_result;
use crate::{authn, authz};
use nexus_types::identity::Resource;
use omicron_common::api::external::{Error, ResourceType};
Expand Down Expand Up @@ -167,14 +168,14 @@ async fn sid_delete_network_config(
debug!(log, "deleting nat mapping for entry: {entry:#?}");
let result = match entry.ip {
ipnetwork::IpNetwork::V4(network) => {
dpd_client
.nat_ipv4_delete(&network.ip(), *entry.first_port)
.await
retry_until_known_result!(log, {
dpd_client.nat_ipv4_delete(&network.ip(), *entry.first_port)
})
}
ipnetwork::IpNetwork::V6(network) => {
dpd_client
.nat_ipv6_delete(&network.ip(), *entry.first_port)
.await
retry_until_known_result!(log, {
dpd_client.nat_ipv6_delete(&network.ip(), *entry.first_port)
})
}
};

Expand Down
15 changes: 8 additions & 7 deletions nexus/src/app/sagas/loopback_address_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::authn;
use crate::authz;
use crate::db::model::LoopbackAddress;
use crate::external_api::params;
use crate::retry_until_known_result;
use anyhow::Error;
use dpd_client::types::{Ipv4Entry, Ipv6Entry};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -135,10 +136,10 @@ async fn slc_loopback_address_create(
) -> Result<(), ActionError> {
let osagactx = sagactx.user_data();
let params = sagactx.saga_params::<Params>()?;
let log = sagactx.user_data().log();

// TODO: https://github.com/oxidecomputer/omicron/issues/2629
if let Ok(_) = std::env::var("SKIP_ASIC_CONFIG") {
let log = sagactx.user_data().log();
debug!(log, "SKIP_ASIC_CONFIG is set, disabling calls to dendrite");
return Ok(());
};
Expand All @@ -151,20 +152,20 @@ async fn slc_loopback_address_create(

let result = match &params.loopback_address.address {
IpAddr::V4(a) => {
dpd_client
.loopback_ipv4_create(&Ipv4Entry {
retry_until_known_result!(log, {
dpd_client.loopback_ipv4_create(&Ipv4Entry {
addr: *a,
tag: NEXUS_DPD_TAG.into(),
})
.await
})
}
IpAddr::V6(a) => {
dpd_client
.loopback_ipv6_create(&Ipv6Entry {
retry_until_known_result!(log, {
dpd_client.loopback_ipv6_create(&Ipv6Entry {
addr: *a,
tag: NEXUS_DPD_TAG.into(),
})
.await
})
}
};

Expand Down
11 changes: 8 additions & 3 deletions nexus/src/app/sagas/loopback_address_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::authn;
use crate::authz;
use crate::db::model::{LoopbackAddress, Name};
use crate::external_api::params;
use crate::retry_until_known_result;
use anyhow::{anyhow, Error};
use nexus_types::identity::Asset;
use omicron_common::api::external::{IpNet, NameOrId};
Expand Down Expand Up @@ -162,10 +163,10 @@ async fn slc_loopback_address_delete(
) -> Result<(), ActionError> {
let osagactx = sagactx.user_data();
let params = sagactx.saga_params::<Params>()?;
let log = sagactx.user_data().log();

// TODO: https://github.com/oxidecomputer/omicron/issues/2629
if let Ok(_) = std::env::var("SKIP_ASIC_CONFIG") {
let log = sagactx.user_data().log();
debug!(log, "SKIP_ASIC_CONFIG is set, disabling calls to dendrite");
return Ok(());
};
Expand All @@ -177,8 +178,12 @@ async fn slc_loopback_address_delete(
Arc::clone(&osagactx.nexus().dpd_client);

match &params.address {
IpNet::V4(a) => dpd_client.loopback_ipv4_delete(&a.ip()).await,
IpNet::V6(a) => dpd_client.loopback_ipv6_delete(&a.ip()).await,
IpNet::V4(a) => retry_until_known_result!(log, {
dpd_client.loopback_ipv4_delete(&a.ip())
}),
IpNet::V6(a) => retry_until_known_result!(log, {
dpd_client.loopback_ipv6_delete(&a.ip())
}),
}
.map_err(|e| ActionError::action_failed(e.to_string()))?;

Expand Down
Loading