Skip to content

Commit

Permalink
Add configurable BN service request timeouts
Browse files Browse the repository at this point in the history
This change adds configurable base node service request timeouts, notably
differentiating between general service requests, requests to fetch blocks
for block sync and requests for a complete current UTXO set included in the
blockchain.
  • Loading branch information
hansieodendaal committed Oct 27, 2020
1 parent ac91afa commit 1eabd46
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 66 deletions.
9 changes: 8 additions & 1 deletion applications/tari_base_node/src/bootstrap/base_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ use tokio::runtime;
const LOG_TARGET: &str = "c::bn::initialization";
/// The minimum buffer size for the base node pubsub_connector channel
const BASE_NODE_BUFFER_MIN_SIZE: usize = 30;
const SERVICE_REQUEST_MINIMUM_TIMEOUT: Duration = Duration::from_secs(10);

pub struct BaseNodeBootstrapper<'a, B> {
pub config: &'a GlobalConfig,
Expand All @@ -93,7 +94,12 @@ where B: BlockchainBackend + 'static
pubsub_connector(runtime::Handle::current(), buf_size, config.buffer_rate_limit_base_node);
let peer_message_subscriptions = Arc::new(peer_message_subscriptions);

let node_config = BaseNodeServiceConfig::default(); // TODO - make this configurable
let node_config = BaseNodeServiceConfig {
fetch_blocks_timeout: cmp::max(SERVICE_REQUEST_MINIMUM_TIMEOUT, config.fetch_blocks_timeout),
service_request_timeout: cmp::max(SERVICE_REQUEST_MINIMUM_TIMEOUT, config.service_request_timeout),
fetch_utxos_timeout: cmp::max(SERVICE_REQUEST_MINIMUM_TIMEOUT, config.fetch_utxos_timeout),
..Default::default()
};
let mempool_config = MempoolServiceConfig::default(); // TODO - make this configurable

let comms_config = self.create_comms_config();
Expand Down Expand Up @@ -133,6 +139,7 @@ where B: BlockchainBackend + 'static
self.factories,
sync_strategy,
config.orphan_db_clean_out_threshold,
cmp::max(SERVICE_REQUEST_MINIMUM_TIMEOUT, config.fetch_blocks_timeout),
))
.build()
.await?;
Expand Down
28 changes: 0 additions & 28 deletions base_layer/core/src/base_node/consts.rs

This file was deleted.

2 changes: 0 additions & 2 deletions base_layer/core/src/base_node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ pub mod chain_metadata_service;
#[cfg(feature = "base_node")]
pub mod comms_interface;
#[cfg(feature = "base_node")]
pub mod consts;
#[cfg(feature = "base_node")]
pub mod service;
#[cfg(feature = "base_node")]
pub mod state_machine_service;
Expand Down
54 changes: 47 additions & 7 deletions base_layer/core/src/base_node/service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ use crate::{
NodeCommsRequest,
NodeCommsResponse,
},
consts::{BASE_NODE_SERVICE_DESIRED_RESPONSE_FRACTION, BASE_NODE_SERVICE_REQUEST_TIMEOUT},
generate_request_key,
proto,
proto::base_node::base_node_service_request::Request,
service::error::BaseNodeServiceError,
state_machine_service::states::StateInfo,
RequestKey,
Expand Down Expand Up @@ -71,17 +71,23 @@ const LOG_TARGET: &str = "c::bn::base_node_service::service";
/// Configuration for the BaseNodeService.
#[derive(Clone, Copy)]
pub struct BaseNodeServiceConfig {
/// The allocated waiting time for a request waiting for service responses from remote base nodes.
pub request_timeout: Duration,
/// The allocated waiting time for a general request waiting for service responses from remote base nodes.
pub service_request_timeout: Duration,
/// The allocated waiting time for a block sync request waiting for service responses from remote base nodes.
pub fetch_blocks_timeout: Duration,
/// The allocated waiting time for a fetch UTXOs request waiting for service responses from remote base nodes.
pub fetch_utxos_timeout: Duration,
/// The fraction of responses that need to be received for a corresponding service request to be finalize.
pub desired_response_fraction: f32,
}

