Retry progenitor client calls in sagas (#3225)
Generalize the retry_until_known_result macro into a function, and wrap
progenitor client calls made from saga nodes with it. This retries in the
face of transient errors, reducing

1) how often sagas fail due to network weather, and
2) how often saga unwinds fail for the same reason.
jmpesp authored Jun 9, 2023
1 parent 051cd95 commit d75bda7
Showing 16 changed files with 463 additions and 290 deletions.
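
The generalized `retry_until_known_result` helper itself is defined in one of the changed files that is not expanded in this view (the nexus changes import it from `crate::app::sagas`). Based on the macro it replaces and on the call sites visible in this diff, a minimal sketch of such a helper might look like the following; the exact location, signature, and trait bounds here are assumptions, not the committed code:

```rust
// Sketch only: generalizes the old retry_until_known_result! macro into a
// function that can wrap any progenitor client call.
use omicron_common::backoff;
use slog::{warn, Logger};
use std::future::Future;

pub(crate) async fn retry_until_known_result<F, Fut, T, E>(
    log: &Logger,
    mut f: F,
) -> Result<T, progenitor_client::Error<E>>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, progenitor_client::Error<E>>>,
    E: std::fmt::Debug,
{
    backoff::retry_notify(
        backoff::retry_policy_internal_service(),
        move || {
            // Build the future before the async block so the returned future
            // does not borrow the closure's environment.
            let fut = f();
            async move {
                match fut.await {
                    // A communication error means we don't know whether the
                    // request took effect; since the wrapped calls are
                    // idempotent, keep retrying until we get a definite answer.
                    Err(progenitor_client::Error::CommunicationError(e)) => {
                        warn!(log, "saw transient communication error {}, retrying...", e);
                        Err(backoff::BackoffError::transient(
                            progenitor_client::Error::CommunicationError(e),
                        ))
                    }
                    // Anything else is a definite answer from the server, so
                    // don't retry.
                    Err(e) => {
                        warn!(log, "saw permanent error {:?}, aborting", e);
                        Err(backoff::BackoffError::Permanent(e))
                    }
                    Ok(v) => Ok(v),
                }
            }
        },
        |error, delay| {
            warn!(log, "failed external call ({:?}), will retry in {:?}", error, delay);
        },
    )
    .await
}
```

Unlike the old macro, a helper shaped like this returns the underlying progenitor error rather than an ActionError, which matches the call sites below adding their own map_err into ActionError or Error::internal_error.
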
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

128 changes: 127 additions & 1 deletion dpd-client/src/lib.rs
@@ -33,6 +33,9 @@ pub struct ClientState
impl Client {
/// Ensure that a NAT entry exists, overwriting a previous conflicting entry if
/// applicable.
///
/// nat_ipv[46]_create are not idempotent (see oxidecomputer/dendrite#343),
/// but this wrapper function is. Call this from sagas instead.
#[allow(clippy::too_many_arguments)]
pub async fn ensure_nat_entry(
&self,
@@ -131,13 +134,136 @@ impl Client {

Ok(())
}

/// Ensure that a NAT entry is deleted.
///
/// nat_ipv[46]_delete are not idempotent (see oxidecomputer/dendrite#343),
/// but this wrapper function is. Call this from sagas instead.
pub async fn ensure_nat_entry_deleted(
&self,
log: &Logger,
target_ip: ipnetwork::IpNetwork,
target_first_port: u16,
) -> Result<(), progenitor_client::Error<types::Error>> {
let result = match target_ip {
ipnetwork::IpNetwork::V4(network) => {
self.nat_ipv4_delete(&network.ip(), target_first_port).await
}
ipnetwork::IpNetwork::V6(network) => {
self.nat_ipv6_delete(&network.ip(), target_first_port).await
}
};

match result {
Ok(_) => {
info!(log, "deleted old nat entry"; "target_ip" => ?target_ip);
}

Err(e) => {
if e.status() == Some(http::StatusCode::NOT_FOUND) {
info!(log, "no nat entry found for: {target_ip:#?}");
} else {
return Err(e);
}
}
}

Ok(())
}

/// Ensure that a loopback address is created.
///
/// loopback_ipv[46]_create are not idempotent (see
/// oxidecomputer/dendrite#343), but this wrapper function is. Call this
/// from sagas instead.
pub async fn ensure_loopback_created(
&self,
log: &Logger,
address: IpAddr,
tag: &str,
) -> Result<(), progenitor_client::Error<types::Error>> {
let result = match &address {
IpAddr::V4(a) => {
self.loopback_ipv4_create(&types::Ipv4Entry {
addr: *a,
tag: tag.into(),
})
.await
}
IpAddr::V6(a) => {
self.loopback_ipv6_create(&types::Ipv6Entry {
addr: *a,
tag: tag.into(),
})
.await
}
};

match result {
Ok(_) => {
info!(log, "created loopback address"; "address" => ?address);
Ok(())
}

Err(progenitor_client::Error::ErrorResponse(er)) => {
match er.status() {
http::StatusCode::CONFLICT => {
info!(log, "loopback address already created"; "address" => ?address);

Ok(())
}

_ => Err(progenitor_client::Error::ErrorResponse(er)),
}
}

Err(e) => Err(e),
}
}

/// Ensure that a loopback address is deleted.
///
/// loopback_ipv[46]_delete are not idempotent (see
/// oxidecomputer/dendrite#343), but this wrapper function is. Call this
/// from sagas instead.
pub async fn ensure_loopback_deleted(
&self,
log: &Logger,
address: IpAddr,
) -> Result<(), progenitor_client::Error<types::Error>> {
let result = match &address {
IpAddr::V4(a) => self.loopback_ipv4_delete(&a).await,
IpAddr::V6(a) => self.loopback_ipv6_delete(&a).await,
};

match result {
Ok(_) => {
info!(log, "deleted loopback address"; "address" => ?address);
Ok(())
}

Err(progenitor_client::Error::ErrorResponse(er)) => {
match er.status() {
http::StatusCode::NOT_FOUND => {
info!(log, "loopback address already deleted"; "address" => ?address);
Ok(())
}

_ => Err(progenitor_client::Error::ErrorResponse(er)),
}
}

Err(e) => Err(e),
}
}
}

// XXX delete everything below once we use the real dpd-client crate.
// https://github.com/oxidecomputer/omicron/issues/2775

use std::convert::TryFrom;
use std::fmt;
use std::net::{Ipv4Addr, Ipv6Addr};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::str::FromStr;

use schemars::JsonSchema;
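
For illustration, a saga node could combine one of the idempotent wrappers above with the retry helper, in the same way the instance.rs change below does for NAT entries. This is a hypothetical fragment rather than code from this commit: `dpd_client`, `log` (a `&slog::Logger`), and `loopback_addr` (an `Ipv6Addr`) are assumed to be in scope, and the tag string is a placeholder.

```rust
// Hypothetical saga-node fragment: retry the idempotent wrapper until the
// dpd API returns a definite answer, so transient failures don't fail the
// saga node (or its unwind).
retry_until_known_result(log, || async {
    dpd_client
        .ensure_loopback_created(log, IpAddr::V6(loopback_addr), "placeholder-tag")
        .await
})
.await
.map_err(|e| {
    ActionError::action_failed(format!("failed to create loopback: {:?}", e))
})?;
```
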
1 change: 1 addition & 0 deletions nexus/Cargo.toml
@@ -51,6 +51,7 @@ parse-display.workspace = true
paste.workspace = true
# See omicron-rpaths for more about the "pq-sys" dependency.
pq-sys = "*"
progenitor-client.workspace = true
propolis-client.workspace = true
rand.workspace = true
ref-cast.workspace = true
38 changes: 22 additions & 16 deletions nexus/src/app/instance.rs
@@ -8,6 +8,7 @@ use super::MAX_DISKS_PER_INSTANCE;
use super::MAX_EXTERNAL_IPS_PER_INSTANCE;
use super::MAX_NICS_PER_INSTANCE;
use crate::app::sagas;
use crate::app::sagas::retry_until_known_result;
use crate::authn;
use crate::authz;
use crate::cidata::InstanceCiData;
@@ -1241,22 +1242,27 @@ impl super::Nexus {
})
.map(|(_, ip)| ip)
{
dpd_client
.ensure_nat_entry(
&log,
target_ip.ip,
dpd_client::types::MacAddr { a: mac_address.into_array() },
*target_ip.first_port,
*target_ip.last_port,
vni,
sled_ip_address.ip(),
)
.await
.map_err(|e| {
Error::internal_error(&format!(
"failed to ensure dpd entry: {e}"
))
})?;
retry_until_known_result(log, || async {
dpd_client
.ensure_nat_entry(
&log,
target_ip.ip,
dpd_client::types::MacAddr {
a: mac_address.into_array(),
},
*target_ip.first_port,
*target_ip.last_port,
vni,
sled_ip_address.ip(),
)
.await
})
.await
.map_err(|e| {
Error::internal_error(&format!(
"failed to ensure dpd entry: {e}"
))
})?;
}

Ok(())
84 changes: 14 additions & 70 deletions nexus/src/app/sagas/common_storage.rs
@@ -6,6 +6,7 @@
use super::*;

use crate::app::sagas::retry_until_known_result;
use crate::authz;
use crate::db;
use crate::db::identity::Asset;
@@ -281,73 +282,6 @@ pub async fn get_pantry_address(

// Common Pantry operations

#[macro_export]
macro_rules! retry_until_known_result {
( $log:ident, $func:block ) => {{
use omicron_common::backoff;

#[derive(Debug, thiserror::Error)]
enum InnerError {
#[error("Reqwest error: {0}")]
Reqwest(#[from] reqwest::Error),

#[error("Pantry client error: {0}")]
PantryClient(
#[from]
crucible_pantry_client::Error<
crucible_pantry_client::types::Error,
>,
),
}

backoff::retry_notify(
backoff::retry_policy_internal_service(),
|| async {
match ($func).await {
Err(crucible_pantry_client::Error::CommunicationError(
e,
)) => {
warn!(
$log,
"saw transient communication error {}, retrying...",
e,
);

Err(backoff::BackoffError::<InnerError>::transient(
e.into(),
))
}

Err(e) => {
warn!($log, "saw permanent error {}, aborting", e,);

Err(backoff::BackoffError::<InnerError>::Permanent(
e.into(),
))
}

Ok(v) => Ok(v),
}
},
|error: InnerError, delay| {
warn!(
$log,
"failed external call ({:?}), will retry in {:?}",
error,
delay,
);
},
)
.await
.map_err(|e| {
ActionError::action_failed(format!(
"gave up on external call due to {:?}",
e
))
})
}};
}

pub async fn call_pantry_attach_for_disk(
log: &slog::Logger,
opctx: &OpContext,
@@ -392,8 +326,12 @@ pub async fn call_pantry_attach_for_disk(
volume_construction_request,
};

retry_until_known_result!(log, {
client.attach(&disk_id.to_string(), &attach_request)
retry_until_known_result(log, || async {
client.attach(&disk_id.to_string(), &attach_request).await
})
.await
.map_err(|e| {
ActionError::action_failed(format!("pantry attach failed with {:?}", e))
})?;

Ok(())
@@ -410,7 +348,13 @@ pub async fn call_pantry_detach_for_disk(

let client = crucible_pantry_client::Client::new(&endpoint);

retry_until_known_result!(log, { client.detach(&disk_id.to_string()) })?;
retry_until_known_result(log, || async {
client.detach(&disk_id.to_string()).await
})
.await
.map_err(|e| {
ActionError::action_failed(format!("pantry detach failed with {:?}", e))
})?;

Ok(())
}
19 changes: 2 additions & 17 deletions nexus/src/app/sagas/finalize_disk.rs
@@ -10,10 +10,10 @@ use super::ActionRegistry;
use super::NexusActionContext;
use super::NexusSaga;
use super::SagaInitError;
use crate::app::sagas::common_storage::call_pantry_detach_for_disk;
use crate::app::sagas::snapshot_create;
use crate::db::lookup::LookupPath;
use crate::external_api::params;
use crate::retry_until_known_result;
use crate::{authn, authz};
use nexus_db_model::Generation;
use omicron_common::api::external;
@@ -284,24 +284,9 @@ async fn sfd_call_pantry_detach_for_disk(
) -> Result<(), ActionError> {
let log = sagactx.user_data().log();
let params = sagactx.saga_params::<Params>()?;

let pantry_address = sagactx.lookup::<SocketAddrV6>("pantry_address")?;
let endpoint = format!("http://{}", pantry_address);

info!(
log,
"sending detach request for disk {} to pantry endpoint {}",
params.disk_id,
endpoint,
);

let disk_id = params.disk_id.to_string();

let client = crucible_pantry_client::Client::new(&endpoint);

retry_until_known_result!(log, { client.detach(&disk_id) })?;

Ok(())
call_pantry_detach_for_disk(&log, params.disk_id, pantry_address).await
}

async fn sfd_clear_pantry_address(
(The remaining changed files in this commit are not expanded in this view.)
