Skip to content

Commit

Permalink
Turn retry_until_known_result into a function
Browse files Browse the repository at this point in the history
Credit goes to jgallagher, I just copied what he wrote
  • Loading branch information
jmpesp committed Jun 2, 2023
1 parent 0239b13 commit b80679e
Show file tree
Hide file tree
Showing 12 changed files with 200 additions and 152 deletions.
27 changes: 16 additions & 11 deletions nexus/src/app/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::MAX_DISKS_PER_INSTANCE;
use super::MAX_EXTERNAL_IPS_PER_INSTANCE;
use super::MAX_NICS_PER_INSTANCE;
use crate::app::sagas;
use crate::app::sagas::retry_until_known_result;
use crate::authn;
use crate::authz;
use crate::cidata::InstanceCiData;
Expand All @@ -16,7 +17,6 @@ use crate::db::identity::Resource;
use crate::db::lookup;
use crate::db::lookup::LookupPath;
use crate::external_api::params;
use crate::retry_until_known_result;
use futures::future::Fuse;
use futures::{FutureExt, SinkExt, StreamExt};
use nexus_db_model::IpKind;
Expand Down Expand Up @@ -1241,17 +1241,22 @@ impl super::Nexus {
})
.map(|(_, ip)| ip)
{
retry_until_known_result!(log, {
dpd_client.ensure_nat_entry(
&log,
target_ip.ip,
dpd_client::types::MacAddr { a: mac_address.into_array() },
*target_ip.first_port,
*target_ip.last_port,
vni,
sled_ip_address.ip(),
)
retry_until_known_result(log, || async {
dpd_client
.ensure_nat_entry(
&log,
target_ip.ip,
dpd_client::types::MacAddr {
a: mac_address.into_array(),
},
*target_ip.first_port,
*target_ip.last_port,
vni,
sled_ip_address.ip(),
)
.await
})
.await
.map_err(|e| {
Error::internal_error(&format!(
"failed to ensure dpd entry: {e}"
Expand Down
21 changes: 11 additions & 10 deletions nexus/src/app/sagas/common_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
use super::*;

use crate::app::sagas::retry_until_known_result;
use crate::authz;
use crate::db;
use crate::db::identity::Asset;
use crate::db::lookup::LookupPath;
use crate::retry_until_known_result;
use crate::Nexus;
use anyhow::anyhow;
use crucible_agent_client::{
Expand Down Expand Up @@ -326,9 +326,10 @@ pub async fn call_pantry_attach_for_disk(
volume_construction_request,
};

retry_until_known_result!(log, {
client.attach(&disk_id.to_string(), &attach_request)
retry_until_known_result(log, || async {
client.attach(&disk_id.to_string(), &attach_request).await
})
.await
.map_err(|e| {
ActionError::action_failed(format!("pantry attach failed with {:?}", e))
})?;
Expand All @@ -347,13 +348,13 @@ pub async fn call_pantry_detach_for_disk(

let client = crucible_pantry_client::Client::new(&endpoint);

retry_until_known_result!(log, { client.detach(&disk_id.to_string()) })
.map_err(|e| {
ActionError::action_failed(format!(
"pantry detach failed with {:?}",
e
))
})?;
retry_until_known_result(log, || async {
client.detach(&disk_id.to_string()).await
})
.await
.map_err(|e| {
ActionError::action_failed(format!("pantry detach failed with {:?}", e))
})?;

Ok(())
}
40 changes: 21 additions & 19 deletions nexus/src/app/sagas/import_blocks_from_url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use super::ActionRegistry;
use super::NexusActionContext;
use super::NexusSaga;
use super::SagaInitError;
use crate::app::sagas::retry_until_known_result;
use crate::db::lookup::LookupPath;
use crate::retry_until_known_result;
use crate::{authn, authz};
use nexus_db_model::Generation;
use nexus_types::external_api::params;
Expand Down Expand Up @@ -272,9 +272,10 @@ async fn sibfu_call_pantry_import_from_url_for_disk(
},
};

let response = retry_until_known_result!(log, {
client.import_from_url(&disk_id, &request)
let response = retry_until_known_result(log, || async {
client.import_from_url(&disk_id, &request).await
})
.await
.map_err(|e| {
ActionError::action_failed(format!(
"import from url failed with {:?}",
Expand Down Expand Up @@ -308,14 +309,16 @@ async fn sibfu_wait_for_import_from_url(
);

loop {
let result =
retry_until_known_result!(log, { client.is_job_finished(&job_id) })
.map_err(|e| {
ActionError::action_failed(format!(
"is_job_finished failed with {:?}",
e
))
})?;
let result = retry_until_known_result(log, || async {
client.is_job_finished(&job_id).await
})
.await
.map_err(|e| {
ActionError::action_failed(format!(
"is_job_finished failed with {:?}",
e
))
})?;

if result.job_is_finished {
break;
Expand All @@ -332,14 +335,13 @@ async fn sibfu_wait_for_import_from_url(
endpoint,
);

let response =
retry_until_known_result!(log, { client.job_result_ok(&job_id) })
.map_err(|e| {
ActionError::action_failed(format!(
"job_result_ok failed with {:?}",
e
))
})?;
let response = retry_until_known_result(log, || async {
client.job_result_ok(&job_id).await
})
.await
.map_err(|e| {
ActionError::action_failed(format!("job_result_ok failed with {:?}", e))
})?;

if !response.job_result_ok {
return Err(ActionError::action_failed(format!("Job {job_id} failed")));
Expand Down
15 changes: 7 additions & 8 deletions nexus/src/app/sagas/instance_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use super::{NexusActionContext, NexusSaga, SagaInitError, ACTION_GENERATE_ID};
use crate::app::instance::WriteBackUpdatedInstance;
use crate::app::sagas::declare_saga_actions;
use crate::app::sagas::disk_create::{self, SagaDiskCreate};
use crate::app::sagas::retry_until_known_result;
use crate::app::{
MAX_DISKS_PER_INSTANCE, MAX_EXTERNAL_IPS_PER_INSTANCE,
MAX_NICS_PER_INSTANCE,
Expand All @@ -15,7 +16,6 @@ use crate::db::lookup::LookupPath;
use crate::db::model::ByteCount as DbByteCount;
use crate::db::queries::network_interface::InsertError as InsertNicError;
use crate::external_api::params;
use crate::retry_until_known_result;
use crate::{authn, authz, db};
use chrono::Utc;
use nexus_db_model::NetworkInterfaceKind;
Expand Down Expand Up @@ -436,13 +436,12 @@ async fn sic_remove_network_config(

debug!(log, "deleting nat mapping for entry: {target_ip:#?}");

let result = retry_until_known_result!(log, {
dpd_client.ensure_nat_entry_deleted(
log,
target_ip.ip,
*target_ip.first_port,
)
});
let result = retry_until_known_result(log, || async {
dpd_client
.ensure_nat_entry_deleted(log, target_ip.ip, *target_ip.first_port)
.await
})
.await;

match result {
Ok(_) => {
Expand Down
15 changes: 7 additions & 8 deletions nexus/src/app/sagas/instance_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use super::ActionRegistry;
use super::NexusActionContext;
use super::NexusSaga;
use crate::app::sagas::declare_saga_actions;
use crate::app::sagas::retry_until_known_result;
use crate::db;
use crate::db::lookup::LookupPath;
use crate::retry_until_known_result;
use crate::{authn, authz};
use nexus_types::identity::Resource;
use omicron_common::api::external::{Error, ResourceType};
Expand Down Expand Up @@ -167,13 +167,12 @@ async fn sid_delete_network_config(
for entry in external_ips {
debug!(log, "deleting nat mapping for entry: {entry:#?}");

let result = retry_until_known_result!(log, {
dpd_client.ensure_nat_entry_deleted(
log,
entry.ip,
*entry.first_port,
)
});
let result = retry_until_known_result(log, || async {
dpd_client
.ensure_nat_entry_deleted(log, entry.ip, *entry.first_port)
.await
})
.await;

match result {
Ok(_) => {
Expand Down
17 changes: 10 additions & 7 deletions nexus/src/app/sagas/loopback_address_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use super::{NexusActionContext, NEXUS_DPD_TAG};
use crate::app::sagas::retry_until_known_result;
use crate::app::sagas::{
declare_saga_actions, ActionRegistry, NexusSaga, SagaInitError,
};
use crate::authn;
use crate::authz;
use crate::db::model::LoopbackAddress;
use crate::external_api::params;
use crate::retry_until_known_result;
use anyhow::Error;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
Expand Down Expand Up @@ -148,12 +148,15 @@ async fn slc_loopback_address_create(
let dpd_client: Arc<dpd_client::Client> =
Arc::clone(&osagactx.nexus().dpd_client);

retry_until_known_result!(log, {
dpd_client.ensure_loopback_created(
log,
params.loopback_address.address,
NEXUS_DPD_TAG,
)
retry_until_known_result(log, || async {
dpd_client
.ensure_loopback_created(
log,
params.loopback_address.address,
NEXUS_DPD_TAG,
)
.await
})
.await
.map_err(|e| ActionError::action_failed(e.to_string()))
}
7 changes: 4 additions & 3 deletions nexus/src/app/sagas/loopback_address_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use super::NexusActionContext;
use crate::app::sagas::retry_until_known_result;
use crate::app::sagas::{
declare_saga_actions, ActionRegistry, NexusSaga, SagaInitError,
};
use crate::authn;
use crate::authz;
use crate::db::model::{LoopbackAddress, Name};
use crate::external_api::params;
use crate::retry_until_known_result;
use anyhow::{anyhow, Error};
use nexus_types::identity::Asset;
use omicron_common::api::external::{IpNet, NameOrId};
Expand Down Expand Up @@ -177,8 +177,9 @@ async fn slc_loopback_address_delete(
let dpd_client: Arc<dpd_client::Client> =
Arc::clone(&osagactx.nexus().dpd_client);

retry_until_known_result!(log, {
dpd_client.ensure_loopback_deleted(log, params.address.ip())
retry_until_known_result(log, || async {
dpd_client.ensure_loopback_deleted(log, params.address.ip()).await
})
.await
.map_err(|e| ActionError::action_failed(e.to_string()))
}
54 changes: 31 additions & 23 deletions nexus/src/app/sagas/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,8 @@ pub(crate) use __emit_action;
pub(crate) use __stringify_ident;
pub(crate) use declare_saga_actions;

use futures::Future;

/// Retry a progenitor client operation until a known result is returned.
///
/// Saga execution relies on the outcome of an external call being known: since
Expand All @@ -317,18 +319,26 @@ pub(crate) use declare_saga_actions;
/// is seen.
///
/// Note that retrying is only valid if the call itself is idempotent.
#[macro_export]
macro_rules! retry_until_known_result {
( $log:ident, $func:block ) => {{
use omicron_common::backoff;

backoff::retry_notify(
backoff::retry_policy_internal_service(),
|| async {
match ($func).await {
pub(crate) async fn retry_until_known_result<F, T, E, Fut>(
log: &slog::Logger,
mut f: F,
) -> Result<T, progenitor_client::Error<E>>
where
F: FnMut() -> Fut,
Fut: Future<Output = Result<T, progenitor_client::Error<E>>>,
E: std::fmt::Debug,
{
use omicron_common::backoff;

backoff::retry_notify(
backoff::retry_policy_internal_service(),
move || {
let fut = f();
async move {
match fut.await {
Err(progenitor_client::Error::CommunicationError(e)) => {
warn!(
$log,
log,
"saw transient communication error {}, retrying...",
e,
);
Expand Down Expand Up @@ -362,23 +372,21 @@ macro_rules! retry_until_known_result {
}

Err(e) => {
warn!($log, "saw permanent error {}, aborting", e,);
warn!(log, "saw permanent error {}, aborting", e,);

Err(backoff::BackoffError::Permanent(e))
}

Ok(v) => Ok(v),
}
},
|error: progenitor_client::Error<_>, delay| {
warn!(
$log,
"failed external call ({:?}), will retry in {:?}",
error,
delay,
);
},
)
.await
}};
}
},
|error: progenitor_client::Error<_>, delay| {
warn!(
log,
"failed external call ({:?}), will retry in {:?}", error, delay,
);
},
)
.await
}
Loading

0 comments on commit b80679e

Please sign in to comment.