impl Default for BaseNodeServiceConfig {
fn default() -> Self {
Self {
request_timeout: BASE_NODE_SERVICE_REQUEST_TIMEOUT,
desired_response_fraction: BASE_NODE_SERVICE_DESIRED_RESPONSE_FRACTION,
service_request_timeout: Duration::from_secs(180),
fetch_blocks_timeout: Duration::from_secs(30),
fetch_utxos_timeout: Duration::from_secs(600),
desired_response_fraction: 0.6,
}
}
}
Expand Down Expand Up @@ -511,7 +517,7 @@ async fn handle_outbound_request(
let send_result = outbound_message_service
.send_message(
send_msg_params.finish(),
OutboundDomainMessage::new(TariMessageType::BaseNodeRequest, service_request),
OutboundDomainMessage::new(TariMessageType::BaseNodeRequest, service_request.clone()),
)
.await?;

Expand All @@ -530,7 +536,41 @@ async fn handle_outbound_request(
// Wait for matching responses to arrive
waiting_requests.insert(request_key, reply_tx).await;
// Spawn timeout for waiting_request
spawn_request_timeout(timeout_sender, request_key, config.request_timeout);
if let Some(r) = service_request.request.clone() {
match r {
Request::FetchMatchingBlocks(_) |
Request::FetchBlocksWithHashes(_) |
Request::FetchBlocksWithKernels(_) |
Request::FetchBlocksWithStxos(_) |
Request::FetchBlocksWithUtxos(_) => {
trace!(
target: LOG_TARGET,
"Timeout for service request ({}) at {:?}",
request_key,
config.fetch_blocks_timeout
);
spawn_request_timeout(timeout_sender, request_key, config.fetch_blocks_timeout)
},
Request::FetchMatchingUtxos(_) => {
trace!(
target: LOG_TARGET,
"Timeout for service request ({}) at {:?}",
request_key,
config.fetch_utxos_timeout
);
spawn_request_timeout(timeout_sender, request_key, config.fetch_utxos_timeout)
},
_ => {
trace!(
target: LOG_TARGET,
"Timeout for service request ({}) at {:?}",
request_key,
config.service_request_timeout
);
spawn_request_timeout(timeout_sender, request_key, config.service_request_timeout)
},
};
};
// Log messages
let msg_tag = send_states[0].tag;
debug!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::{
};
use futures::{future, Future};
use log::*;
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use tari_comms::{connectivity::ConnectivityRequester, PeerManager};
use tari_service_framework::{ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
use tokio::sync::{broadcast, watch};
Expand All @@ -51,6 +51,7 @@ pub struct BaseNodeStateMachineInitializer<B> {
factories: CryptoFactories,
sync_strategy: BlockSyncStrategy,
orphan_db_clean_out_threshold: usize,
fetch_blocks_timeout: Duration,
}

impl<B> BaseNodeStateMachineInitializer<B>
Expand All @@ -62,6 +63,7 @@ where B: BlockchainBackend + 'static
factories: CryptoFactories,
sync_strategy: BlockSyncStrategy,
orphan_db_clean_out_threshold: usize,
fetch_blocks_timeout: Duration,
) -> Self
{
Self {
Expand All @@ -70,6 +72,7 @@ where B: BlockchainBackend + 'static
factories,
sync_strategy,
orphan_db_clean_out_threshold,
fetch_blocks_timeout,
}
}
}
Expand All @@ -96,6 +99,7 @@ where B: BlockchainBackend + 'static
let rules = self.rules.clone();
let db = self.db.clone();
let orphan_db_clean_out_threshold = self.orphan_db_clean_out_threshold;
let fetch_blocks_timeout = self.fetch_blocks_timeout;
context.spawn_when_ready(move |handles| async move {
let outbound_interface = handles.expect_handle::<OutboundNodeCommsInterface>();
let chain_metadata_service = handles.expect_handle::<ChainMetadataHandle>();
Expand All @@ -106,6 +110,7 @@ where B: BlockchainBackend + 'static
let mut state_machine_config = BaseNodeStateMachineConfig {
block_sync_config: BlockSyncConfig {
orphan_db_clean_out_threshold,
fetch_blocks_timeout,
..Default::default()
},
..Default::default()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,14 @@ use std::{
fmt::{Display, Formatter},
str::FromStr,
sync::Arc,
time::Duration,
};
use tari_comms::{connectivity::ConnectivityError, peer_manager::PeerManagerError};
use tari_crypto::tari_utilities::{hex::Hex, Hashable};
use thiserror::Error;

const LOG_TARGET: &str = "c::bn::state_machine_service::states::block_sync";

// The maximum number of retry attempts a node can perform to request a particular block from remote nodes.
const MAX_METADATA_REQUEST_RETRY_ATTEMPTS: usize = 3;
const MAX_HEADER_REQUEST_RETRY_ATTEMPTS: usize = 5;
const MAX_BLOCK_REQUEST_RETRY_ATTEMPTS: usize = 5;
// The maximum number of retry attempts for attempting to validly request and add the block at a specific block height
// to the chain.
const MAX_ADD_BLOCK_RETRY_ATTEMPTS: usize = 3;
// The number of headers that can be requested in a single query.
const HEADER_REQUEST_SIZE: usize = 100;
// The number of blocks that can be requested in a single query.
const BLOCK_REQUEST_SIZE: usize = 5;

/// Configuration for the Block Synchronization.
#[derive(Clone, Copy)]
pub struct BlockSyncConfig {
Expand All @@ -75,6 +64,7 @@ pub struct BlockSyncConfig {
pub header_request_size: usize,
pub block_request_size: usize,
pub orphan_db_clean_out_threshold: usize,
pub fetch_blocks_timeout: Duration,
}

#[derive(Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -110,13 +100,14 @@ impl Default for BlockSyncConfig {
fn default() -> Self {
Self {
sync_strategy: BlockSyncStrategy::ViaBestChainMetadata(BestChainMetadataBlockSync),
max_metadata_request_retry_attempts: MAX_METADATA_REQUEST_RETRY_ATTEMPTS,
max_header_request_retry_attempts: MAX_HEADER_REQUEST_RETRY_ATTEMPTS,
max_block_request_retry_attempts: MAX_BLOCK_REQUEST_RETRY_ATTEMPTS,
max_add_block_retry_attempts: MAX_ADD_BLOCK_RETRY_ATTEMPTS,
header_request_size: HEADER_REQUEST_SIZE,
block_request_size: BLOCK_REQUEST_SIZE,
max_metadata_request_retry_attempts: 3,
max_header_request_retry_attempts: 5,
max_block_request_retry_attempts: 5,
max_add_block_retry_attempts: 3,
header_request_size: 100,
block_request_size: 5,
orphan_db_clean_out_threshold: 0,
fetch_blocks_timeout: Duration::from_secs(30),
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions base_layer/core/tests/node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ use tari_comms::protocol::messaging::MessagingEvent;
use tari_core::{
base_node::{
comms_interface::{BlockEvent, Broadcast, CommsInterfaceError},
consts::BASE_NODE_SERVICE_DESIRED_RESPONSE_FRACTION,
service::BaseNodeServiceConfig,
state_machine_service::states::{ListeningInfo, StateInfo, StatusInfo},
},
Expand Down Expand Up @@ -703,8 +702,10 @@ fn service_request_timeout() {
let network = Network::LocalNet;
let consensus_manager = ConsensusManagerBuilder::new(network).build();
let base_node_service_config = BaseNodeServiceConfig {
request_timeout: Duration::from_millis(1),
desired_response_fraction: BASE_NODE_SERVICE_DESIRED_RESPONSE_FRACTION,
service_request_timeout: Duration::from_millis(1),
fetch_blocks_timeout: Default::default(),
fetch_utxos_timeout: Default::default(),
desired_response_fraction: Default::default(),
};
let temp_dir = tempdir().unwrap();
let (mut alice_node, bob_node, _consensus_manager) = create_network_with_2_base_nodes_with_config(
Expand Down
15 changes: 9 additions & 6 deletions common/config/presets/windows.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@
# - Rate limit for the base node wallet (min value = 5, default value = 20).
#buffer_rate_limit_base_node_wallet = 20

# The timeout (s) for requesting blocks from a peer during blockchain sync (min value = 10 s, default value = 30 s).
#fetch_blocks_timeout = 30

# The timeout (s) for requesting UTXOs from a base node (min value = 10 s, default value = 600 s).
#fetch_utxos_timeout = 600

# The timeout (s) for requesting other base node services (min value = 10 s, default value = 180 s).
#service_request_timeout = 180

########################################################################################################################
# #
# Wallet Configuration Options #
Expand Down Expand Up @@ -337,9 +346,6 @@ peer_seeds = []
########################################################################################################################
[mempool.testnet]

# The maximum period the mempool will wait for responses to requests made to base nodes [default: 60 seconds].
# request_timeout = 60

# The maximum number of transactions that can be stored in the Unconfirmed Transaction pool. This is the main waiting
# area in the mempool and almost all transactions will end up in this pool before being mined. It's for this reason
# that this parameter will have the greatest impact on actual memory usage by your mempool. If you are not mining,
Expand Down Expand Up @@ -386,9 +392,6 @@ peer_seeds = []

[mempool.mainnet]

# The maximum period the mempool will wait for responses to requests made to base nodes [default: 60 seconds].
# request_timeout = 60

# The maximum number of transactions that can be stored in the Unconfirmed Transaction pool. This is the main waiting
# area in the mempool and almost all transactions will end up in this pool before being mined. It's for this reason
# that this parameter will have the greatest impact on actual memory usage by your mempool. If you are not mining,
Expand Down
24 changes: 24 additions & 0 deletions common/src/configuration/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ pub struct GlobalConfig {
pub buffer_size_base_node_wallet: usize,
pub buffer_rate_limit_base_node: usize,
pub buffer_rate_limit_base_node_wallet: usize,
pub fetch_blocks_timeout: Duration,
pub fetch_utxos_timeout: Duration,
pub service_request_timeout: Duration,
pub base_node_query_timeout: Duration,
pub transaction_broadcast_monitoring_timeout: Duration,
pub transaction_chain_monitoring_timeout: Duration,
Expand Down Expand Up @@ -413,6 +416,24 @@ fn convert_node_config(network: Network, cfg: Config) -> Result<GlobalConfig, Co
cfg.get_int(&key)
.map_err(|e| ConfigurationError::new(&key, &e.to_string()))? as usize;

let key = "common.fetch_blocks_timeout";
let fetch_blocks_timeout = Duration::from_secs(
cfg.get_int(&key)
.map_err(|e| ConfigurationError::new(&key, &e.to_string()))? as u64,
);

let key = "common.fetch_utxos_timeout";
let fetch_utxos_timeout = Duration::from_secs(
cfg.get_int(&key)
.map_err(|e| ConfigurationError::new(&key, &e.to_string()))? as u64,
);

let key = "common.service_request_timeout";
let service_request_timeout = Duration::from_secs(
cfg.get_int(&key)
.map_err(|e| ConfigurationError::new(&key, &e.to_string()))? as u64,
);

let key = config_string("merge_mining_proxy", &net_str, "monerod_url");
let monerod_url = cfg
.get_str(&key)
Expand Down Expand Up @@ -476,6 +497,9 @@ fn convert_node_config(network: Network, cfg: Config) -> Result<GlobalConfig, Co
buffer_size_base_node_wallet,
buffer_rate_limit_base_node,
buffer_rate_limit_base_node_wallet,
fetch_blocks_timeout,
fetch_utxos_timeout,
service_request_timeout,
base_node_query_timeout,
transaction_broadcast_monitoring_timeout,
transaction_chain_monitoring_timeout,
Expand Down
3 changes: 3 additions & 0 deletions common/src/configuration/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ pub fn default_config(bootstrap: &ConfigBootstrap) -> Config {
cfg.set_default("common.buffer_rate_limit_base_node", 1_000).unwrap();
cfg.set_default("common.buffer_rate_limit_base_node_wallet", 1_000)
.unwrap();
cfg.set_default("common.fetch_blocks_timeout", 30).unwrap();
cfg.set_default("common.fetch_utxos_timeout", 600).unwrap();
cfg.set_default("common.service_request_timeout", 180).unwrap();

// Wallet settings
cfg.set_default("wallet.grpc_enabled", false).unwrap();
Expand Down

0 comments on commit 1eabd46

Please sign in to comment.