Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(webhooks): allow manually retrying delivery of outgoing webhooks #4176

Merged
merged 6 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/api_models/src/user_role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub enum Permission {
WebhookEventRead,
PayoutWrite,
PayoutRead,
WebhookEventWrite,
}

#[derive(Debug, serde::Serialize)]
Expand Down
36 changes: 31 additions & 5 deletions crates/api_models/src/webhook_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ pub struct EventRetrieveResponse {
pub delivery_attempt: Option<WebhookDeliveryAttempt>,
}

impl common_utils::events::ApiEventMetric for EventRetrieveResponse {
fn get_api_event_type(&self) -> Option<common_utils::events::ApiEventsType> {
Some(common_utils::events::ApiEventsType::Events {
merchant_id_or_profile_id: self.event_information.merchant_id.clone(),
})
}
}

/// The request information (headers and body) sent in the webhook.
#[derive(Debug, Serialize, Deserialize, ToSchema)]
pub struct OutgoingWebhookRequestContent {
Expand All @@ -114,20 +122,24 @@ pub struct OutgoingWebhookRequestContent {
#[derive(Debug, serde::Serialize, serde::Deserialize, ToSchema)]
pub struct OutgoingWebhookResponseContent {
/// The response body received for the webhook sent.
#[schema(value_type = String)]
#[schema(value_type = Option<String>)]
#[serde(alias = "payload")]
pub body: Secret<String>,
pub body: Option<Secret<String>>,

/// The response headers received for the webhook sent.
#[schema(
value_type = Vec<(String, String)>,
value_type = Option<Vec<(String, String)>>,
example = json!([["content-type", "application/json"], ["content-length", "1024"]]))
]
pub headers: Vec<(String, Secret<String>)>,
pub headers: Option<Vec<(String, Secret<String>)>>,

/// The HTTP status code for the webhook sent.
#[schema(example = 200)]
pub status_code: u16,
pub status_code: Option<u16>,

/// Error message in case any error occurred when trying to deliver the webhook.
#[schema(example = 200)]
pub error_message: Option<String>,
}

#[derive(Debug, serde::Serialize)]
Expand Down Expand Up @@ -157,3 +169,17 @@ impl common_utils::events::ApiEventMetric for WebhookDeliveryAttemptListRequestI
})
}
}

#[derive(Debug, serde::Serialize)]
pub struct WebhookDeliveryRetryRequestInternal {
pub merchant_id_or_profile_id: String,
pub event_id: String,
}

impl common_utils::events::ApiEventMetric for WebhookDeliveryRetryRequestInternal {
fn get_api_event_type(&self) -> Option<common_utils::events::ApiEventsType> {
Some(common_utils::events::ApiEventsType::Events {
merchant_id_or_profile_id: self.merchant_id_or_profile_id.clone(),
})
}
}
1 change: 1 addition & 0 deletions crates/openapi/src/openapi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ Never share your secret api keys. Keep them guarded and secure.
// Routes for events
routes::webhook_events::list_initial_webhook_delivery_attempts,
routes::webhook_events::list_webhook_delivery_attempts,
routes::webhook_events::retry_webhook_delivery_attempt,
),
components(schemas(
api_models::refunds::RefundRequest,
Expand Down
24 changes: 24 additions & 0 deletions crates/openapi/src/routes/webhook_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,27 @@ pub fn list_initial_webhook_delivery_attempts() {}
security(("admin_api_key" = []))
)]
pub fn list_webhook_delivery_attempts() {}

/// Events - Manual Retry
///
/// Manually retry the delivery of the specified Event.
#[utoipa::path(
post,
path = "/events/{merchant_id_or_profile_id}/{event_id}/retry",
params(
("merchant_id_or_profile_id" = String, Path, description = "The unique identifier for the Merchant Account or Business Profile"),
("event_id" = String, Path, description = "The unique identifier for the Event"),
),
responses(
(
status = 200,
description = "The delivery of the Event was attempted. \
Check the `response` field in the response payload to identify the status of the delivery attempt.",
body = EventRetrieveResponse
),
),
tag = "Event",
operation_id = "Manually retry the delivery of an Event",
security(("admin_api_key" = []))
)]
pub fn retry_webhook_delivery_attempt() {}
149 changes: 132 additions & 17 deletions crates/router/src/core/webhooks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,16 +887,78 @@ async fn trigger_webhook_to_merchant(
);
logger::debug!(outgoing_webhook_response=?response);

let update_event_if_client_error =
|state: AppState,
merchant_key_store: domain::MerchantKeyStore,
merchant_id: String,
event_id: String,
error_message: String| async move {
let is_webhook_notified = false;

let response_to_store = OutgoingWebhookResponseContent {
body: None,
headers: None,
status_code: None,
error_message: Some(error_message),
};

let event_update = domain::EventUpdate::UpdateResponse {
is_webhook_notified,
response: Some(
domain_types::encrypt(
response_to_store
.encode_to_string_of_json()
.change_context(
errors::WebhooksFlowError::OutgoingWebhookResponseEncodingFailed,
)
.map(Secret::new)?,
merchant_key_store.key.get_inner().peek(),
)
.await
.change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)
.attach_printable("Failed to encrypt outgoing webhook response content")?,
),
};

