Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk)!: retry broadcast operations #2337

Merged
merged 19 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/dapi-grpc/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ impl MappingConfig {
create_dir_all(&self.out_dir)?;

self.builder
.compile(&[self.protobuf_file], &self.proto_includes)
.compile_protos(&[self.protobuf_file], &self.proto_includes)
}
}

Expand Down
17 changes: 15 additions & 2 deletions packages/rs-dapi-client/src/dapi_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,11 @@ impl DapiRequestExecutor for DapiClient {
.address_list
.write()
.expect("can't get address list for write");

lklimek marked this conversation as resolved.
Show resolved Hide resolved
tracing::warn!(
?address,
?error,
"received server error, banning address"
);
address_list.ban_address(&address).map_err(|error| {
ExecutionError {
inner: DapiClientError::AddressList(error),
Expand All @@ -236,9 +240,18 @@ impl DapiRequestExecutor for DapiClient {
address: Some(address.clone()),
}
})?;
} else {
tracing::debug!(
?address,
?error,
"received server error, we should ban the node but banning is disabled"
);
}
} else {
tracing::trace!(?error, "received error");
tracing::debug!(
?error,
"received server error, most likely the request is invalid"
);
}
}
};
Expand Down
73 changes: 73 additions & 0 deletions packages/rs-dapi-client/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,18 @@ where
/// Result of request execution
pub type ExecutionResult<R, E> = Result<ExecutionResponse<R>, ExecutionError<E>>;

impl<R, E> From<ExecutionResponse<R>> for ExecutionResult<R, E> {
fn from(response: ExecutionResponse<R>) -> Self {
ExecutionResult::<R, E>::Ok(response)
}
}

impl<R, E> From<ExecutionError<E>> for ExecutionResult<R, E> {
fn from(e: ExecutionError<E>) -> Self {
ExecutionResult::<R, E>::Err(e)
}
}

impl<R, E> IntoInner<Result<R, E>> for ExecutionResult<R, E> {
fn into_inner(self) -> Result<R, E> {
match self {
Expand All @@ -145,3 +157,64 @@ where
}
}
}

/// Convert Result<T,TE> to ExecutionResult<R,E>, taking context from ExecutionResponse.
pub trait WrapToExecutionResult<R, RE, W>: Sized {
/// Convert self (eg. some [Result]) to [ExecutionResult], taking context information from `W` (eg. ExecutionResponse).
///
/// This function simplifies processing of results by wrapping them into ExecutionResult.
/// It is useful when you have execution result retrieved in previous step and you want to
/// add it to the result of the current step.
///
/// Useful when chaining multiple commands and you want to keep track of retries and address.
///
/// ## Example
///
/// ```rust
/// use rs_dapi_client::{ExecutionResponse, ExecutionResult, WrapToExecutionResult};
///
/// fn some_request() -> ExecutionResult<i8, String> {
/// Ok(ExecutionResponse {
/// inner: 42,
/// retries: 123,
/// address: "http://127.0.0.1".parse().expect("create mock address"),
/// })
/// }
///
/// fn next_step() -> Result<i32, String> {
/// Err("next error".to_string())
/// }
///
/// let response = some_request().expect("request should succeed");
/// let result: ExecutionResult<i32, String> = next_step().wrap_to_execution_result(&response);
///
/// if let ExecutionResult::Err(error) = result {
/// assert_eq!(error.inner, "next error");
/// assert_eq!(error.retries, 123);
/// } else {
/// panic!("Expected error");
/// }
/// ```
fn wrap_to_execution_result(self, result: &W) -> ExecutionResult<R, RE>;
}

impl<R, RE, TR, IR, IRE> WrapToExecutionResult<R, RE, ExecutionResponse<TR>> for Result<IR, IRE>
where
R: From<IR>,
RE: From<IRE>,
{
fn wrap_to_execution_result(self, result: &ExecutionResponse<TR>) -> ExecutionResult<R, RE> {
match self {
Ok(r) => ExecutionResult::Ok(ExecutionResponse {
inner: r.into(),
retries: result.retries,
address: result.address.clone(),
}),
Err(e) => ExecutionResult::Err(ExecutionError {
inner: e.into(),
retries: result.retries,
address: Some(result.address.clone()),
}),
}
}
}
1 change: 1 addition & 0 deletions packages/rs-dapi-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub use dapi_client::{DapiClient, DapiClientError};
pub use dump::DumpData;
pub use executor::{
DapiRequestExecutor, ExecutionError, ExecutionResponse, ExecutionResult, InnerInto, IntoInner,
WrapToExecutionResult,
};
use futures::{future::BoxFuture, FutureExt};
pub use request_settings::RequestSettings;
Expand Down
5 changes: 4 additions & 1 deletion packages/rs-dapi-client/src/request_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ const DEFAULT_BAN_FAILED_ADDRESS: bool = true;
pub struct RequestSettings {
/// Timeout for establishing a connection.
pub connect_timeout: Option<Duration>,
/// Timeout for a request.
/// Timeout for single request (soft limit).
///
/// Note that the total maximum time of execution can exceed `(timeout + connect_timeout) * retries`
/// as it accounts for internal processing time between retries.
pub timeout: Option<Duration>,
/// Number of retries in case of failed requests. If max retries reached, the last error is returned.
/// 1 means one request and one retry in case of error, etc.
Expand Down
34 changes: 32 additions & 2 deletions packages/rs-dapi-client/src/transport/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,38 @@ impl CanRetry for dapi_grpc::tonic::Status {
}
}

