Skip to content

Commit

Permalink
feat(payment_methods): Implement Process tracker workflow for Payment…
Browse files Browse the repository at this point in the history
… method Status update (#4668)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
  • Loading branch information
Sarthak1799 and hyperswitch-bot[bot] authored Jun 21, 2024
1 parent ca61e47 commit 5cde7ee
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 4 deletions.
1 change: 1 addition & 0 deletions crates/diesel_models/src/process_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ pub enum ProcessTrackerRunner {
ApiKeyExpiryWorkflow,
OutgoingWebhookRetryWorkflow,
AttachPayoutAccountWorkflow,
PaymentMethodStatusUpdateWorkflow,
}

#[cfg(test)]
Expand Down
3 changes: 3 additions & 0 deletions crates/router/src/bin/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ impl ProcessTrackerWorkflows<routes::SessionState> for WorkflowRunner {
)
}
}
storage::ProcessTrackerRunner::PaymentMethodStatusUpdateWorkflow => Ok(Box::new(
workflows::payment_method_status_update::PaymentMethodStatusUpdateWorkflow,
)),
}
};

Expand Down
72 changes: 71 additions & 1 deletion crates/router/src/core/payment_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,28 @@ use api_models::payments::CardToken;
#[cfg(feature = "payouts")]
pub use api_models::{enums::PayoutConnectors, payouts as payout_types};
use diesel_models::enums;
use error_stack::ResultExt;
use hyperswitch_domain_models::payments::{payment_attempt::PaymentAttempt, PaymentIntent};
use router_env::{instrument, tracing};

use crate::{
core::{errors::RouterResult, payments::helpers, pm_auth as core_pm_auth},
consts,
core::{
errors::{self, RouterResult},
payments::helpers,
pm_auth as core_pm_auth,
},
db,
routes::SessionState,
types::{
api::{self, payments},
domain, storage,
},
};

const PAYMENT_METHOD_STATUS_UPDATE_TASK: &str = "PAYMENT_METHOD_STATUS_UPDATE";
const PAYMENT_METHOD_STATUS_TAG: &str = "PAYMENT_METHOD_STATUS";

#[instrument(skip_all)]
pub async fn retrieve_payment_method(
pm_data: &Option<payments::PaymentMethodData>,
Expand Down Expand Up @@ -94,6 +104,66 @@ pub async fn retrieve_payment_method(
}
}

fn generate_task_id_for_payment_method_status_update_workflow(
key_id: &str,
runner: &storage::ProcessTrackerRunner,
task: &str,
) -> String {
format!("{runner}_{task}_{key_id}")
}

pub async fn add_payment_method_status_update_task(
db: &dyn db::StorageInterface,
payment_method: &diesel_models::PaymentMethod,
prev_status: enums::PaymentMethodStatus,
curr_status: enums::PaymentMethodStatus,
merchant_id: &str,
) -> Result<(), errors::ProcessTrackerError> {
let created_at = payment_method.created_at;
let schedule_time =
created_at.saturating_add(time::Duration::seconds(consts::DEFAULT_SESSION_EXPIRY));

let tracking_data = storage::PaymentMethodStatusTrackingData {
payment_method_id: payment_method.payment_method_id.clone(),
prev_status,
curr_status,
merchant_id: merchant_id.to_string(),
};

let runner = storage::ProcessTrackerRunner::PaymentMethodStatusUpdateWorkflow;
let task = PAYMENT_METHOD_STATUS_UPDATE_TASK;
let tag = [PAYMENT_METHOD_STATUS_TAG];

let process_tracker_id = generate_task_id_for_payment_method_status_update_workflow(
payment_method.payment_method_id.as_str(),
&runner,
task,
);
let process_tracker_entry = storage::ProcessTrackerNew::new(
process_tracker_id,
task,
runner,
tag,
tracking_data,
schedule_time,
)
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable("Failed to construct PAYMENT_METHOD_STATUS_UPDATE process tracker task")?;

db
.insert_process(process_tracker_entry)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable_lazy(|| {
format!(
"Failed while inserting PAYMENT_METHOD_STATUS_UPDATE reminder to process_tracker for payment_method_id: {}",
payment_method.payment_method_id.clone()
)
})?;

Ok(())
}