state
.store
.update_event_by_merchant_id_event_id(
&merchant_id,
&event_id,
event_update,
&merchant_key_store,
)
.await
.change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)
};

let api_client_error_handler =
|client_error: error_stack::Report<errors::ApiClientError>,
delivery_attempt: enums::WebhookDeliveryAttempt| {
|state: AppState,
merchant_key_store: domain::MerchantKeyStore,
merchant_id: String,
event_id: String,
client_error: error_stack::Report<errors::ApiClientError>,
delivery_attempt: enums::WebhookDeliveryAttempt| async move {
// Not including detailed error message in response information since it contains too
// much of diagnostic information to be exposed to the merchant.
update_event_if_client_error(
state,
merchant_key_store,
merchant_id,
event_id,
"Unable to send request to merchant server".to_string(),
)
.await?;

let error =
client_error.change_context(errors::WebhooksFlowError::CallToMerchantFailed);
logger::error!(
?error,
?delivery_attempt,
"An error occurred when sending webhook to merchant"
);

Ok::<_, error_stack::Report<errors::WebhooksFlowError>>(())
};
let update_event_in_storage = |state: AppState,
merchant_key_store: domain::MerchantKeyStore,
Expand Down Expand Up @@ -934,9 +996,10 @@ async fn trigger_webhook_to_merchant(
Secret::from(String::from("Non-UTF-8 response body"))
});
let response_to_store = OutgoingWebhookResponseContent {
body: response_body,
headers: response_headers,
status_code: status_code.as_u16(),
body: Some(response_body),
headers: Some(response_headers),
status_code: Some(status_code.as_u16()),
error_message: None,
};

let event_update = domain::EventUpdate::UpdateResponse {
Expand All @@ -953,7 +1016,7 @@ async fn trigger_webhook_to_merchant(
)
.await
.change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)
.attach_printable("Failed to encrypt outgoing webhook request content")?,
.attach_printable("Failed to encrypt outgoing webhook response content")?,
),
};
state
Expand All @@ -967,16 +1030,19 @@ async fn trigger_webhook_to_merchant(
.await
.change_context(errors::WebhooksFlowError::WebhookEventUpdationFailed)
};
let increment_webhook_outgoing_received_count = |merchant_id: String| {
metrics::WEBHOOK_OUTGOING_RECEIVED_COUNT.add(
&metrics::CONTEXT,
1,
&[metrics::KeyValue::new(MERCHANT_ID, merchant_id)],
)
};
let success_response_handler =
|state: AppState,
merchant_id: String,
process_tracker: Option<storage::ProcessTracker>,
business_status: &'static str| async move {
metrics::WEBHOOK_OUTGOING_RECEIVED_COUNT.add(
&metrics::CONTEXT,
1,
&[metrics::KeyValue::new(MERCHANT_ID, merchant_id)],
);
increment_webhook_outgoing_received_count(merchant_id);

match process_tracker {
Some(process_tracker) => state
Expand Down Expand Up @@ -1006,7 +1072,17 @@ async fn trigger_webhook_to_merchant(

match delivery_attempt {
enums::WebhookDeliveryAttempt::InitialAttempt => match response {
Err(client_error) => api_client_error_handler(client_error, delivery_attempt),
Err(client_error) => {
api_client_error_handler(
state.clone(),
merchant_key_store.clone(),
business_profile.merchant_id.clone(),
event_id.clone(),
client_error,
delivery_attempt,
)
.await?
}
Ok(response) => {
let status_code = response.status();
let _updated_event = update_event_in_storage(
Expand Down Expand Up @@ -1043,7 +1119,15 @@ async fn trigger_webhook_to_merchant(
.attach_printable("`process_tracker` is unavailable in automatic retry flow")?;
match response {
Err(client_error) => {
api_client_error_handler(client_error, delivery_attempt);
api_client_error_handler(
state.clone(),
merchant_key_store.clone(),
business_profile.merchant_id.clone(),
event_id.clone(),
client_error,
delivery_attempt,
)
.await?;
// Schedule a retry attempt for webhook delivery
outgoing_webhook_retry::retry_webhook_delivery_task(
&*state.store,
Expand Down Expand Up @@ -1095,10 +1179,41 @@ async fn trigger_webhook_to_merchant(
}
}
}
enums::WebhookDeliveryAttempt::ManualRetry => {
// Will be updated when manual retry is implemented
Err(errors::WebhooksFlowError::NotReceivedByMerchant)?
}
enums::WebhookDeliveryAttempt::ManualRetry => match response {
Err(client_error) => {
api_client_error_handler(
state.clone(),
merchant_key_store.clone(),
business_profile.merchant_id.clone(),
event_id.clone(),
client_error,
delivery_attempt,
)
.await?
}
Ok(response) => {
let status_code = response.status();
let _updated_event = update_event_in_storage(
state.clone(),
merchant_key_store.clone(),
business_profile.merchant_id.clone(),
event_id.clone(),
response,
)
.await?;

if status_code.is_success() {
increment_webhook_outgoing_received_count(business_profile.merchant_id.clone());
} else {
error_response_handler(
business_profile.merchant_id,
delivery_attempt,
status_code.as_u16(),
"Ignoring error when sending webhook to merchant",
);
}
}
},
}

Ok(())
Expand Down
Loading
Loading