Skip to content

Commit

Permalink
Merge branch 'v1.4-dev' into feat/transferHelper
Browse files Browse the repository at this point in the history
  • Loading branch information
QuantumExplorer authored Oct 29, 2024
2 parents 3bb6712 + 4dbdc1f commit fe027f8
Show file tree
Hide file tree
Showing 369 changed files with 2,218 additions and 287 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions packages/dashmate/configs/defaults/getMainnetConfigFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ export default function getMainnetConfigFactory(homeDir, getBaseConfig) {
host: '152.42.151.147',
port: 26656,
},
{
id: 'fdc2239c1e0e62f3a192823d6e068d012620a2d1',
host: 'seed-1.pshenmic.dev',
port: 26656,
},
],
},
mempool: {
Expand Down
5 changes: 5 additions & 0 deletions packages/dashmate/configs/defaults/getTestnetConfigFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ export default function getTestnetConfigFactory(homeDir, getBaseConfig) {
host: '35.92.64.72',
port: 36656,
},
{
id: 'de3a73fc78e5c828151454156b492e4a2d985849',
host: 'seed-1.pshenmic.dev',
port: 36656,
},
],
port: 36656,
},
Expand Down
13 changes: 13 additions & 0 deletions packages/dashmate/configs/getConfigFileMigrationsFactory.js
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,19 @@ export default function getConfigFileMigrationsFactory(homeDir, defaultConfigs)
});
return configFile;
},
'1.5.0': (configFile) => {
Object.entries(configFile.configs)
.forEach(([name, options]) => {
if (options.network === NETWORK_MAINNET && name !== 'base') {
options.platform.drive.tenderdash.p2p.seeds = mainnet.get('platform.drive.tenderdash.p2p.seeds');
}

if (options.network === NETWORK_TESTNET && name !== 'base') {
options.platform.drive.tenderdash.p2p.seeds = testnet.get('platform.drive.tenderdash.p2p.seeds');
}
});
return configFile;
},
};
}

Expand Down
2 changes: 2 additions & 0 deletions packages/js-dapi-client/lib/networkConfigs.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module.exports = {
'seed-3.testnet.networks.dash.org:1443',
'seed-4.testnet.networks.dash.org:1443',
'seed-5.testnet.networks.dash.org:1443',
'seed-1.pshenmic.dev:1443',
],
network: 'testnet',
// Since we don't have PoSe atm, 3rd party masternodes sometimes provide wrong data
Expand Down Expand Up @@ -57,6 +58,7 @@ module.exports = {
'seed-2.mainnet.networks.dash.org',
'seed-3.mainnet.networks.dash.org',
'seed-4.mainnet.networks.dash.org',
'seed-1.pshenmic.dev',
],
network: 'mainnet',
},
Expand Down
3 changes: 1 addition & 2 deletions packages/rs-dapi-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ edition = "2021"