/// A shortcut to link between gRPC request type, response type, client and its
/// method in order to represent it in a form of types and data.
/// Macro to implement the `TransportRequest` trait for a given request type, response type, client type, and settings.
///
/// # Parameters
///
/// - `$request:ty`: The request type for which the `TransportRequest` trait will be implemented.
/// - `$response:ty`: The response type returned by the transport request.
/// - `$client:ty`: The client type used to execute the transport request (eg. generated by `tonic` crate).
/// - `$settings:expr`: The settings to be used for the transport request; these settings will override client's
/// default settings, but can still be overriden by arguments to
/// the [`DapiRequestExecutor::execute`](crate::DapiRequestExecutor::execute) method.
/// - `$($method:tt)+`: The method of `$client` to be called to execute the request.
///
/// # Example
///
/// ```compile_fail
/// impl_transport_request_grpc!(
/// MyRequestType,
/// MyResponseType,
/// MyClientType,
/// my_settings,
/// my_method
/// );
/// ```
///
/// This will generate an implementation of the `TransportRequest` trait for `MyRequestType`
/// that uses `MyClientType` to execute the `my_method` method, with the specified `my_settings`.
///
/// The generated implementation will:
/// - Define the associated types `Client` and `Response`.
/// - Set the `SETTINGS_OVERRIDES` constant to the provided settings.
/// - Implement the `method_name` function to return the name of the method as a string.
/// - Implement the `execute_transport` function to execute the transport request using the provided client and settings.
macro_rules! impl_transport_request_grpc {
($request:ty, $response:ty, $client:ty, $settings:expr, $($method:tt)+) => {
impl TransportRequest for $request {
Expand Down
4 changes: 2 additions & 2 deletions packages/rs-dpp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ dashcore = { git = "https://github.com/dashpay/rust-dashcore", features = [
"signer",
"serde",
"bls",
"eddsa"
"eddsa",
], default-features = false, tag = "0.32.0" }
env_logger = { version = "0.11" }
getrandom = { version = "0.2", features = ["js"] }
Expand All @@ -56,7 +56,7 @@ platform-version = { path = "../rs-platform-version" }
platform-versioning = { path = "../rs-platform-versioning" }
platform-serialization = { path = "../rs-platform-serialization" }
platform-serialization-derive = { path = "../rs-platform-serialization-derive" }
derive_more = { version = "1.0", features = ["from", "display"] }
derive_more = { version = "1.0", features = ["from", "display", "try_into"] }
nohash-hasher = "0.2.0"
rust_decimal = "1.29.1"
rust_decimal_macros = "1.29.1"
Expand Down
2 changes: 1 addition & 1 deletion packages/rs-dpp/src/state_transition/proof_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::voting::votes::Vote;
use platform_value::Identifier;
use std::collections::BTreeMap;

#[derive(Debug)]
#[derive(Debug, strum::Display, derive_more::TryInto)]
shumkov marked this conversation as resolved.
Show resolved Hide resolved
pub enum StateTransitionProofResult {
VerifiedDataContract(DataContract),
VerifiedIdentity(Identity),
Expand Down
2 changes: 1 addition & 1 deletion packages/rs-sdk/src/core/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl Sdk {
self.execute(core_transactions_stream, RequestSettings::default())
.await
.into_inner()
.map_err(|e| Error::DapiClientError(e.to_string()))
.map_err(|e| e.into())
}

/// Waits for a response for the asset lock proof
Expand Down
17 changes: 15 additions & 2 deletions packages/rs-sdk/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Definitions of errors
use dapi_grpc::tonic::Code;
use dpp::consensus::ConsensusError;
use dpp::serialization::PlatformDeserializable;
use dpp::version::PlatformVersionError;
Expand Down Expand Up @@ -56,6 +57,10 @@ pub enum Error {
/// SDK operation timeout reached error
#[error("SDK operation timeout {} secs reached: {1}", .0.as_secs())]
TimeoutReached(Duration, String),

/// Returned when an attempt is made to create an object that already exists in the system
#[error("Object already exists: {0}")]
AlreadyExists(String),
shumkov marked this conversation as resolved.
Show resolved Hide resolved
/// Generic error
// TODO: Use domain specific errors instead of generic ones
#[error("SDK error: {0}")]
Expand All @@ -78,6 +83,7 @@ pub enum Error {
impl From<DapiClientError> for Error {
fn from(value: DapiClientError) -> Self {
if let DapiClientError::Transport(TransportError::Grpc(status)) = &value {
// If we have some consensus error metadata, we deserialize it and return as ConsensusError
if let Some(consensus_error_value) = status
.metadata()
.get_bin("dash-serialized-consensus-error-bin")
Expand All @@ -88,11 +94,18 @@ impl From<DapiClientError> for Error {
.map(|consensus_error| {
Self::Protocol(ProtocolError::ConsensusError(Box::new(consensus_error)))
})
.unwrap_or_else(Self::Protocol);
.unwrap_or_else(|e| {
tracing::debug!("Failed to deserialize consensus error: {}", e);
Self::Protocol(e)
});
}
// Otherwise we parse the error code and act accordingly
if status.code() == Code::AlreadyExists {
return Self::AlreadyExists(status.message().to_string());
}
}

Self::DapiClientError(format!("{:?}", value))
Self::DapiClientError(value.to_string())
}
}

Expand Down
3 changes: 0 additions & 3 deletions packages/rs-sdk/src/platform/transition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
pub mod broadcast;
pub(crate) mod broadcast_identity;
pub mod broadcast_request;
pub(crate) mod context;
pub mod purchase_document;
pub mod put_contract;
pub mod put_document;
Expand All @@ -16,6 +15,4 @@ pub mod update_price_of_document;
pub mod vote;
pub mod withdraw_from_identity;

pub use context::*;

pub use txid::TxId;
Loading
Loading