diff --git a/Cargo.lock b/Cargo.lock index a6d1c8fd71..8f0f966db1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1110,6 +1110,7 @@ version = "1.4.1" dependencies = [ "arc-swap", "async-trait", + "backon", "base64 0.22.1", "bip37-bloom-filter", "chrono", diff --git a/packages/rs-dapi-client/Cargo.toml b/packages/rs-dapi-client/Cargo.toml index f3682130fc..e8ffd41a9c 100644 --- a/packages/rs-dapi-client/Cargo.toml +++ b/packages/rs-dapi-client/Cargo.toml @@ -5,7 +5,6 @@ edition = "2021" [features] default = ["mocks", "offline-testing"] -tokio-sleep = ["backon/tokio-sleep"] mocks = [ "dep:sha2", "dep:hex", @@ -20,7 +19,7 @@ dump = ["mocks"] offline-testing = [] [dependencies] -backon = { version = "1.2"} +backon = { version = "1.2", features = ["tokio-sleep"] } dapi-grpc = { path = "../dapi-grpc", features = [ "core", "platform", diff --git a/packages/rs-dapi-client/src/executor.rs b/packages/rs-dapi-client/src/executor.rs index 50913b23c4..17f7aa7d12 100644 --- a/packages/rs-dapi-client/src/executor.rs +++ b/packages/rs-dapi-client/src/executor.rs @@ -19,6 +19,21 @@ pub trait DapiRequestExecutor { ::Error: Mockable; } +/// Unwrap wrapped types +pub trait IntoInner { + /// Unwrap the inner type. + /// + /// This function returns inner type, dropping additional context information. + /// It is lossy operation, so it should be used with caution. + fn into_inner(self) -> T; +} + +/// Convert inner type without loosing additional context information of the wrapper. +pub trait InnerInto { + /// Convert inner type without loosing additional context information of the wrapper. + fn inner_into(self) -> T; +} + /// Error happened during request execution. #[derive(Debug, Clone, thiserror::Error, Eq, PartialEq)] #[error("{inner}")] @@ -31,10 +46,27 @@ pub struct ExecutionError { pub address: Option
, } -impl ExecutionError { +impl InnerInto> for ExecutionError +where + F: Into, +{ + /// Convert inner error type without loosing retries and address + fn inner_into(self) -> ExecutionError { + ExecutionError { + inner: self.inner.into(), + retries: self.retries, + address: self.address, + } + } +} + +impl IntoInner for ExecutionError +where + E: Into, +{ /// Unwrap the error cause - pub fn into_inner(self) -> E { - self.inner + fn into_inner(self) -> I { + self.inner.into() } } @@ -55,12 +87,62 @@ pub struct ExecutionResponse { pub address: Address, } -impl ExecutionResponse { +#[cfg(feature = "mocks")] +impl Default for ExecutionResponse { + fn default() -> Self { + Self { + retries: Default::default(), + address: "http://127.0.0.1".parse().expect("create mock address"), + inner: Default::default(), + } + } +} + +impl IntoInner for ExecutionResponse +where + R: Into, +{ /// Unwrap the response - pub fn into_inner(self) -> R { - self.inner + fn into_inner(self) -> I { + self.inner.into() + } +} + +impl InnerInto> for ExecutionResponse +where + F: Into, +{ + /// Convert inner response type without loosing retries and address + fn inner_into(self) -> ExecutionResponse { + ExecutionResponse { + inner: self.inner.into(), + retries: self.retries, + address: self.address, + } } } /// Result of request execution pub type ExecutionResult = Result, ExecutionError>; + +impl IntoInner> for ExecutionResult { + fn into_inner(self) -> Result { + match self { + Ok(response) => Ok(response.into_inner()), + Err(error) => Err(error.into_inner()), + } + } +} + +impl InnerInto> for ExecutionResult +where + F: Into, + FE: Into, +{ + fn inner_into(self) -> ExecutionResult { + match self { + Ok(response) => Ok(response.inner_into()), + Err(error) => Err(error.inner_into()), + } + } +} diff --git a/packages/rs-dapi-client/src/lib.rs b/packages/rs-dapi-client/src/lib.rs index 208c2380ac..9d04d2251a 100644 --- a/packages/rs-dapi-client/src/lib.rs +++ b/packages/rs-dapi-client/src/lib.rs @@ -21,7 +21,9 @@ pub use dapi_client::{DapiClient, DapiClientError}; use dapi_grpc::mock::Mockable; #[cfg(feature = "dump")] pub use dump::DumpData; -pub use executor::{DapiRequestExecutor, ExecutionError, ExecutionResponse, ExecutionResult}; +pub use executor::{ + DapiRequestExecutor, ExecutionError, ExecutionResponse, ExecutionResult, InnerInto, IntoInner, +}; use futures::{future::BoxFuture, FutureExt}; pub use request_settings::RequestSettings; diff --git a/packages/rs-dapi-client/src/mock.rs b/packages/rs-dapi-client/src/mock.rs index 98df667892..968468d8b6 100644 --- a/packages/rs-dapi-client/src/mock.rs +++ b/packages/rs-dapi-client/src/mock.rs @@ -290,3 +290,28 @@ impl Mockable for ExecutionError { }) } } + +/// Create full wrapping object from inner type, using defaults for +/// fields that cannot be derived from the inner type. +pub trait FromInner +where + Self: Default, +{ + /// Create full wrapping object from inner type, using defaults for + /// fields that cannot be derived from the inner type. + /// + /// Note this is imprecise conversion and should be avoided outside of tests. + fn from_inner(inner: R) -> Self; +} + +impl FromInner for ExecutionResponse +where + Self: Default, +{ + fn from_inner(inner: R) -> Self { + Self { + inner, + ..Default::default() + } + } +} diff --git a/packages/rs-dapi-client/src/request_settings.rs b/packages/rs-dapi-client/src/request_settings.rs index 7c900a7829..21a1f69b38 100644 --- a/packages/rs-dapi-client/src/request_settings.rs +++ b/packages/rs-dapi-client/src/request_settings.rs @@ -21,7 +21,8 @@ pub struct RequestSettings { pub connect_timeout: Option, /// Timeout for a request. pub timeout: Option, - /// Number of retries until returning the last error. + /// Number of retries in case of failed requests. If max retries reached, the last error is returned. + /// 1 means one request and one retry in case of error, etc. pub retries: Option, /// Ban DAPI address if node not responded or responded with error. pub ban_failed_address: Option, diff --git a/packages/rs-sdk/Cargo.toml b/packages/rs-sdk/Cargo.toml index 98f5dd4c02..cbb5e7fe93 100644 --- a/packages/rs-sdk/Cargo.toml +++ b/packages/rs-sdk/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] arc-swap = { version = "1.7.1" } +backon = { version = "1.2", features = ["tokio-sleep"] } chrono = { version = "0.4.38" } dpp = { path = "../rs-dpp", default-features = false, features = [ "dash-sdk-features", @@ -57,7 +58,6 @@ test-case = { version = "3.3.1" } [features] default = ["mocks", "offline-testing"] -tokio-sleep = ["rs-dapi-client/tokio-sleep"] mocks = [ "dep:serde", diff --git a/packages/rs-sdk/src/core/transaction.rs b/packages/rs-sdk/src/core/transaction.rs index acbbf4164b..a71a6f664c 100644 --- a/packages/rs-sdk/src/core/transaction.rs +++ b/packages/rs-sdk/src/core/transaction.rs @@ -12,7 +12,7 @@ use dpp::identity::state_transition::asset_lock_proof::chain::ChainAssetLockProo use dpp::identity::state_transition::asset_lock_proof::InstantAssetLockProof; use dpp::prelude::AssetLockProof; -use rs_dapi_client::{DapiRequestExecutor, RequestSettings}; +use rs_dapi_client::{DapiRequestExecutor, IntoInner, RequestSettings}; use std::time::Duration; use tokio::time::{sleep, timeout}; @@ -56,9 +56,7 @@ impl Sdk { }; self.execute(core_transactions_stream, RequestSettings::default()) .await - // TODO: We need better way to handle execution response and errors - .map(|execution_response| execution_response.into_inner()) - .map_err(|execution_error| execution_error.into_inner()) + .into_inner() .map_err(|e| Error::DapiClientError(e.to_string())) } @@ -184,8 +182,7 @@ impl Sdk { RequestSettings::default(), ) .await // TODO: We need better way to handle execution errors - .map_err(|error| error.into_inner())? - .into_inner(); + .into_inner()?; core_chain_locked_height = height; diff --git a/packages/rs-sdk/src/error.rs b/packages/rs-sdk/src/error.rs index e55bda4742..223c7dc00e 100644 --- a/packages/rs-sdk/src/error.rs +++ b/packages/rs-sdk/src/error.rs @@ -5,11 +5,12 @@ use std::time::Duration; use dapi_grpc::mock::Mockable; use dpp::version::PlatformVersionError; use dpp::ProtocolError; -use rs_dapi_client::{CanRetry, DapiClientError}; +use rs_dapi_client::{CanRetry, DapiClientError, ExecutionError}; pub use drive_proof_verifier::error::ContextProviderError; /// Error type for the SDK +// TODO: Propagate server address and retry information so that the user can retrieve it #[derive(Debug, thiserror::Error)] pub enum Error { /// SDK is not configured properly @@ -85,6 +86,16 @@ impl From for Error { } } +impl From> for Error +where + ExecutionError: ToString, +{ + fn from(value: ExecutionError) -> Self { + // TODO: Improve error handling + Self::DapiClientError(value.to_string()) + } +} + impl CanRetry for Error { fn can_retry(&self) -> bool { matches!(self, Error::StaleNode(..) | Error::TimeoutReached(_, _)) diff --git a/packages/rs-sdk/src/platform/fetch.rs b/packages/rs-sdk/src/platform/fetch.rs index 1f97bb4426..80564fbdf2 100644 --- a/packages/rs-sdk/src/platform/fetch.rs +++ b/packages/rs-sdk/src/platform/fetch.rs @@ -9,6 +9,7 @@ //! traits. The associated [Fetch::Request]` type needs to implement [TransportRequest]. use crate::mock::MockResponse; +use crate::sync::retry; use crate::{error::Error, platform::query::Query, Sdk}; use dapi_grpc::platform::v0::{self as platform_proto, Proof, ResponseMetadata}; use dpp::voting::votes::Vote; @@ -18,6 +19,7 @@ use dpp::{ }; use drive_proof_verifier::FromProof; use rs_dapi_client::{transport::TransportRequest, DapiRequest, RequestSettings}; +use rs_dapi_client::{ExecutionError, ExecutionResponse, InnerInto, IntoInner}; use std::fmt::Debug; use super::types::identity::IdentityRequest; @@ -119,25 +121,9 @@ where query: Q, settings: Option, ) -> Result<(Option, ResponseMetadata), Error> { - let request = query.query(sdk.prove())?; - - let response = request - .clone() - .execute(sdk, settings.unwrap_or_default()) - .await // TODO: We need better way to handle execution response and errors - .map(|execution_response| execution_response.into_inner()) - .map_err(|execution_error| execution_error.into_inner())?; - - let object_type = std::any::type_name::().to_string(); - tracing::trace!(request = ?request, response = ?response, object_type, "fetched object from platform"); - - let (object, response_metadata): (Option, ResponseMetadata) = - sdk.parse_proof_with_metadata(request, response).await?; - - match object { - Some(item) => Ok((item.into(), response_metadata)), - None => Ok((None, response_metadata)), - } + Self::fetch_with_metadata_and_proof(sdk, query, settings) + .await + .map(|(object, metadata, _)| (object, metadata)) } /// Fetch single object from Platform with metadata and underlying proof. @@ -169,26 +155,47 @@ where query: Q, settings: Option, ) -> Result<(Option, ResponseMetadata, Proof), Error> { - let request = query.query(sdk.prove())?; + let request: &::Request = &query.query(sdk.prove())?; + + let fut = |settings: RequestSettings| async move { + let ExecutionResponse { + address, + retries, + inner: response, + } = request + .clone() + .execute(sdk, settings) + .await + .map_err(|execution_error| execution_error.inner_into())?; + + let object_type = std::any::type_name::().to_string(); + tracing::trace!(request = ?request, response = ?response, ?address, retries, object_type, "fetched object from platform"); - let response = request - .clone() - .execute(sdk, settings.unwrap_or_default()) - .await // TODO: We need better way to handle execution response and errors - .map(|execution_response| execution_response.into_inner()) - .map_err(|execution_error| execution_error.into_inner())?; + let (object, response_metadata, proof): (Option, ResponseMetadata, Proof) = sdk + .parse_proof_with_metadata_and_proof(request.clone(), response) + .await + .map_err(|e| ExecutionError { + inner: e, + address: Some(address.clone()), + retries, + })?; - let object_type = std::any::type_name::().to_string(); - tracing::trace!(request = ?request, response = ?response, object_type, "fetched object from platform"); + match object { + Some(item) => Ok((item.into(), response_metadata, proof)), + None => Ok((None, response_metadata, proof)), + } + .map(|x| ExecutionResponse { + inner: x, + address, + retries, + }) + }; - let (object, response_metadata, proof): (Option, ResponseMetadata, Proof) = sdk - .parse_proof_with_metadata_and_proof(request, response) - .await?; + let settings = sdk + .dapi_client_settings + .override_by(settings.unwrap_or_default()); - match object { - Some(item) => Ok((item.into(), response_metadata, proof)), - None => Ok((None, response_metadata, proof)), - } + retry(settings, fut).await.into_inner() } /// Fetch single object from Platform. diff --git a/packages/rs-sdk/src/platform/fetch_many.rs b/packages/rs-sdk/src/platform/fetch_many.rs index a9efb4ebc9..4653835557 100644 --- a/packages/rs-sdk/src/platform/fetch_many.rs +++ b/packages/rs-sdk/src/platform/fetch_many.rs @@ -4,21 +4,22 @@ //! //! ## Traits //! - `[FetchMany]`: An async trait that fetches multiple items of a specific type from Platform. + use super::LimitQuery; use crate::{ error::Error, mock::MockResponse, platform::{document_query::DocumentQuery, query::Query}, + sync::retry, Sdk, }; use dapi_grpc::platform::v0::{ GetContestedResourceIdentityVotesRequest, GetContestedResourceVoteStateRequest, GetContestedResourceVotersForIdentityRequest, GetContestedResourcesRequest, - GetDataContractsRequest, GetDocumentsResponse, GetEpochsInfoRequest, - GetEvonodesProposedEpochBlocksByIdsRequest, GetEvonodesProposedEpochBlocksByRangeRequest, - GetIdentitiesBalancesRequest, GetIdentityKeysRequest, GetPathElementsRequest, - GetProtocolVersionUpgradeStateRequest, GetProtocolVersionUpgradeVoteStatusRequest, - GetVotePollsByEndDateRequest, + GetDataContractsRequest, GetEpochsInfoRequest, GetEvonodesProposedEpochBlocksByIdsRequest, + GetEvonodesProposedEpochBlocksByRangeRequest, GetIdentitiesBalancesRequest, + GetIdentityKeysRequest, GetPathElementsRequest, GetProtocolVersionUpgradeStateRequest, + GetProtocolVersionUpgradeVoteStatusRequest, GetVotePollsByEndDateRequest, }; use dashcore_rpc::dashcore::ProTxHash; use dpp::data_contract::DataContract; @@ -40,7 +41,10 @@ use drive_proof_verifier::types::{ ProtocolVersionUpgrades, ResourceVotesByIdentity, VotePollsGroupedByTimestamp, Voter, Voters, }; use drive_proof_verifier::{types::Documents, FromProof}; -use rs_dapi_client::{transport::TransportRequest, DapiRequest, RequestSettings}; +use rs_dapi_client::{ + transport::TransportRequest, DapiRequest, ExecutionError, ExecutionResponse, InnerInto, + IntoInner, RequestSettings, +}; /// Fetch multiple objects from Platform. /// @@ -141,24 +145,41 @@ where sdk: &Sdk, query: Q, ) -> Result { - let request = query.query(sdk.prove())?; + let request = &query.query(sdk.prove())?; + let closure = |settings: RequestSettings| async move { + let ExecutionResponse { + address, + retries, + inner: response, + } = request + .clone() + .execute(sdk, settings) + .await + .map_err(|e| e.inner_into())?; - let response = request - .clone() - .execute(sdk, RequestSettings::default()) - .await // TODO: We need better way to handle execution response and errors - .map(|execution_response| execution_response.into_inner()) - .map_err(|execution_error| execution_error.into_inner())?; + let object_type = std::any::type_name::().to_string(); + tracing::trace!(request = ?request, response = ?response, ?address, retries, object_type, "fetched object from platform"); - let object_type = std::any::type_name::().to_string(); - tracing::trace!(request = ?request, response = ?response, object_type, "fetched object from platform"); + sdk.parse_proof::<>::Request, O>(request.clone(), response) + .await + .map(|o| ExecutionResponse { + inner: o, + retries, + address: address.clone(), + }) + .map_err(|e| ExecutionError { + inner: e, + retries, + address: Some(address), + }) + }; - let object: O = sdk - .parse_proof::<>::Request, O>(request, response) - .await? - .unwrap_or_default(); + let settings = sdk.dapi_client_settings; - Ok(object) + retry(settings, closure) + .await + .into_inner() + .map(|o| o.unwrap_or_default()) } /// Fetch multiple objects from Platform by their identifiers. @@ -231,24 +252,38 @@ impl FetchMany for Document { sdk: &Sdk, query: Q, ) -> Result { - let document_query: DocumentQuery = query.query(sdk.prove())?; + let document_query: &DocumentQuery = &query.query(sdk.prove())?; + + retry(sdk.dapi_client_settings, |settings| async move { + let request = document_query.clone(); - let request = document_query.clone(); - let response: GetDocumentsResponse = request - .execute(sdk, RequestSettings::default()) - .await // TODO: We need better way to handle execution response and errors - .map(|execution_response| execution_response.into_inner()) - .map_err(|execution_error| execution_error.into_inner())?; + let ExecutionResponse { + address, + retries, + inner: response, + } = request.execute(sdk, settings).await.map_err(|e| e.inner_into())?; - tracing::trace!(request=?document_query, response=?response, "fetch multiple documents"); + tracing::trace!(request=?document_query, response=?response, ?address, retries, "fetch multiple documents"); - // let object: Option> = sdk - let documents: Documents = sdk - .parse_proof::(document_query, response) - .await? - .unwrap_or_default(); + // let object: Option> = sdk + let documents = sdk + .parse_proof::(document_query.clone(), response) + .await + .map_err(|e| ExecutionError { + inner: e, + retries, + address: Some(address.clone()), + })? + .unwrap_or_default(); - Ok(documents) + Ok(ExecutionResponse { + inner: documents, + retries, + address, + }) + }) + .await + .into_inner() } } diff --git a/packages/rs-sdk/src/platform/fetch_unproved.rs b/packages/rs-sdk/src/platform/fetch_unproved.rs index 2d85dab7a9..ac3a682f81 100644 --- a/packages/rs-sdk/src/platform/fetch_unproved.rs +++ b/packages/rs-sdk/src/platform/fetch_unproved.rs @@ -1,7 +1,7 @@ use super::{types::evonode::EvoNode, Query}; -use crate::error::Error; use crate::mock::MockResponse; use crate::Sdk; +use crate::{error::Error, sync::retry}; use dapi_grpc::platform::v0::{ self as platform_proto, GetStatusRequest, GetStatusResponse, ResponseMetadata, }; @@ -9,6 +9,7 @@ use dpp::{dashcore::Network, version::PlatformVersion}; use drive_proof_verifier::types::EvoNodeStatus; use drive_proof_verifier::unproved::FromUnproved; use rs_dapi_client::{transport::TransportRequest, DapiRequest, RequestSettings}; +use rs_dapi_client::{ExecutionError, ExecutionResponse, InnerInto, IntoInner}; use std::fmt::Debug; #[async_trait::async_trait] @@ -71,21 +72,42 @@ where >, { // Default implementation - let request: ::Request = query.query(false)?; + let request: &::Request = &query.query(false)?; + let closure = move |local_settings: RequestSettings| async move { + // Execute the request using the Sdk instance + let ExecutionResponse { + inner: response, + address, + retries, + } = request + .clone() + .execute(sdk, local_settings) + .await + .map_err(|e| e.inner_into())?; - // Execute the request using the Sdk instance - let response = request - .clone() - .execute(sdk, settings) - .await // TODO: We need better way to handle execution response and errors - .map(|execution_response| execution_response.into_inner()) - .map_err(|execution_error| execution_error.into_inner())?; + // Parse the response into the appropriate type along with metadata + let (object, metadata): (Option, platform_proto::ResponseMetadata) = + Self::maybe_from_unproved_with_metadata( + request.clone(), + response, + sdk.network, + sdk.version(), + ) + .map_err(|e| ExecutionError { + inner: e.into(), + address: Some(address.clone()), + retries, + })?; - // Parse the response into the appropriate type along with metadata - let (object, mtd): (Option, platform_proto::ResponseMetadata) = - Self::maybe_from_unproved_with_metadata(request, response, sdk.network, sdk.version())?; + Ok(ExecutionResponse { + inner: (object, metadata), + address, + retries, + }) + }; - Ok((object, mtd)) + let settings = sdk.dapi_client_settings.override_by(settings); + retry(settings, closure).await.into_inner() } } diff --git a/packages/rs-sdk/src/platform/transition/broadcast.rs b/packages/rs-sdk/src/platform/transition/broadcast.rs index 4b06691e1b..7e4c6488c1 100644 --- a/packages/rs-sdk/src/platform/transition/broadcast.rs +++ b/packages/rs-sdk/src/platform/transition/broadcast.rs @@ -7,7 +7,7 @@ use dpp::state_transition::StateTransition; use drive::drive::Drive; use drive_proof_verifier::error::ContextProviderError; use drive_proof_verifier::DataContractProvider; -use rs_dapi_client::{DapiRequest, RequestSettings}; +use rs_dapi_client::{DapiRequest, IntoInner, RequestSettings}; #[async_trait::async_trait] pub trait BroadcastStateTransition { @@ -27,7 +27,7 @@ impl BroadcastStateTransition for StateTransition { request .execute(sdk, RequestSettings::default()) .await // TODO: We need better way to handle execution errors - .map_err(|error| error.into_inner())?; + .into_inner()?; // response is empty for a broadcast, result comes from the stream wait for state transition result @@ -40,21 +40,19 @@ impl BroadcastStateTransition for StateTransition { _time_out_ms: Option, ) -> Result { let request = self.broadcast_request_for_state_transition()?; - + // TODO: Implement retry logic request .clone() .execute(sdk, RequestSettings::default()) - .await // TODO: We need better way to handle execution response and errors - .map(|execution_response| execution_response.into_inner()) - .map_err(|execution_error| execution_error.into_inner())?; + .await + .into_inner()?; let request = self.wait_for_state_transition_result_request()?; let response = request .execute(sdk, RequestSettings::default()) - .await // TODO: We need better way to handle execution response and errors - .map(|execution_response| execution_response.into_inner()) - .map_err(|execution_error| execution_error.into_inner())?; + .await + .into_inner()?; let block_info = block_info_from_metadata(response.metadata()?)?; let proof = response.proof_owned()?; diff --git a/packages/rs-sdk/src/platform/transition/purchase_document.rs b/packages/rs-sdk/src/platform/transition/purchase_document.rs index 809d5d81d7..1ede5c247e 100644 --- a/packages/rs-sdk/src/platform/transition/purchase_document.rs +++ b/packages/rs-sdk/src/platform/transition/purchase_document.rs @@ -19,7 +19,7 @@ use dpp::state_transition::documents_batch_transition::DocumentsBatchTransition; use dpp::state_transition::proof_result::StateTransitionProofResult; use dpp::state_transition::StateTransition; use drive::drive::Drive; -use rs_dapi_client::{DapiRequest, RequestSettings}; +use rs_dapi_client::{DapiRequest, IntoInner, RequestSettings}; #[async_trait::async_trait] /// A trait for purchasing a document on Platform @@ -102,7 +102,7 @@ impl PurchaseDocument for Document { .clone() .execute(sdk, settings.request_settings) .await // TODO: We need better way to handle execution errors - .map_err(|error| error.into_inner())?; + .into_inner()?; // response is empty for a broadcast, result comes from the stream wait for state transition result @@ -116,12 +116,11 @@ impl PurchaseDocument for Document { data_contract: Arc, ) -> Result { let request = state_transition.wait_for_state_transition_result_request()?; - + // TODO: Implement retry logic let response = request .execute(sdk, RequestSettings::default()) - .await // TODO: We need better way to handle execution response and errors - .map(|execution_response| execution_response.into_inner()) - .map_err(|execution_error| execution_error.into_inner())?; + .await + .into_inner()?; let block_info = block_info_from_metadata(response.metadata()?)?; diff --git a/packages/rs-sdk/src/platform/transition/put_contract.rs b/packages/rs-sdk/src/platform/transition/put_contract.rs index 56a8410822..a8f07b0b31 100644 --- a/packages/rs-sdk/src/platform/transition/put_contract.rs +++ b/packages/rs-sdk/src/platform/transition/put_contract.rs @@ -18,7 +18,7 @@ use dpp::state_transition::StateTransition; use drive::drive::Drive; use drive_proof_verifier::error::ContextProviderError; use drive_proof_verifier::DataContractProvider; -use rs_dapi_client::{DapiRequest, RequestSettings}; +use rs_dapi_client::{DapiRequest, IntoInner, RequestSettings}; #[async_trait::async_trait] /// A trait for putting a contract to platform @@ -87,7 +87,7 @@ impl PutContract for DataContract { .clone() .execute(sdk, settings.unwrap_or_default().request_settings) .await // TODO: We need better way to handle execution errors - .map_err(|error| error.into_inner())?; + .into_inner()?; // response is empty for a broadcast, result comes from the stream wait for state transition result @@ -103,9 +103,8 @@ impl PutContract for DataContract { let response = request .execute(sdk, RequestSettings::default()) - .await // TODO: We need better way to handle execution response and errors - .map(|execution_response| execution_response.into_inner()) - .map_err(|execution_error| execution_error.into_inner())?; + .await + .into_inner()?; let block_info = block_info_from_metadata(response.metadata()?)?; diff --git a/packages/rs-sdk/src/platform/transition/put_document.rs b/packages/rs-sdk/src/platform/transition/put_document.rs index feadb98cbc..806e640d93 100644 --- a/packages/rs-sdk/src/platform/transition/put_document.rs +++ b/packages/rs-sdk/src/platform/transition/put_document.rs @@ -17,7 +17,7 @@ use dpp::state_transition::documents_batch_transition::DocumentsBatchTransition; use dpp::state_transition::proof_result::StateTransitionProofResult; use dpp::state_transition::StateTransition; use drive::drive::Drive; -use rs_dapi_client::{DapiRequest, RequestSettings}; +use rs_dapi_client::{DapiRequest, IntoInner, RequestSettings}; #[async_trait::async_trait] /// A trait for putting a document to platform @@ -96,7 +96,7 @@ impl PutDocument for Document { .clone() .execute(sdk, settings.request_settings) .await // TODO: We need better way to handle execution errors - .map_err(|error| error.into_inner())?; + .into_inner()?; // response is empty for a broadcast, result comes from the stream wait for state transition result @@ -110,12 +110,11 @@ impl PutDocument for Document { data_contract: Arc, ) -> Result { let request = state_transition.wait_for_state_transition_result_request()?; - + // TODO: Implement retry logic let response = request .execute(sdk, RequestSettings::default()) - .await // TODO: We need better way to handle execution response and errors - .map(|execution_response| execution_response.into_inner()) - .map_err(|execution_error| execution_error.into_inner())?; + .await + .into_inner()?; let block_info = block_info_from_metadata(response.metadata()?)?; diff --git a/packages/rs-sdk/src/platform/transition/put_identity.rs b/packages/rs-sdk/src/platform/transition/put_identity.rs index 1652f89569..e755b3a888 100644 --- a/packages/rs-sdk/src/platform/transition/put_identity.rs +++ b/packages/rs-sdk/src/platform/transition/put_identity.rs @@ -14,7 +14,7 @@ use drive_proof_verifier::DataContractProvider; use crate::platform::block_info_from_metadata::block_info_from_metadata; use dpp::state_transition::proof_result::StateTransitionProofResult; use drive::drive::Drive; -use rs_dapi_client::{DapiClientError, DapiRequest, RequestSettings}; +use rs_dapi_client::{DapiClientError, DapiRequest, IntoInner, RequestSettings}; #[async_trait::async_trait] /// A trait for putting an identity to platform @@ -57,7 +57,7 @@ impl PutIdentity for Identity { .clone() .execute(sdk, RequestSettings::default()) .await // TODO: We need better way to handle execution errors - .map_err(|error| error.into_inner())?; + .into_inner()?; // response is empty for a broadcast, result comes from the stream wait for state transition result @@ -82,9 +82,8 @@ impl PutIdentity for Identity { let response_result = request .clone() .execute(sdk, RequestSettings::default()) - .await // TODO: We need better way to handle execution response and errors - .map(|execution_response| execution_response.into_inner()) - .map_err(|execution_error| execution_error.into_inner()); + .await + .into_inner(); match response_result { Ok(_) => {} @@ -103,12 +102,12 @@ impl PutIdentity for Identity { } let request = state_transition.wait_for_state_transition_result_request()?; + // TODO: Implement retry logic let response = request .execute(sdk, RequestSettings::default()) - .await // TODO: We need better way to handle execution response and errors - .map(|execution_response| execution_response.into_inner()) - .map_err(|execution_error| execution_error.into_inner())?; + .await + .into_inner()?; let block_info = block_info_from_metadata(response.metadata()?)?; let proof = response.proof_owned()?; diff --git a/packages/rs-sdk/src/platform/transition/top_up_identity.rs b/packages/rs-sdk/src/platform/transition/top_up_identity.rs index d8595bee56..c43d8a9f19 100644 --- a/packages/rs-sdk/src/platform/transition/top_up_identity.rs +++ b/packages/rs-sdk/src/platform/transition/top_up_identity.rs @@ -11,7 +11,7 @@ use dpp::state_transition::proof_result::StateTransitionProofResult; use drive::drive::Drive; use drive_proof_verifier::error::ContextProviderError; use drive_proof_verifier::DataContractProvider; -use rs_dapi_client::{DapiRequest, RequestSettings}; +use rs_dapi_client::{DapiRequest, IntoInner, RequestSettings}; #[async_trait::async_trait] pub trait TopUpIdentity { @@ -48,15 +48,14 @@ impl TopUpIdentity for Identity { .clone() .execute(sdk, RequestSettings::default()) .await // TODO: We need better way to handle execution errors - .map_err(|error| error.into_inner())?; + .into_inner()?; let request = state_transition.wait_for_state_transition_result_request()?; - + // TODO: Implement retry logic in wait for state transition result let response = request .execute(sdk, RequestSettings::default()) - .await // TODO: We need better way to handle execution response and errors - .map(|execution_response| execution_response.into_inner()) - .map_err(|execution_error| execution_error.into_inner())?; + .await + .into_inner()?; let block_info = block_info_from_metadata(response.metadata()?)?; diff --git a/packages/rs-sdk/src/platform/transition/transfer_document.rs b/packages/rs-sdk/src/platform/transition/transfer_document.rs index 4bb6996da5..a64c76cb95 100644 --- a/packages/rs-sdk/src/platform/transition/transfer_document.rs +++ b/packages/rs-sdk/src/platform/transition/transfer_document.rs @@ -18,7 +18,7 @@ use dpp::state_transition::documents_batch_transition::DocumentsBatchTransition; use dpp::state_transition::proof_result::StateTransitionProofResult; use dpp::state_transition::StateTransition; use drive::drive::Drive; -use rs_dapi_client::{DapiRequest, RequestSettings}; +use rs_dapi_client::{DapiRequest, IntoInner, RequestSettings}; #[async_trait::async_trait] /// A trait for transferring a document on Platform @@ -97,7 +97,7 @@ impl TransferDocument for Document { .clone() .execute(sdk, settings.request_settings) .await // TODO: We need better way to handle execution errors - .map_err(|error| error.into_inner())?; + .into_inner()?; // response is empty for a broadcast, result comes from the stream wait for state transition result @@ -114,9 +114,8 @@ impl TransferDocument for Document { let response = request .execute(sdk, RequestSettings::default()) - .await // TODO: We need better way to handle execution response and errors - .map(|execution_response| execution_response.into_inner()) - .map_err(|execution_error| execution_error.into_inner())?; + .await + .into_inner()?; let block_info = block_info_from_metadata(response.metadata()?)?; diff --git a/packages/rs-sdk/src/platform/transition/update_price_of_document.rs b/packages/rs-sdk/src/platform/transition/update_price_of_document.rs index 779faf52d0..0f331cde5d 100644 --- a/packages/rs-sdk/src/platform/transition/update_price_of_document.rs +++ b/packages/rs-sdk/src/platform/transition/update_price_of_document.rs @@ -18,7 +18,7 @@ use dpp::state_transition::documents_batch_transition::DocumentsBatchTransition; use dpp::state_transition::proof_result::StateTransitionProofResult; use dpp::state_transition::StateTransition; use drive::drive::Drive; -use rs_dapi_client::{DapiRequest, RequestSettings}; +use rs_dapi_client::{DapiRequest, IntoInner, RequestSettings}; #[async_trait::async_trait] /// A trait for updating the price of a document on Platform @@ -98,7 +98,7 @@ impl UpdatePriceOfDocument for Document { .clone() .execute(sdk, settings.request_settings) .await // TODO: We need better way to handle execution errors - .map_err(|error| error.into_inner())?; + .into_inner()?; // response is empty for a broadcast, result comes from the stream wait for state transition result @@ -112,12 +112,11 @@ impl UpdatePriceOfDocument for Document { data_contract: Arc, ) -> Result { let request = state_transition.wait_for_state_transition_result_request()?; - + // TODO: Implement retry logic let response = request .execute(sdk, RequestSettings::default()) - .await // TODO: We need better way to handle execution response and errors - .map(|execution_response| execution_response.into_inner()) - .map_err(|execution_error| execution_error.into_inner())?; + .await + .into_inner()?; let block_info = block_info_from_metadata(response.metadata()?)?; diff --git a/packages/rs-sdk/src/platform/transition/vote.rs b/packages/rs-sdk/src/platform/transition/vote.rs index 9a7906ffe4..5666b8b42d 100644 --- a/packages/rs-sdk/src/platform/transition/vote.rs +++ b/packages/rs-sdk/src/platform/transition/vote.rs @@ -17,7 +17,7 @@ use dpp::voting::votes::resource_vote::accessors::v0::ResourceVoteGettersV0; use dpp::voting::votes::Vote; use drive::drive::Drive; use drive_proof_verifier::{error::ContextProviderError, DataContractProvider}; -use rs_dapi_client::DapiRequest; +use rs_dapi_client::{DapiRequest, IntoInner}; #[async_trait::async_trait] /// A trait for putting a vote on platform @@ -74,7 +74,7 @@ impl PutVote for Vote { request .execute(sdk, settings.request_settings) .await // TODO: We need better way to handle execution errors - .map_err(|error| error.into_inner())?; + .into_inner()?; Ok(()) } @@ -108,12 +108,11 @@ impl PutVote for Vote { None, )?; let request = masternode_vote_transition.broadcast_request_for_state_transition()?; - + // TODO: Implement retry logic let response_result = request .execute(sdk, settings.request_settings) - .await // TODO: We need better way to handle execution response and errors - .map(|execution_response| execution_response.into_inner()) - .map_err(|execution_error| execution_error.into_inner()); + .await + .into_inner(); match response_result { Ok(_) => {} @@ -134,9 +133,8 @@ impl PutVote for Vote { let request = masternode_vote_transition.wait_for_state_transition_result_request()?; let response = request .execute(sdk, settings.request_settings) - .await // TODO: We need better way to handle execution response and errors - .map(|execution_response| execution_response.into_inner()) - .map_err(|execution_error| execution_error.into_inner())?; + .await + .into_inner()?; let block_info = block_info_from_metadata(response.metadata()?)?; let proof = response.proof_owned()?; diff --git a/packages/rs-sdk/src/sdk.rs b/packages/rs-sdk/src/sdk.rs index 466dba051a..cf9e0630ee 100644 --- a/packages/rs-sdk/src/sdk.rs +++ b/packages/rs-sdk/src/sdk.rs @@ -119,6 +119,9 @@ pub struct Sdk { /// Cancellation token; once cancelled, all pending requests should be aborted. pub(crate) cancel_token: CancellationToken, + /// Global settings of dapi client + pub(crate) dapi_client_settings: RequestSettings, + #[cfg(feature = "mocks")] dump_dir: Option, } @@ -134,6 +137,7 @@ impl Clone for Sdk { metadata_last_seen_height: Arc::clone(&self.metadata_last_seen_height), metadata_height_tolerance: self.metadata_height_tolerance, metadata_time_tolerance_ms: self.metadata_time_tolerance_ms, + dapi_client_settings: self.dapi_client_settings, #[cfg(feature = "mocks")] dump_dir: self.dump_dir.clone(), } @@ -958,6 +962,7 @@ impl SdkBuilder { #[allow(unused_mut)] // needs to be mutable for #[cfg(feature = "mocks")] let mut sdk= Sdk{ network: self.network, + dapi_client_settings: self.settings, inner:SdkInstance::Dapi { dapi, version:self.version }, proofs:self.proofs, context_provider: ArcSwapOption::new( self.context_provider.map(Arc::new)), @@ -1020,6 +1025,7 @@ impl SdkBuilder { let mock_sdk = Arc::new(Mutex::new(mock_sdk)); let sdk= Sdk { network: self.network, + dapi_client_settings: self.settings, inner:SdkInstance::Mock { mock:mock_sdk.clone(), dapi, diff --git a/packages/rs-sdk/src/sync.rs b/packages/rs-sdk/src/sync.rs index d3c066e8cb..38a878e174 100644 --- a/packages/rs-sdk/src/sync.rs +++ b/packages/rs-sdk/src/sync.rs @@ -3,10 +3,16 @@ //! This is a workaround for an issue in tokio, where you cannot call `block_on` from sync call that is called //! inside a tokio runtime. This module spawns async futures in active tokio runtime, and retrieves the result //! using a channel. -use drive_proof_verifier::error::ContextProviderError; -use std::{future::Future, sync::mpsc::SendError}; -use tokio::runtime::TryCurrentError; +use arc_swap::ArcSwap; +use drive_proof_verifier::error::ContextProviderError; +use rs_dapi_client::{CanRetry, ExecutionResult, RequestSettings}; +use std::{ + fmt::Debug, + future::Future, + sync::{mpsc::SendError, Arc}, +}; +use tokio::{runtime::TryCurrentError, sync::Mutex}; #[derive(Debug, thiserror::Error)] pub enum AsyncError { /// Not running inside tokio runtime @@ -88,10 +94,149 @@ async fn worker( Ok(()) } +/// Retry the provided closure. +/// +/// This function is used to retry async code. It takes into account number of retries already executed by lower +/// layers and stops retrying once the maximum number of retries is reached. +/// +/// The `settings` should contain maximum number of retries that should be executed. In case of failure, total number of +/// requests sent is expected to be at least `settings.retries + 1` (initial request + `retries` configured in settings). +/// The actual number of requests sent can be higher, as the lower layers can retry the request multiple times. +/// +/// `future_factory_fn` should be a `FnMut()` closure that returns a future that should be retried. +/// It takes [`RequestSettings`] as an argument and returns [`ExecutionResult`]. +/// Retry mechanism can change [`RequestSettings`] between invocations of the `future_factory_fn` closure +/// to limit the number of retries for lower layers. +/// +/// ## Parameters +/// +/// - `settings` - global settings with any request-specific settings overrides applied. +/// - `future_factory_fn` - closure that returns a future that should be retried. It should take [`RequestSettings`] as +/// an argument and return [`ExecutionResult`]. +/// +/// ## Returns +/// +/// Returns future that resolves to [`ExecutionResult`]. +/// +/// ## Example +/// +/// ```rust +/// # use dash_sdk::RequestSettings; +/// # use dash_sdk::error::{Error,StaleNodeError}; +/// # use rs_dapi_client::{ExecutionResult, ExecutionError}; +/// async fn retry_test_function(settings: RequestSettings) -> ExecutionResult<(), dash_sdk::Error> { +/// // do something +/// Err(ExecutionError { +/// inner: Error::StaleNode(StaleNodeError::Height{ +/// expected_height: 10, +/// received_height: 3, +/// tolerance_blocks: 1, +/// }), +/// retries: 0, +/// address: None, +/// }) +/// } +/// #[tokio::main] +/// async fn main() { +/// let global_settings = RequestSettings::default(); +/// dash_sdk::sync::retry(global_settings, retry_test_function).await.expect_err("should fail"); +/// } +/// ``` +/// +/// ## Troubleshooting +/// +/// Compiler error: `no method named retry found for closure`: +/// - ensure returned value is [`ExecutionResult`]., +/// - consider adding `.await` at the end of the closure. +/// +/// +/// ## See also +/// +/// - [`::backon`] crate that is used by this function. +pub async fn retry( + settings: RequestSettings, + future_factory_fn: FutureFactoryFn, +) -> ExecutionResult +where + Fut: Future>, + FutureFactoryFn: FnMut(RequestSettings) -> Fut, + E: CanRetry + Debug, +{ + let max_retries = settings.retries.unwrap_or_default(); + + let backoff_strategy = backon::ConstantBuilder::default() + .with_delay(std::time::Duration::from_millis(10)) // we use different server, so no real delay needed, just to avoid spamming + .with_max_times(max_retries); + + let mut retries: usize = 0; + + // Settings must be modified inside `when()` closure, so we need to use `ArcSwap` to allow mutable access to settings. + let settings = ArcSwap::new(Arc::new(settings)); + + // Closure below needs to be FnMut, so we need mutable future_factory_fn. In order to achieve that, + // we use Arc>> pattern, to NOT move `future_factory_fn` directly into closure (as this breaks FnMut), + // while still allowing mutable access to it. + let inner_fn = Arc::new(Mutex::new(future_factory_fn)); + + let closure_settings = &settings; + // backon also support [backon::RetryableWithContext], but it doesn't pass the context to `when()` call. + // As we need to modify the settings inside `when()`, context doesn't solve our problem and we have to implement + // our own "context-like" logic using the closure below and `ArcSwap` for settings. + let closure = move || { + let inner_fn = inner_fn.clone(); + async move { + let settings = closure_settings.load_full().clone(); + let mut func = inner_fn.lock().await; + (*func)(*settings).await + } + }; + + let result= ::backon::Retryable::retry(closure,backoff_strategy) + .when(|e| { + if e.can_retry() { + // requests sent for current execution attempt; + let requests_sent = e.retries + 1; + + // requests sent in all preceeding attempts; user expects `settings.retries +1` + retries += requests_sent; + let all_requests_sent = retries; + + if all_requests_sent <=max_retries { // we account for for initial request + tracing::warn!(retry = all_requests_sent, max_retries, error=?e, "retrying request"); + let new_settings = RequestSettings { + retries: Some(max_retries - all_requests_sent), // limit num of retries for lower layer + ..**settings.load() + }; + settings.store(Arc::new(new_settings)); + true + } else { + tracing::error!(retry = all_requests_sent, max_retries, error=?e, "no more retries left, giving up"); + false + } + } else { + false + } + }) + .notify(|error, duration| { + tracing::warn!(?duration, ?error, "request failed, retrying"); + }) + .await; + + result.map_err(|mut e| { + e.retries = retries; + e + }) +} + #[cfg(test)] mod test { use super::*; - use std::future::Future; + use http::Uri; + use rs_dapi_client::ExecutionError; + use std::{ + future::Future, + sync::atomic::{AtomicUsize, Ordering}, + }; use tokio::{ runtime::Builder, sync::mpsc::{self, Receiver}, @@ -168,4 +313,64 @@ mod test { assert_eq!(result.unwrap(), "Success"); } } + + #[derive(Debug)] + enum MockError { + Generic, + } + impl CanRetry for MockError { + fn can_retry(&self) -> bool { + true + } + } + + async fn retry_test_function( + settings: RequestSettings, + counter: Arc, + ) -> ExecutionResult<(), MockError> { + // num or retries increases with each call + let retries = counter.load(Ordering::Relaxed); + let retries = if settings.retries.unwrap_or_default() < retries { + settings.retries.unwrap_or_default() + } else { + retries + }; + + // we sent 1 initial request plus `retries` retries + counter.fetch_add(1 + retries, Ordering::Relaxed); + + Err(ExecutionError { + inner: MockError::Generic, + retries, + address: Some(Uri::from_static("http://localhost").into()), + }) + } + + #[test_case::test_matrix([1,2,3,5,7,8,10,11,23,49, usize::MAX])] + #[tokio::test] + async fn test_retry(expected_requests: usize) { + for _ in 0..1 { + let counter = Arc::new(AtomicUsize::new(0)); + + // we retry 5 times, and expect 5 retries + 1 initial request + let mut global_settings = RequestSettings::default(); + global_settings.retries = Some(expected_requests - 1); + + let closure = |s| { + let counter = counter.clone(); + retry_test_function(s, counter) + }; + + retry(global_settings, closure) + .await + .expect_err("should fail"); + + assert_eq!( + counter.load(Ordering::Relaxed), + expected_requests, + "test failed for expected {} requests", + expected_requests + ); + } + } } diff --git a/packages/rs-sdk/tests/fetch/epoch.rs b/packages/rs-sdk/tests/fetch/epoch.rs index f898f2abcc..7a2d4c3bed 100644 --- a/packages/rs-sdk/tests/fetch/epoch.rs +++ b/packages/rs-sdk/tests/fetch/epoch.rs @@ -12,7 +12,7 @@ use dpp::block::epoch::EpochIndex; use dpp::block::extended_epoch_info::v0::ExtendedEpochInfoV0Getters; use dpp::block::extended_epoch_info::ExtendedEpochInfo; use drive_proof_verifier::types::ExtendedEpochInfos; -use rs_dapi_client::{DapiRequestExecutor, RequestSettings}; +use rs_dapi_client::{DapiRequestExecutor, IntoInner, RequestSettings}; /// Get current epoch index from DAPI response metadata async fn get_current_epoch(sdk: &Sdk, cfg: &Config) -> EpochIndex { @@ -25,9 +25,8 @@ async fn get_current_epoch(sdk: &Sdk, cfg: &Config) -> EpochIndex { let response = sdk .execute(identity_request, RequestSettings::default()) - .await // TODO: We need better way to handle execution response and errors - .map(|execution_response| execution_response.into_inner()) - .map_err(|execution_error| execution_error.into_inner()) + .await + .into_inner() .expect("get identity"); response.metadata().expect("metadata").epoch as EpochIndex