Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry progenitor client calls in sagas #3225

Merged
merged 7 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

128 changes: 127 additions & 1 deletion dpd-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub struct ClientState {
impl Client {
/// Ensure that a NAT entry exists, overwriting a previous conflicting entry if
/// applicable.
///
/// nat_ipv[46]_create are not idempotent (see oxidecomputer/dendrite#343),
/// but this wrapper function is. Call this from sagas instead.
#[allow(clippy::too_many_arguments)]
pub async fn ensure_nat_entry(
&self,
Expand Down Expand Up @@ -131,13 +134,136 @@ impl Client {

Ok(())
}

/// Ensure that a NAT entry is deleted.
///
/// nat_ipv[46]_delete are not idempotent (see oxidecomputer/dendrite#343),
/// but this wrapper function is. Call this from sagas instead.
pub async fn ensure_nat_entry_deleted(
    &self,
    log: &Logger,
    target_ip: ipnetwork::IpNetwork,
    target_first_port: u16,
) -> Result<(), progenitor_client::Error<types::Error>> {
    // Dispatch on address family; both arms produce the same result type.
    let delete_result = match target_ip {
        ipnetwork::IpNetwork::V4(net) => {
            self.nat_ipv4_delete(&net.ip(), target_first_port).await
        }
        ipnetwork::IpNetwork::V6(net) => {
            self.nat_ipv6_delete(&net.ip(), target_first_port).await
        }
    };

    if let Err(e) = delete_result {
        // A 404 means the entry is already gone, which is exactly the
        // end state we want: swallow it so the wrapper stays idempotent.
        if e.status() != Some(http::StatusCode::NOT_FOUND) {
            return Err(e);
        }
        info!(log, "no nat entry found for: {target_ip:#?}");
    } else {
        info!(log, "deleted old nat entry"; "target_ip" => ?target_ip);
    }

    Ok(())
}

/// Ensure that a loopback address is created.
///
/// loopback_ipv[46]_create are not idempotent (see
/// oxidecomputer/dendrite#343), but this wrapper function is. Call this
/// from sagas instead.
pub async fn ensure_loopback_created(
&self,
log: &Logger,
address: IpAddr,
tag: &str,
) -> Result<(), progenitor_client::Error<types::Error>> {
let result = match &address {
IpAddr::V4(a) => {
self.loopback_ipv4_create(&types::Ipv4Entry {
addr: *a,
tag: tag.into(),
})
.await
}
IpAddr::V6(a) => {
self.loopback_ipv6_create(&types::Ipv6Entry {
addr: *a,
tag: tag.into(),
})
.await
}
};

match result {
Ok(_) => {
info!(log, "created loopback address"; "address" => ?address);
Ok(())
}

Err(progenitor_client::Error::ErrorResponse(er)) => {
match er.status() {
http::StatusCode::CONFLICT => {
info!(log, "loopback address already created"; "address" => ?address);

Ok(())
}

_ => Err(progenitor_client::Error::ErrorResponse(er)),
}
}

Err(e) => Err(e),
}
}

/// Ensure that a loopback address is deleted.
///
/// loopback_ipv[46]_delete are not idempotent (see
/// oxidecomputer/dendrite#343), but this wrapper function is. Call this
/// from sagas instead.
pub async fn ensure_loopback_deleted(
&self,
log: &Logger,
address: IpAddr,
) -> Result<(), progenitor_client::Error<types::Error>> {
let result = match &address {
IpAddr::V4(a) => self.loopback_ipv4_delete(&a).await,
IpAddr::V6(a) => self.loopback_ipv6_delete(&a).await,
};

match result {
Ok(_) => {
info!(log, "deleted loopback address"; "address" => ?address);
Ok(())
}

Err(progenitor_client::Error::ErrorResponse(er)) => {
match er.status() {
http::StatusCode::NOT_FOUND => {
info!(log, "loopback address already deleted"; "address" => ?address);
Ok(())
}

_ => Err(progenitor_client::Error::ErrorResponse(er)),
}
}

Err(e) => Err(e),
}
}
}

// XXX delete everything below once we use the real dpd-client crate.
// https://github.com/oxidecomputer/omicron/issues/2775

use std::convert::TryFrom;
use std::fmt;
use std::net::{Ipv4Addr, Ipv6Addr};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::str::FromStr;

use schemars::JsonSchema;
Expand Down
1 change: 1 addition & 0 deletions nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ parse-display.workspace = true
paste.workspace = true
# See omicron-rpaths for more about the "pq-sys" dependency.
pq-sys = "*"
progenitor-client.workspace = true
propolis-client.workspace = true
rand.workspace = true
ref-cast.workspace = true
Expand Down
38 changes: 22 additions & 16 deletions nexus/src/app/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::MAX_DISKS_PER_INSTANCE;
use super::MAX_EXTERNAL_IPS_PER_INSTANCE;
use super::MAX_NICS_PER_INSTANCE;
use crate::app::sagas;
use crate::app::sagas::retry_until_known_result;
use crate::authn;
use crate::authz;
use crate::cidata::InstanceCiData;
Expand Down Expand Up @@ -1240,22 +1241,27 @@ impl super::Nexus {
})
.map(|(_, ip)| ip)
{
dpd_client
.ensure_nat_entry(
&log,
target_ip.ip,
dpd_client::types::MacAddr { a: mac_address.into_array() },
*target_ip.first_port,
*target_ip.last_port,
vni,
sled_ip_address.ip(),
)
.await
.map_err(|e| {
Error::internal_error(&format!(
"failed to ensure dpd entry: {e}"
))
})?;
retry_until_known_result(log, || async {
dpd_client
.ensure_nat_entry(
&log,
target_ip.ip,
dpd_client::types::MacAddr {
a: mac_address.into_array(),
},
*target_ip.first_port,
*target_ip.last_port,
vni,
sled_ip_address.ip(),
)
.await
})
.await
.map_err(|e| {
Error::internal_error(&format!(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this is no worse than it was before, but it seems like a problem that we don't distinguish between the many kinds of errors that can happen here. So, not a blocker, but should we file an issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, opened #3329

"failed to ensure dpd entry: {e}"
))
})?;
}

Ok(())
Expand Down
84 changes: 14 additions & 70 deletions nexus/src/app/sagas/common_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use super::*;

use crate::app::sagas::retry_until_known_result;
use crate::authz;
use crate::db;
use crate::db::identity::Asset;
Expand Down Expand Up @@ -281,73 +282,6 @@ pub async fn get_pantry_address(

// Common Pantry operations

/// Retry a crucible-pantry client call until it yields a known result.
///
/// Expands to an expression that repeatedly awaits `$func` (a block whose
/// value is a future returning a `crucible_pantry_client` `Result`) under
/// `backoff::retry_policy_internal_service()`. A `CommunicationError` is
/// treated as transient and retried with backoff; every other error is
/// permanent and aborts the retry loop immediately. When retries are
/// exhausted or a permanent error occurs, the error is converted into an
/// `ActionError::action_failed` suitable for saga actions.
///
/// NOTE(review): this macro appears as removed lines in the diff — it is
/// being replaced by the `retry_until_known_result` function used elsewhere
/// in this PR.
#[macro_export]
macro_rules! retry_until_known_result {
( $log:ident, $func:block ) => {{
use omicron_common::backoff;

// Unify the two error sources (raw reqwest transport errors and typed
// pantry-client errors) into one type for the backoff machinery.
#[derive(Debug, thiserror::Error)]
enum InnerError {
#[error("Reqwest error: {0}")]
Reqwest(#[from] reqwest::Error),

#[error("Pantry client error: {0}")]
PantryClient(
#[from]
crucible_pantry_client::Error<
crucible_pantry_client::types::Error,
>,
),
}

backoff::retry_notify(
backoff::retry_policy_internal_service(),
|| async {
match ($func).await {
// Transport-level failures are the only case where the outcome
// is unknown (the request may or may not have landed): retry.
Err(crucible_pantry_client::Error::CommunicationError(
e,
)) => {
warn!(
$log,
"saw transient communication error {}, retrying...",
e,
);

Err(backoff::BackoffError::<InnerError>::transient(
e.into(),
))
}

// Any other error is a definitive answer from the server:
// stop retrying and surface it.
Err(e) => {
warn!($log, "saw permanent error {}, aborting", e,);

Err(backoff::BackoffError::<InnerError>::Permanent(
e.into(),
))
}

Ok(v) => Ok(v),
}
},
// Notify callback: log each failed attempt and the delay before the
// next retry.
|error: InnerError, delay| {
warn!(
$log,
"failed external call ({:?}), will retry in {:?}",
error,
delay,
);
},
)
.await
// Map the final failure into a saga ActionError so callers can `?` it
// directly from saga action functions.
.map_err(|e| {
ActionError::action_failed(format!(
"gave up on external call due to {:?}",
e
))
})
}};
}

pub async fn call_pantry_attach_for_disk(
log: &slog::Logger,
opctx: &OpContext,
Expand Down Expand Up @@ -392,8 +326,12 @@ pub async fn call_pantry_attach_for_disk(
volume_construction_request,
};

retry_until_known_result!(log, {
client.attach(&disk_id.to_string(), &attach_request)
retry_until_known_result(log, || async {
client.attach(&disk_id.to_string(), &attach_request).await
})
.await
.map_err(|e| {
ActionError::action_failed(format!("pantry attach failed with {:?}", e))
})?;

Ok(())
Expand All @@ -410,7 +348,13 @@ pub async fn call_pantry_detach_for_disk(

let client = crucible_pantry_client::Client::new(&endpoint);

retry_until_known_result!(log, { client.detach(&disk_id.to_string()) })?;
retry_until_known_result(log, || async {
client.detach(&disk_id.to_string()).await
})
.await
.map_err(|e| {
ActionError::action_failed(format!("pantry detach failed with {:?}", e))
})?;

Ok(())
}
19 changes: 2 additions & 17 deletions nexus/src/app/sagas/finalize_disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use super::ActionRegistry;
use super::NexusActionContext;
use super::NexusSaga;
use super::SagaInitError;
use crate::app::sagas::common_storage::call_pantry_detach_for_disk;
use crate::app::sagas::snapshot_create;
use crate::db::lookup::LookupPath;
use crate::external_api::params;
use crate::retry_until_known_result;
use crate::{authn, authz};
use nexus_db_model::Generation;
use omicron_common::api::external;
Expand Down Expand Up @@ -284,24 +284,9 @@ async fn sfd_call_pantry_detach_for_disk(
) -> Result<(), ActionError> {
let log = sagactx.user_data().log();
let params = sagactx.saga_params::<Params>()?;

let pantry_address = sagactx.lookup::<SocketAddrV6>("pantry_address")?;
let endpoint = format!("http://{}", pantry_address);

info!(
log,
"sending detach request for disk {} to pantry endpoint {}",
params.disk_id,
endpoint,
);

let disk_id = params.disk_id.to_string();

let client = crucible_pantry_client::Client::new(&endpoint);

retry_until_known_result!(log, { client.detach(&disk_id) })?;

Ok(())
call_pantry_detach_for_disk(&log, params.disk_id, pantry_address).await
}

async fn sfd_clear_pantry_address(
Expand Down
Loading