#[instrument(skip_all)]
pub async fn retrieve_payment_method_with_token(
state: &SessionState,
Expand Down
21 changes: 19 additions & 2 deletions crates/router/src/core/payment_methods/cards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ use crate::{
configs::settings,
core::{
errors::{self, StorageErrorExt},
payment_methods::{transformers as payment_methods, vault},
payment_methods::{
add_payment_method_status_update_task, transformers as payment_methods, vault,
},
payments::{
helpers,
routing::{self, SessionFlowRoutingInput},
Expand Down Expand Up @@ -297,6 +299,21 @@ pub async fn get_client_secret_or_add_payment_method(
)
.await?;

if res.status == enums::PaymentMethodStatus::AwaitingData {
add_payment_method_status_update_task(
db,
&res,
enums::PaymentMethodStatus::AwaitingData,
enums::PaymentMethodStatus::Inactive,
merchant_id,
)
.await
.change_context(errors::ApiErrorResponse::InternalServerError)
.attach_printable(
"Failed to add payment method status update task in process tracker",
)?;
}

Ok(services::api::ApplicationResponse::Json(
api::PaymentMethodResponse::foreign_from(res),
))
Expand Down Expand Up @@ -357,7 +374,7 @@ pub async fn add_payment_method_data(
.attach_printable("Unable to find payment method")?;

if payment_method.status != enums::PaymentMethodStatus::AwaitingData {
return Err((errors::ApiErrorResponse::DuplicatePaymentMethod).into());
return Err((errors::ApiErrorResponse::ClientSecretExpired).into());
}

let customer_id = payment_method.customer_id.clone();
Expand Down
8 changes: 8 additions & 0 deletions crates/router/src/types/storage/payment_method.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,11 @@ impl DerefMut for PaymentsMandateReference {
&mut self.0
}
}

#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)]
pub struct PaymentMethodStatusTrackingData {
pub payment_method_id: String,
pub prev_status: enums::PaymentMethodStatus,
pub curr_status: enums::PaymentMethodStatus,
pub merchant_id: String,
}
1 change: 1 addition & 0 deletions crates/router/src/workflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod api_key_expiry;
#[cfg(feature = "payouts")]
pub mod attach_payout_account_workflow;
pub mod outgoing_webhook_retry;
pub mod payment_method_status_update;
pub mod payment_sync;
pub mod refund_router;
pub mod tokenized_data;
107 changes: 107 additions & 0 deletions crates/router/src/workflows/payment_method_status_update.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
use common_utils::ext_traits::ValueExt;
use scheduler::{
consumer::types::process_data, utils as pt_utils, workflows::ProcessTrackerWorkflow,
};

use crate::{
errors,
logger::error,
routes::SessionState,
types::storage::{self, PaymentMethodStatusTrackingData},
};

pub struct PaymentMethodStatusUpdateWorkflow;

#[async_trait::async_trait]
impl ProcessTrackerWorkflow<SessionState> for PaymentMethodStatusUpdateWorkflow {
async fn execute_workflow<'a>(
&'a self,
state: &'a SessionState,
process: storage::ProcessTracker,
) -> Result<(), errors::ProcessTrackerError> {
let db = &*state.store;
let tracking_data: PaymentMethodStatusTrackingData = process
.tracking_data
.clone()
.parse_value("PaymentMethodStatusTrackingData")?;

let retry_count = process.retry_count;
let pm_id = tracking_data.payment_method_id;
let prev_pm_status = tracking_data.prev_status;
let curr_pm_status = tracking_data.curr_status;
let merchant_id = tracking_data.merchant_id;

let key_store = state
.store
.get_merchant_key_store_by_merchant_id(
merchant_id.as_str(),
&state.store.get_master_key().to_vec().into(),
)
.await?;

let merchant_account = db
.find_merchant_account_by_merchant_id(&merchant_id, &key_store)
.await?;

let payment_method = db
.find_payment_method(&pm_id, merchant_account.storage_scheme)
.await?;

if payment_method.status != prev_pm_status {
return db
.as_scheduler()
.finish_process_with_business_status(process, "PROCESS_ALREADY_COMPLETED")
.await
.map_err(Into::<errors::ProcessTrackerError>::into);
}

let pm_update = storage::PaymentMethodUpdate::StatusUpdate {
status: Some(curr_pm_status),
};

let res = db
.update_payment_method(payment_method, pm_update, merchant_account.storage_scheme)
.await
.map_err(errors::ProcessTrackerError::EStorageError);

if let Ok(_pm) = res {
db.as_scheduler()
.finish_process_with_business_status(process, "COMPLETED_BY_PT")
.await?;
} else {
let mapping = process_data::PaymentMethodsPTMapping::default();
let time_delta = if retry_count == 0 {
Some(mapping.default_mapping.start_after)
} else {
pt_utils::get_delay(retry_count + 1, &mapping.default_mapping.frequencies)
};

let schedule_time = pt_utils::get_time_from_delta(time_delta);

match schedule_time {
Some(s_time) => db
.as_scheduler()
.retry_process(process, s_time)
.await
.map_err(Into::<errors::ProcessTrackerError>::into)?,
None => db
.as_scheduler()
.finish_process_with_business_status(process, "RETRIES_EXCEEDED")
.await
.map_err(Into::<errors::ProcessTrackerError>::into)?,
};
};

Ok(())
}

async fn error_handler<'a>(
&'a self,
_state: &'a SessionState,
process: storage::ProcessTracker,
_error: errors::ProcessTrackerError,
) -> errors::CustomResult<(), errors::ProcessTrackerError> {
error!(%process.id, "Failed while executing workflow");
Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/scheduler/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ pub fn get_outgoing_webhook_retry_schedule_time(
}

/// Get the delay based on the retry count
fn get_delay<'a>(
pub fn get_delay<'a>(
retry_count: i32,
frequencies: impl IntoIterator<Item = &'a (i32, i32)>,
) -> Option<i32> {
Expand Down

0 comments on commit 5cde7ee

Please sign in to comment.