[features]
default = ["mocks", "offline-testing"]
tokio-sleep = ["backon/tokio-sleep"]
mocks = [
"dep:sha2",
"dep:hex",
Expand All @@ -20,7 +19,7 @@ dump = ["mocks"]
offline-testing = []

[dependencies]
backon = { version = "1.2"}
backon = { version = "1.2", features = ["tokio-sleep"] }
dapi-grpc = { path = "../dapi-grpc", features = [
"core",
"platform",
Expand Down
18 changes: 17 additions & 1 deletion packages/rs-dapi-client/src/address_list.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Subsystem to manage DAPI nodes.

use chrono::Utc;
use dapi_grpc::tonic::codegen::http;
use dapi_grpc::tonic::transport::Uri;
use rand::{rngs::SmallRng, seq::IteratorRandom, SeedableRng};
use std::collections::HashSet;
Expand All @@ -20,6 +21,16 @@ pub struct Address {
uri: Uri,
}

impl FromStr for Address {
type Err = AddressListError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
Uri::from_str(s)
.map(Address::from)
.map_err(AddressListError::from)
}
}

impl PartialEq<Self> for Address {
fn eq(&self, other: &Self) -> bool {
self.uri == other.uri
Expand Down Expand Up @@ -79,9 +90,13 @@ impl Address {
#[derive(Debug, thiserror::Error)]
#[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))]
pub enum AddressListError {
/// Specified address not present in the list
/// Specified address is not present in the list
#[error("address {0} not found in the list")]
AddressNotFound(#[cfg_attr(feature = "mocks", serde(with = "http_serde::uri"))] Uri),
/// A valid uri is required to create an Address
#[error("unable parse address: {0}")]
#[cfg_attr(feature = "mocks", serde(skip))]
InvalidAddressUri(#[from] http::uri::InvalidUri),
}

/// A structure to manage DAPI addresses to select from
Expand Down Expand Up @@ -201,6 +216,7 @@ impl AddressList {
}
}

// TODO: Must be changed to FromStr
impl From<&str> for AddressList {
fn from(value: &str) -> Self {
let uri_list: Vec<Uri> = value
Expand Down
105 changes: 58 additions & 47 deletions packages/rs-dapi-client/src/dapi_client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
//! [DapiClient] definition.

use backon::{ExponentialBuilder, Retryable};
use backon::{ConstantBuilder, Retryable};
use dapi_grpc::mock::Mockable;
use dapi_grpc::tonic::async_trait;
use std::fmt::Debug;
use std::sync::atomic::AtomicUsize;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tracing::Instrument;
Expand All @@ -12,19 +13,17 @@ use crate::address_list::AddressListError;
use crate::connection_pool::ConnectionPool;
use crate::{
transport::{TransportClient, TransportRequest},
Address, AddressList, CanRetry, RequestSettings,
Address, AddressList, CanRetry, DapiRequestExecutor, ExecutionError, ExecutionResponse,
ExecutionResult, RequestSettings,
};

/// General DAPI request error type.
#[derive(Debug, thiserror::Error)]
#[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))]
pub enum DapiClientError<TE: Mockable> {
/// The error happened on transport layer
#[error("transport error with {1}: {0}")]
Transport(
#[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))] TE,
Address,
),
#[error("transport error: {0}")]
Transport(#[cfg_attr(feature = "mocks", serde(with = "dapi_grpc::mock::serde_mockable"))] TE),
/// There are no valid DAPI addresses to use.
#[error("no available addresses to use")]
NoAvailableAddresses,
Expand All @@ -43,7 +42,7 @@ impl<TE: CanRetry + Mockable> CanRetry for DapiClientError<TE> {
use DapiClientError::*;
match self {
NoAvailableAddresses => false,
Transport(transport_error, _) => transport_error.can_retry(),
Transport(transport_error) => transport_error.can_retry(),
AddressList(_) => false,
#[cfg(feature = "mocks")]
Mock(_) => false,
Expand Down Expand Up @@ -73,21 +72,6 @@ impl<TE: Mockable> Mockable for DapiClientError<TE> {
}
}

#[async_trait]
/// DAPI client executor trait.
pub trait DapiRequestExecutor {
/// Execute request using this DAPI client.
async fn execute<R>(
&self,
request: R,
settings: RequestSettings,
) -> Result<R::Response, DapiClientError<<R::Client as TransportClient>::Error>>
where
R: TransportRequest + Mockable,
R::Response: Mockable,
<R::Client as TransportClient>::Error: Mockable;
}

/// Access point to DAPI.
#[derive(Debug, Clone)]
pub struct DapiClient {
Expand Down Expand Up @@ -126,7 +110,7 @@ impl DapiRequestExecutor for DapiClient {
&self,
request: R,
settings: RequestSettings,
) -> Result<R::Response, DapiClientError<<R::Client as TransportClient>::Error>>
) -> ExecutionResult<R::Response, DapiClientError<<R::Client as TransportClient>::Error>>
where
R: TransportRequest + Mockable,
R::Response: Mockable,
Expand All @@ -140,24 +124,25 @@ impl DapiRequestExecutor for DapiClient {
.finalize();

// Setup retry policy:
let retry_settings = ExponentialBuilder::default()
let retry_settings = ConstantBuilder::default()
.with_max_times(applied_settings.retries)
// backon doesn't accept 1.0
.with_factor(1.001)
.with_min_delay(Duration::from_secs(0))
.with_max_delay(Duration::from_secs(0));
.with_delay(Duration::from_millis(10));

// Save dump dir for later use, as self is moved into routine
#[cfg(feature = "dump")]
let dump_dir = self.dump_dir.clone();
#[cfg(feature = "dump")]
let dump_request = request.clone();

let retries_counter_arc = Arc::new(AtomicUsize::new(0));
let retries_counter_arc_ref = &retries_counter_arc;

// Setup DAPI request execution routine future. It's a closure that will be called
// more once to build new future on each retry.
let routine = move || {
// Try to get an address to initialize transport on:
let retries_counter = Arc::clone(retries_counter_arc_ref);

// Try to get an address to initialize transport on:
let address_list = self
.address_list
.read()
Expand Down Expand Up @@ -192,30 +177,29 @@ impl DapiRequestExecutor for DapiClient {
async move {
// It stays wrapped in `Result` since we want to return
// `impl Future<Output = Result<...>`, not a `Result` itself.
let address = address_result?;
let address = address_result.map_err(|inner| ExecutionError {
inner,
retries: retries_counter.load(std::sync::atomic::Ordering::Acquire),
address: None,
})?;

let pool = self.pool.clone();

let mut transport_client = R::Client::with_uri_and_settings(
address.uri().clone(),
&applied_settings,
&pool,
)
.map_err(|e| {
DapiClientError::<<R::Client as TransportClient>::Error>::Transport(
e,
address.clone(),
)
.map_err(|error| ExecutionError {
inner: DapiClientError::Transport(error),
retries: retries_counter.load(std::sync::atomic::Ordering::Acquire),
address: Some(address.clone()),
})?;

let response = transport_request
.execute_transport(&mut transport_client, &applied_settings)
.await
.map_err(|e| {
DapiClientError::<<R::Client as TransportClient>::Error>::Transport(
e,
address.clone(),
)
});
.map_err(DapiClientError::Transport);

match &response {
Ok(_) => {
Expand All @@ -226,8 +210,14 @@ impl DapiRequestExecutor for DapiClient {
.write()
.expect("can't get address list for write");

address_list.unban_address(&address)
.map_err(DapiClientError::<<R::Client as TransportClient>::Error>::AddressList)?;
address_list.unban_address(&address).map_err(|error| {
ExecutionError {
inner: DapiClientError::AddressList(error),
retries: retries_counter
.load(std::sync::atomic::Ordering::Acquire),
address: Some(address.clone()),
}
})?;
}

tracing::trace!(?response, "received {} response", response_name);
Expand All @@ -240,16 +230,34 @@ impl DapiRequestExecutor for DapiClient {
.write()
.expect("can't get address list for write");

address_list.ban_address(&address)
.map_err(DapiClientError::<<R::Client as TransportClient>::Error>::AddressList)?;
address_list.ban_address(&address).map_err(|error| {
ExecutionError {
inner: DapiClientError::AddressList(error),
retries: retries_counter
.load(std::sync::atomic::Ordering::Acquire),
address: Some(address.clone()),
}
})?;
}
} else {
tracing::trace!(?error, "received error");
}
}
};

let retries = retries_counter.load(std::sync::atomic::Ordering::Acquire);

response
.map(|inner| ExecutionResponse {
inner,
retries,
address: address.clone(),
})
.map_err(|inner| ExecutionError {
inner,
retries,
address: Some(address),
})
}
};

Expand All @@ -258,11 +266,14 @@ impl DapiRequestExecutor for DapiClient {
let result = routine
.retry(retry_settings)
.notify(|error, duration| {
let retries_counter = Arc::clone(&retries_counter_arc);
retries_counter.fetch_add(1, std::sync::atomic::Ordering::AcqRel);

tracing::warn!(
?error,
"retrying error with sleeping {} secs",
duration.as_secs_f32()
)
);
})
.when(|e| e.can_retry())
.instrument(tracing::info_span!("request routine"))
Expand Down
Loading

0 comments on commit fe027f8

Please sign in to comment.