diff --git a/applications/tari_base_node/src/bootstrap.rs b/applications/tari_base_node/src/bootstrap.rs
index d0a9bb0395..e86fbe0d7d 100644
--- a/applications/tari_base_node/src/bootstrap.rs
+++ b/applications/tari_base_node/src/bootstrap.rs
@@ -265,7 +265,8 @@ where B: BlockchainBackend + 'static
             auxilary_tcp_listener_address: self.config.auxilary_tcp_listener_address.clone(),
             datastore_path: self.config.peer_db_path.clone(),
             peer_database_name: "peers".to_string(),
-            max_concurrent_inbound_tasks: 100,
+            max_concurrent_inbound_tasks: 50,
+            max_concurrent_outbound_tasks: 100,
             outbound_buffer_size: 100,
             dht: DhtConfig {
                 database_url: DbConnectionUrl::File(self.config.data_dir.join("dht.db")),
diff --git a/applications/tari_base_node/src/command_handler.rs b/applications/tari_base_node/src/command_handler.rs
index e6d8fd76f0..f1ac3905a0 100644
--- a/applications/tari_base_node/src/command_handler.rs
+++ b/applications/tari_base_node/src/command_handler.rs
@@ -183,7 +183,7 @@ impl CommandHandler {
         status_line.add_field(
             "Rpc",
             format!(
-                "{}/{} sessions",
+                "{}/{}",
                 num_active_rpc_sessions,
                 config
                     .rpc_max_simultaneous_sessions
diff --git a/applications/tari_console_wallet/src/init/mod.rs b/applications/tari_console_wallet/src/init/mod.rs
index 19f7fe7e79..96644b0f04 100644
--- a/applications/tari_console_wallet/src/init/mod.rs
+++ b/applications/tari_console_wallet/src/init/mod.rs
@@ -374,8 +374,9 @@ pub async fn init_wallet(
         auxilary_tcp_listener_address: None,
         datastore_path: config.console_wallet_peer_db_path.clone(),
         peer_database_name: "peers".to_string(),
-        max_concurrent_inbound_tasks: 100,
-        outbound_buffer_size: 100,
+        max_concurrent_inbound_tasks: 10,
+        max_concurrent_outbound_tasks: 10,
+        outbound_buffer_size: 10,
         dht: DhtConfig {
             database_url: DbConnectionUrl::File(config.data_dir.join("dht-console-wallet.db")),
             auto_join: true,
diff --git a/applications/tari_validator_node/src/comms.rs b/applications/tari_validator_node/src/comms.rs
index 45f748d57d..4211d5a05a 100644
--- a/applications/tari_validator_node/src/comms.rs
+++ b/applications/tari_validator_node/src/comms.rs
@@ -133,7 +133,8 @@ fn create_comms_config(config: &GlobalConfig, node_identity: Arc<NodeIdentity>)
         transport_type: create_transport_type(config),
         datastore_path: config.peer_db_path.clone(),
         peer_database_name: "peers".to_string(),
-        max_concurrent_inbound_tasks: 100,
+        max_concurrent_inbound_tasks: 50,
+        max_concurrent_outbound_tasks: 100,
         outbound_buffer_size: 100,
         dht: DhtConfig {
             database_url: DbConnectionUrl::File(config.data_dir.join("dht.db")),
diff --git a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
index 0e14200a9d..c800e6385a 100644
--- a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
+++ b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs
@@ -479,7 +479,7 @@ where B: BlockchainBackend + 'static
                 })
             },
             NodeCommsRequest::FetchMempoolTransactionsByExcessSigs { excess_sigs } => {
-                let (transactions, not_found) = self.mempool.retrieve_by_excess_sigs(&excess_sigs).await;
+                let (transactions, not_found) = self.mempool.retrieve_by_excess_sigs(excess_sigs).await?;
                 Ok(NodeCommsResponse::FetchMempoolTransactionsByExcessSigsResponse(
                     FetchMempoolTransactionsResponse {
                         transactions,
@@ -553,7 +553,7 @@ where B: BlockchainBackend + 'static
             kernel_excess_sigs: excess_sigs,
         } = new_block;

-        let (known_transactions, missing_excess_sigs) = self.mempool.retrieve_by_excess_sigs(&excess_sigs).await;
+        let (known_transactions, missing_excess_sigs) = self.mempool.retrieve_by_excess_sigs(excess_sigs).await?;

        let known_transactions = known_transactions.into_iter().map(|tx| (*tx).clone()).collect();

         metrics::compact_block_tx_misses(header.height).set(missing_excess_sigs.len() as i64);
@@ -587,7 +587,7 @@ where B: BlockchainBackend + 'static

         // Add returned transactions to unconfirmed pool
         if !transactions.is_empty() {
-            self.mempool.insert_all(&transactions).await?;
+            self.mempool.insert_all(transactions.clone()).await?;
         }

         if !not_found.is_empty() {
@@ -708,8 +708,6 @@ where B: BlockchainBackend + 'static
             BlockAddResult::ChainReorg { .. } => true,
         };

-        self.blockchain_db.cleanup_orphans().await?;
-
         self.update_block_result_metrics(&block_add_result);
         self.publish_block_event(BlockEvent::ValidBlockAdded(block.clone(), block_add_result));
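The hunks above show the knock-on effect of the mempool API change: calls now return `Result<_, MempoolError>` and take owned values (`Vec<PrivateKey>`, `Vec<Arc<Transaction>>`) instead of references, because the work runs on a blocking task that needs `'static` inputs. A hedged sketch of the resulting call-site shape (module paths and the free-standing function are illustrative, not from this PR):

```rust
use std::sync::Arc;

use tari_common_types::types::PrivateKey;
use tari_core::{
    mempool::{Mempool, MempoolError},
    transactions::transaction::Transaction,
};

// Illustrative only: the owned Vec is moved into the mempool's blocking task,
// so there are no borrow/'static issues, and errors propagate with `?`.
async fn known_and_missing(
    mempool: &Mempool,
    excess_sigs: Vec<PrivateKey>,
) -> Result<(Vec<Arc<Transaction>>, Vec<PrivateKey>), MempoolError> {
    let (known, missing) = mempool.retrieve_by_excess_sigs(excess_sigs).await?;
    Ok((known, missing))
}
```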
diff --git a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs
index 80937f4d9b..50800cd876 100644
--- a/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs
+++ b/base_layer/core/src/base_node/state_machine_service/states/horizon_state_sync/horizon_state_synchronization.rs
@@ -49,7 +49,14 @@ use crate::{
         sync::{rpc, SyncPeer},
     },
     blocks::{BlockHeader, ChainHeader, UpdateBlockAccumulatedData},
-    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError, MmrTree, PrunedOutput},
+    chain_storage::{
+        async_db::AsyncBlockchainDb,
+        BlockchainBackend,
+        ChainStorageError,
+        DbTransaction,
+        MmrTree,
+        PrunedOutput,
+    },
     proto::base_node::{
         sync_utxo as proto_sync_utxo,
         sync_utxos_response::UtxoOrDeleted,
@@ -697,84 +704,89 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
         let mut prev_mmr = 0;
         let mut prev_kernel_mmr = 0;
+        let height = header.height();
         let bitmap = self.take_final_bitmap();
-        let mut txn = self.db().write_transaction();
-        let mut utxo_mmr_position = 0;
-        let mut prune_positions = vec![];
-
-        for h in 0..=header.height() {
-            let curr_header = self.db().fetch_chain_header(h).await?;
-
-            trace!(
-                target: LOG_TARGET,
-                "Fetching utxos from db: height:{}, header.output_mmr:{}, prev_mmr:{}, end:{}",
-                curr_header.height(),
-                curr_header.header().output_mmr_size,
-                prev_mmr,
-                curr_header.header().output_mmr_size - 1
-            );
-            let (utxos, _) = self.db().fetch_utxos_in_block(curr_header.hash().clone(), None).await?;
-            trace!(
-                target: LOG_TARGET,
-                "Fetching kernels from db: height:{}, header.kernel_mmr:{}, prev_mmr:{}, end:{}",
-                curr_header.height(),
-                curr_header.header().kernel_mmr_size,
-                prev_kernel_mmr,
-                curr_header.header().kernel_mmr_size - 1
-            );
-
-            trace!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.len());
-            let mut prune_counter = 0;
-            for u in utxos {
-                match u {
-                    PrunedOutput::NotPruned { output } => {
-                        if bitmap.contains(utxo_mmr_position) {
-                            debug!(
-                                target: LOG_TARGET,
-                                "Found output that needs pruning at height: {} position: {}", h, utxo_mmr_position
-                            );
-                            prune_positions.push(utxo_mmr_position);
-                            prune_counter += 1;
-                        } else {
-                            pruned_utxo_sum = &output.commitment + &pruned_utxo_sum;
-                        }
-                    },
-                    _ => {
-                        prune_counter += 1;
-                    },
-                }
-                utxo_mmr_position += 1;
-            }
-            if prune_counter > 0 {
-                trace!(target: LOG_TARGET, "Pruned {} outputs", prune_counter);
-            }
-            prev_mmr = curr_header.header().output_mmr_size;
+        let db = self.db().inner().clone();
+        task::spawn_blocking(move || {
+            let mut txn = DbTransaction::new();
+            let mut utxo_mmr_position = 0;
+            let mut prune_positions = vec![];

-            let kernels = self.db().fetch_kernels_in_block(curr_header.hash().clone()).await?;
-            trace!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len());
-            for k in kernels {
-                pruned_kernel_sum = &k.excess + &pruned_kernel_sum;
-            }
-            prev_kernel_mmr = curr_header.header().kernel_mmr_size;
+            for h in 0..=height {
+                let curr_header = db.fetch_chain_header(h)?;

-            if h % 1000 == 0 {
-                debug!(
+                trace!(
+                    target: LOG_TARGET,
+                    "Fetching utxos from db: height:{}, header.output_mmr:{}, prev_mmr:{}, end:{}",
+                    curr_header.height(),
+                    curr_header.header().output_mmr_size,
+                    prev_mmr,
+                    curr_header.header().output_mmr_size - 1
+                );
+                let (utxos, _) = db.fetch_utxos_in_block(curr_header.hash().clone(), None)?;
+                trace!(
                     target: LOG_TARGET,
-                    "Final Validation: {:.2}% complete. Height: {}, mmr_position: {} ",
-                    (h as f32 / header.height() as f32) * 100.0,
-                    h,
-                    utxo_mmr_position,
+                    "Fetching kernels from db: height:{}, header.kernel_mmr:{}, prev_mmr:{}, end:{}",
+                    curr_header.height(),
+                    curr_header.header().kernel_mmr_size,
+                    prev_kernel_mmr,
+                    curr_header.header().kernel_mmr_size - 1
                 );
+
+                trace!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.len());
+                let mut prune_counter = 0;
+                for u in utxos {
+                    match u {
+                        PrunedOutput::NotPruned { output } => {
+                            if bitmap.contains(utxo_mmr_position) {
+                                debug!(
+                                    target: LOG_TARGET,
+                                    "Found output that needs pruning at height: {} position: {}", h, utxo_mmr_position
+                                );
+                                prune_positions.push(utxo_mmr_position);
+                                prune_counter += 1;
+                            } else {
+                                pruned_utxo_sum = &output.commitment + &pruned_utxo_sum;
+                            }
+                        },
+                        _ => {
+                            prune_counter += 1;
+                        },
+                    }
+                    utxo_mmr_position += 1;
+                }
+                if prune_counter > 0 {
+                    trace!(target: LOG_TARGET, "Pruned {} outputs", prune_counter);
+                }
+                prev_mmr = curr_header.header().output_mmr_size;
+
+                let kernels = db.fetch_kernels_in_block(curr_header.hash().clone())?;
+                trace!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len());
+                for k in kernels {
+                    pruned_kernel_sum = &k.excess + &pruned_kernel_sum;
+                }
+                prev_kernel_mmr = curr_header.header().kernel_mmr_size;
+
+                if h % 1000 == 0 {
+                    debug!(
+                        target: LOG_TARGET,
+                        "Final Validation: {:.2}% complete. Height: {}, mmr_position: {} ",
+                        (h as f32 / height as f32) * 100.0,
+                        h,
+                        utxo_mmr_position,
+                    );
+                }
             }
-        }

-        if !prune_positions.is_empty() {
-            debug!(target: LOG_TARGET, "Pruning {} spent outputs", prune_positions.len());
-            txn.prune_output_at_positions(prune_positions);
-            txn.commit().await?;
-        }
+            if !prune_positions.is_empty() {
+                debug!(target: LOG_TARGET, "Pruning {} spent outputs", prune_positions.len());
+                txn.prune_outputs_at_positions(prune_positions);
+                db.write(txn)?;
+            }

-        Ok((pruned_utxo_sum, pruned_kernel_sum))
+            Ok((pruned_utxo_sum, pruned_kernel_sum))
+        })
+        .await?
     }

     #[inline]
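The hunk above moves the whole per-block scan onto `tokio::task::spawn_blocking`, so the long synchronous database iteration no longer occupies an async worker thread. The closure must own everything it touches, which is why the backend handle is cloned via `inner().clone()` and `height` is captured by value. A self-contained sketch of the pattern under stated assumptions (`ScanError` and the loop body are stand-ins, not the PR's code):

```rust
use tokio::task::{self, JoinError};

#[derive(Debug)]
enum ScanError {
    Join(JoinError),
}

impl From<JoinError> for ScanError {
    fn from(e: JoinError) -> Self {
        ScanError::Join(e)
    }
}

// Hedged sketch of the spawn_blocking pattern, not the PR's exact code.
async fn scan_all_blocks(height: u64) -> Result<u64, ScanError> {
    task::spawn_blocking(move || {
        // Synchronous, IO-heavy iteration runs on tokio's blocking pool so it
        // cannot stall the async runtime's worker threads.
        let mut kernels_seen = 0u64;
        for _h in 0..=height {
            kernels_seen += 1; // stand-in for fetch_chain_header / fetch_kernels_in_block
        }
        Ok(kernels_seen)
    })
    .await? // a panicked or cancelled task surfaces as JoinError here
}
```

Note the final `.await?`: it unwraps only the outer `JoinError` layer and returns the closure's inner `Result` directly, which is the same trick the diff uses.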
diff --git a/base_layer/core/src/blocks/block_header.rs b/base_layer/core/src/blocks/block_header.rs
index 815a36f547..1a15a8666a 100644
--- a/base_layer/core/src/blocks/block_header.rs
+++ b/base_layer/core/src/blocks/block_header.rs
@@ -273,7 +273,7 @@ impl Display for BlockHeader {
         )?;
         writeln!(
             fmt,
-            "Merkle roots:\nInputs: {},\n Outputs: {} ({})\nWitness: {}\nKernels: {} ({})\n",
+            "Merkle roots:\nInputs: {},\nOutputs: {} ({})\nWitness: {}\nKernels: {} ({})\n",
             self.input_mr.to_hex(),
             self.output_mr.to_hex(),
             self.output_mmr_size,
diff --git a/base_layer/core/src/chain_storage/async_db.rs b/base_layer/core/src/chain_storage/async_db.rs
index 6fc32f5955..621ef100b6 100644
--- a/base_layer/core/src/chain_storage/async_db.rs
+++ b/base_layer/core/src/chain_storage/async_db.rs
@@ -387,7 +387,7 @@ impl<'a, B: BlockchainBackend + 'static> AsyncDbTransaction<'a, B> {
         self
     }

-    pub fn prune_output_at_positions(&mut self, positions: Vec<u32>) -> &mut Self {
+    pub fn prune_outputs_at_positions(&mut self, positions: Vec<u32>) -> &mut Self {
         self.transaction.prune_outputs_at_positions(positions);
         self
     }
diff --git a/base_layer/core/src/chain_storage/blockchain_database.rs b/base_layer/core/src/chain_storage/blockchain_database.rs
index d5f68a7df5..15bbafbcd2 100644
--- a/base_layer/core/src/chain_storage/blockchain_database.rs
+++ b/base_layer/core/src/chain_storage/blockchain_database.rs
@@ -888,13 +888,20 @@ where B: BlockchainBackend
             );
             return Err(e.into());
         }
-        trace!(
-            target: LOG_TARGET,
-            "[add_block] acquired write access db lock for block #{} ",
-            &new_height
-        );
+        trace!(
+            target: LOG_TARGET,
+            "[add_block] waiting for write access to add block #{}",
+            new_height
+        );
+        let timer = Instant::now();
         let mut db = self.db_write_access()?;
+
+        trace!(
+            target: LOG_TARGET,
+            "[add_block] acquired write access db lock for block #{} in {:.2?}",
+            new_height,
+            timer.elapsed()
+        );
         let block_add_result = add_block(
             &mut *db,
             &self.config,
@@ -915,6 +922,10 @@ where B: BlockchainBackend
             prune_database_if_needed(&mut *db, self.config.pruning_horizon, self.config.pruning_interval)?;
         }

+        if let Err(e) = cleanup_orphans(&mut *db, self.config.orphan_storage_capacity) {
+            warn!(target: LOG_TARGET, "Failed to clean up orphans: {}", e);
+        }
+
         debug!(
             target: LOG_TARGET,
             "Candidate block `add_block` result: {}", block_add_result
diff --git a/base_layer/core/src/mempool/error.rs b/base_layer/core/src/mempool/error.rs
index 953b20b8af..f0d09db1d7 100644
--- a/base_layer/core/src/mempool/error.rs
+++ b/base_layer/core/src/mempool/error.rs
@@ -22,6 +22,7 @@

 use tari_service_framework::reply_channel::TransportChannelError;
 use thiserror::Error;
+use tokio::task::JoinError;

 use crate::{mempool::unconfirmed_pool::UnconfirmedPoolError, transactions::transaction::TransactionError};

@@ -35,4 +36,8 @@ pub enum MempoolError {
     TransportChannelError(#[from] TransportChannelError),
     #[error("The transaction did not contain any kernels")]
     TransactionNoKernels,
+    #[error("Mempool lock poisoned. This indicates that the mempool has panicked while holding a RwLockGuard.")]
+    RwLockPoisonError,
+    #[error(transparent)]
+    BlockingTaskError(#[from] JoinError),
 }
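The two new `MempoolError` variants cover exactly the failure modes introduced by running mempool work on blocking tasks: a poisoned `std::sync::RwLock` (some task panicked while holding the guard) and a `JoinError` from the spawned task itself. A hedged sketch of how they surface at a call site, using stand-in names rather than the PR's types:

```rust
use std::sync::{Arc, RwLock};

use tokio::task;

#[derive(Debug, thiserror::Error)]
enum MempoolErrorSketch {
    #[error("Mempool lock poisoned")]
    RwLockPoisonError,
    #[error(transparent)]
    BlockingTaskError(#[from] task::JoinError),
}

// `#[from]` lets `?` convert a JoinError automatically; the poison case has no
// useful source error, so it is mapped explicitly with map_err.
async fn run_on_pool(data: Arc<RwLock<u64>>) -> Result<u64, MempoolErrorSketch> {
    task::spawn_blocking(move || {
        let guard = data.read().map_err(|_| MempoolErrorSketch::RwLockPoisonError)?;
        Ok(*guard)
    })
    .await? // JoinError -> MempoolErrorSketch via #[from]
}
```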
diff --git a/base_layer/core/src/mempool/mempool.rs b/base_layer/core/src/mempool/mempool.rs
index 25d597789e..079aa8e21a 100644
--- a/base_layer/core/src/mempool/mempool.rs
+++ b/base_layer/core/src/mempool/mempool.rs
@@ -20,10 +20,10 @@
 // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

-use std::sync::Arc;
+use std::sync::{Arc, RwLock};

 use tari_common_types::types::{PrivateKey, Signature};
-use tokio::sync::RwLock;
+use tokio::task;

 use crate::{
     blocks::Block,
@@ -62,22 +62,25 @@ impl Mempool {
     /// Insert an unconfirmed transaction into the Mempool.
     pub async fn insert(&self, tx: Arc<Transaction>) -> Result<TxStorageResponse, MempoolError> {
-        self.pool_storage.write().await.insert(tx)
+        self.do_write_task(|storage| storage.insert(tx)).await
     }

     /// Inserts all transactions into the mempool.
-    pub async fn insert_all(&self, transactions: &[Arc<Transaction>]) -> Result<(), MempoolError> {
-        let mut mempool = self.pool_storage.write().await;
-        for tx in transactions {
-            mempool.insert(tx.clone())?;
-        }
-
-        Ok(())
+    pub async fn insert_all(&self, transactions: Vec<Arc<Transaction>>) -> Result<(), MempoolError> {
+        self.do_write_task(|storage| {
+            for tx in transactions {
+                storage.insert(tx)?;
+            }
+
+            Ok(())
+        })
+        .await
     }

     /// Update the Mempool based on the received published block.
-    pub async fn process_published_block(&self, published_block: &Block) -> Result<(), MempoolError> {
-        self.pool_storage.write().await.process_published_block(published_block)
+    pub async fn process_published_block(&self, published_block: Arc<Block>) -> Result<(), MempoolError> {
+        self.do_write_task(move |storage| storage.process_published_block(&published_block))
+            .await
     }

     /// In the event of a ReOrg, resubmit all ReOrged transactions into the Mempool and process each newly introduced
@@ -87,48 +90,74 @@ impl Mempool {
         removed_blocks: Vec<Arc<Block>>,
         new_blocks: Vec<Arc<Block>>,
     ) -> Result<(), MempoolError> {
-        self.pool_storage
-            .write()
+        self.do_write_task(move |storage| storage.process_reorg(&removed_blocks, &new_blocks))
             .await
-            .process_reorg(&removed_blocks, &new_blocks)
     }

     /// Returns all unconfirmed transaction stored in the Mempool, except the transactions stored in the ReOrgPool.
-    // TODO: Investigate returning an iterator rather than a large vector of transactions
-    pub async fn snapshot(&self) -> Vec<Arc<Transaction>> {
-        self.pool_storage.read().await.snapshot()
+    pub async fn snapshot(&self) -> Result<Vec<Arc<Transaction>>, MempoolError> {
+        self.do_read_task(|storage| Ok(storage.snapshot())).await
     }

     /// Returns a list of transaction ranked by transaction priority up to a given weight.
     /// Only transactions that fit into a block will be returned
     pub async fn retrieve(&self, total_weight: u64) -> Result<Vec<Arc<Transaction>>, MempoolError> {
-        self.pool_storage.write().await.retrieve(total_weight)
+        self.do_write_task(move |storage| storage.retrieve_and_revalidate(total_weight))
+            .await
     }

     pub async fn retrieve_by_excess_sigs(
         &self,
-        excess_sigs: &[PrivateKey],
-    ) -> (Vec<Arc<Transaction>>, Vec<PrivateKey>) {
-        self.pool_storage.read().await.retrieve_by_excess_sigs(excess_sigs)
+        excess_sigs: Vec<PrivateKey>,
+    ) -> Result<(Vec<Arc<Transaction>>, Vec<PrivateKey>), MempoolError> {
+        self.do_read_task(move |storage| Ok(storage.retrieve_by_excess_sigs(&excess_sigs)))
+            .await
     }

     /// Check if the specified excess signature is found in the Mempool.
-    pub async fn has_tx_with_excess_sig(&self, excess_sig: &Signature) -> TxStorageResponse {
-        self.pool_storage.read().await.has_tx_with_excess_sig(excess_sig)
+    pub async fn has_tx_with_excess_sig(&self, excess_sig: Signature) -> Result<TxStorageResponse, MempoolError> {
+        self.do_read_task(move |storage| Ok(storage.has_tx_with_excess_sig(&excess_sig)))
+            .await
     }

     /// Check if the specified transaction is stored in the Mempool.
-    pub async fn has_transaction(&self, tx: &Transaction) -> Result<TxStorageResponse, MempoolError> {
-        self.pool_storage.read().await.has_transaction(tx)
+    pub async fn has_transaction(&self, tx: Arc<Transaction>) -> Result<TxStorageResponse, MempoolError> {
+        self.do_read_task(move |storage| storage.has_transaction(&tx)).await
     }

     /// Gathers and returns the stats of the Mempool.
-    pub async fn stats(&self) -> StatsResponse {
-        self.pool_storage.read().await.stats()
+    pub async fn stats(&self) -> Result<StatsResponse, MempoolError> {
+        self.do_read_task(|storage| Ok(storage.stats())).await
     }

     /// Gathers and returns a breakdown of all the transaction in the Mempool.
-    pub async fn state(&self) -> StateResponse {
-        self.pool_storage.read().await.state()
+    pub async fn state(&self) -> Result<StateResponse, MempoolError> {
+        self.do_read_task(|storage| Ok(storage.state())).await
+    }
+
+    async fn do_read_task<F, T>(&self, callback: F) -> Result<T, MempoolError>
+    where
+        F: FnOnce(&MempoolStorage) -> Result<T, MempoolError> + Send + 'static,
+        T: Send + 'static,
+    {
+        let storage = self.pool_storage.clone();
+        task::spawn_blocking(move || {
+            let lock = storage.read().map_err(|_| MempoolError::RwLockPoisonError)?;
+            callback(&*lock)
+        })
+        .await?
+    }
+
+    async fn do_write_task<F, T>(&self, callback: F) -> Result<T, MempoolError>
+    where
+        F: FnOnce(&mut MempoolStorage) -> Result<T, MempoolError> + Send + 'static,
+        T: Send + 'static,
+    {
+        let storage = self.pool_storage.clone();
+        task::spawn_blocking(move || {
+            let mut lock = storage.write().map_err(|_| MempoolError::RwLockPoisonError)?;
+            callback(&mut *lock)
+        })
+        .await?
     }
 }
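The `do_read_task`/`do_write_task` helpers above are the heart of this PR: the mempool swaps `tokio::sync::RwLock` for `std::sync::RwLock` and executes every access as an `FnOnce` closure on the blocking thread pool, so a slow mempool operation no longer parks an async worker while holding the lock. A self-contained sketch of the same pattern over an arbitrary state type (names are stand-ins):

```rust
use std::sync::{Arc, RwLock};

use tokio::task;

#[derive(Debug)]
enum StateError {
    LockPoisoned,
    Join(task::JoinError),
}

#[derive(Clone)]
struct SharedState<S>(Arc<RwLock<S>>);

impl<S: Send + Sync + 'static> SharedState<S> {
    // Run a read-only closure on the blocking pool. `FnOnce + Send + 'static`
    // is exactly what spawn_blocking demands, which is why the public API now
    // takes owned arguments rather than references.
    async fn read<T, F>(&self, f: F) -> Result<T, StateError>
    where
        F: FnOnce(&S) -> T + Send + 'static,
        T: Send + 'static,
    {
        let state = self.0.clone();
        task::spawn_blocking(move || {
            let guard = state.read().map_err(|_| StateError::LockPoisoned)?;
            Ok(f(&*guard))
        })
        .await
        .map_err(StateError::Join)?
    }
}
```

The write-side helper is identical except for `write()` and `FnOnce(&mut S)`.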
diff --git a/base_layer/core/src/mempool/mempool_storage.rs b/base_layer/core/src/mempool/mempool_storage.rs
index 9ad4cd8170..dc6a6a2c17 100644
--- a/base_layer/core/src/mempool/mempool_storage.rs
+++ b/base_layer/core/src/mempool/mempool_storage.rs
@@ -72,17 +72,19 @@ impl MempoolStorage {
     /// Insert an unconfirmed transaction into the Mempool. The transaction *MUST* have passed through the validation
     /// pipeline already and will thus always be internally consistent by this stage
     pub fn insert(&mut self, tx: Arc<Transaction>) -> Result<TxStorageResponse, MempoolError> {
-        debug!(
-            target: LOG_TARGET,
-            "Inserting tx into mempool: {}",
-            tx.body
-                .kernels()
-                .first()
-                .map(|k| k.excess_sig.get_signature().to_hex())
-                .unwrap_or_else(|| "None?!".into())
-        );
+        let tx_id = tx
+            .body
+            .kernels()
+            .first()
+            .map(|k| k.excess_sig.get_signature().to_hex())
+            .unwrap_or_else(|| "None?!".into());
+        debug!(target: LOG_TARGET, "Inserting tx into mempool: {}", tx_id);
         match self.validator.validate(&tx) {
             Ok(()) => {
+                debug!(
+                    target: LOG_TARGET,
+                    "Transaction {} is VALID, inserting in unconfirmed pool", tx_id
+                );
                 let weight = self.get_transaction_weighting(0);
                 self.unconfirmed_pool.insert(tx, None, &weight)?;
                 Ok(TxStorageResponse::UnconfirmedPool)
@@ -209,7 +211,7 @@ impl MempoolStorage {

     /// Returns a list of transaction ranked by transaction priority up to a given weight.
     /// Will only return transactions that will fit into the given weight
-    pub fn retrieve(&mut self, total_weight: u64) -> Result<Vec<Arc<Transaction>>, MempoolError> {
+    pub fn retrieve_and_revalidate(&mut self, total_weight: u64) -> Result<Vec<Arc<Transaction>>, MempoolError> {
         let results = self.unconfirmed_pool.fetch_highest_priority_txs(total_weight)?;
         self.insert_txs(results.transactions_to_insert)?;
         Ok(results.retrieved_transactions)
diff --git a/base_layer/core/src/mempool/service/inbound_handlers.rs b/base_layer/core/src/mempool/service/inbound_handlers.rs
index 054f6a5d9d..eeecc00ebc 100644
--- a/base_layer/core/src/mempool/service/inbound_handlers.rs
+++ b/base_layer/core/src/mempool/service/inbound_handlers.rs
@@ -59,10 +59,10 @@ impl MempoolInboundHandlers {
         debug!(target: LOG_TARGET, "Handling remote request: {}", request);
         use MempoolRequest::*;
         match request {
-            GetStats => Ok(MempoolResponse::Stats(self.mempool.stats().await)),
-            GetState => Ok(MempoolResponse::State(self.mempool.state().await)),
+            GetStats => Ok(MempoolResponse::Stats(self.mempool.stats().await?)),
+            GetState => Ok(MempoolResponse::State(self.mempool.state().await?)),
             GetTxStateByExcessSig(excess_sig) => Ok(MempoolResponse::TxStorage(
-                self.mempool.has_tx_with_excess_sig(&excess_sig).await,
+                self.mempool.has_tx_with_excess_sig(excess_sig).await?,
             )),
             SubmitTransaction(tx) => {
                 debug!(
@@ -102,7 +102,8 @@ impl MempoolInboundHandlers {
     ) -> Result<TxStorageResponse, MempoolServiceError> {
         trace!(target: LOG_TARGET, "submit_transaction: {}.", tx);

-        let tx_storage = self.mempool.has_transaction(&tx).await?;
+        let tx = Arc::new(tx);
+        let tx_storage = self.mempool.has_transaction(tx.clone()).await?;
         let kernel_excess_sig = tx
             .first_kernel_excess_sig()
             .ok_or(MempoolServiceError::TransactionNoKernels)?
@@ -115,7 +116,6 @@ impl MempoolInboundHandlers {
             );
             return Ok(tx_storage);
         }
-        let tx = Arc::new(tx);
         match self.mempool.insert(tx.clone()).await {
             Ok(tx_storage) => {
                 if tx_storage.is_stored() {
@@ -146,9 +146,10 @@ impl MempoolInboundHandlers {
     }

     async fn update_pool_size_metrics(&self) {
-        let stats = self.mempool.stats().await;
-        metrics::unconfirmed_pool_size().set(stats.unconfirmed_txs as i64);
-        metrics::reorg_pool_size().set(stats.reorg_txs as i64);
+        if let Ok(stats) = self.mempool.stats().await {
+            metrics::unconfirmed_pool_size().set(stats.unconfirmed_txs as i64);
+            metrics::reorg_pool_size().set(stats.reorg_txs as i64);
+        }
     }

     /// Handle inbound block events from the local base node service.
@@ -156,7 +157,7 @@ impl MempoolInboundHandlers {
         use BlockEvent::*;
         match block_event {
             ValidBlockAdded(block, BlockAddResult::Ok(_)) => {
-                self.mempool.process_published_block(block).await?;
+                self.mempool.process_published_block(block.clone()).await?;
             },
             ValidBlockAdded(_, BlockAddResult::ChainReorg { added, removed }) => {
                 self.mempool
@@ -173,7 +174,7 @@ impl MempoolInboundHandlers {
                     .await?;
             },
             BlockSyncComplete(tip_block) => {
-                self.mempool.process_published_block(tip_block.block()).await?;
+                self.mempool.process_published_block(tip_block.to_arc_block()).await?;
             },
             AddBlockFailed(_) => {},
         }
diff --git a/base_layer/core/src/mempool/sync_protocol/mod.rs b/base_layer/core/src/mempool/sync_protocol/mod.rs
index f0f0900492..488fcd28e0 100644
--- a/base_layer/core/src/mempool/sync_protocol/mod.rs
+++ b/base_layer/core/src/mempool/sync_protocol/mod.rs
@@ -306,7 +306,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin
             self.peer_node_id.short_str()
         );

-        let transactions = self.mempool.snapshot().await;
+        let transactions = self.mempool.snapshot().await?;
         let items = transactions
             .iter()
             .take(self.config.initial_sync_max_transactions)
@@ -392,7 +392,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin
             inventory.items.len()
         );

-        let transactions = self.mempool.snapshot().await;
+        let transactions = self.mempool.snapshot().await?;

         let mut duplicate_inventory_items = Vec::new();
         let (transactions, _) = transactions.into_iter().partition::<Vec<_>, _>(|transaction| {
@@ -483,7 +483,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin
             }
         }

-        let stats = self.mempool.stats().await;
+        let stats = self.mempool.stats().await?;
         metrics::unconfirmed_pool_size().set(stats.unconfirmed_txs as i64);
         metrics::reorg_pool_size().set(stats.reorg_txs as i64);

@@ -509,13 +509,13 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin
             excess_sig_hex,
             self.peer_node_id.short_str()
         );
-
-        let store_state = self.mempool.has_transaction(&txn).await?;
+        let txn = Arc::new(txn);
+        let store_state = self.mempool.has_transaction(txn.clone()).await?;
         if store_state.is_stored() {
             return Ok(());
         }

-        let stored_result = self.mempool.insert(Arc::new(txn)).await?;
+        let stored_result = self.mempool.insert(txn).await?;
         if stored_result.is_stored() {
             metrics::inbound_transactions(Some(&self.peer_node_id)).inc();
             debug!(
diff --git a/base_layer/core/src/mempool/sync_protocol/test.rs b/base_layer/core/src/mempool/sync_protocol/test.rs
index e1aea8d650..0e268df30e 100644
--- a/base_layer/core/src/mempool/sync_protocol/test.rs
+++ b/base_layer/core/src/mempool/sync_protocol/test.rs
@@ -122,10 +122,10 @@ async fn empty_set() {
         .await
         .unwrap();

-    let transactions = mempool2.snapshot().await;
+    let transactions = mempool2.snapshot().await.unwrap();
     assert_eq!(transactions.len(), 0);

-    let transactions = mempool1.snapshot().await;
+    let transactions = mempool1.snapshot().await.unwrap();
     assert_eq!(transactions.len(), 0);
 }

@@ -319,7 +319,14 @@ async fn responder_messages() {
 }

 async fn get_snapshot(mempool: &Mempool) -> Vec<Transaction> {
-    mempool.snapshot().await.iter().map(|t| &**t).cloned().collect()
+    mempool
+        .snapshot()
+        .await
+        .unwrap()
+        .iter()
+        .map(|t| &**t)
+        .cloned()
+        .collect()
 }

 async fn read_message<S, T>(reader: &mut S) -> T
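A knock-on effect visible throughout these hunks: because the storage callbacks must be `'static`, the API can no longer borrow a `&Transaction` from the caller. Transactions are wrapped in `Arc` once, up front, and cheaply cloned wherever both the existence check and the later insert need ownership. A hedged illustration with hypothetical names:

```rust
use std::sync::Arc;

// Illustrative stand-ins for submit_transaction above, not the PR's API.
struct Tx;

async fn has_tx(_tx: Arc<Tx>) -> bool {
    false
}

async fn insert_tx(_tx: Arc<Tx>) {}

async fn submit(tx: Tx) {
    // Wrap once; Arc::clone is a reference-count bump, not a deep copy of the
    // transaction, so handing a clone to each 'static call is cheap.
    let tx = Arc::new(tx);
    if !has_tx(tx.clone()).await {
        insert_tx(tx).await;
    }
}
```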
diff --git a/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs b/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs
index 72d7bac4e0..3450d20368 100644
--- a/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs
+++ b/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs
@@ -223,12 +223,14 @@ impl UnconfirmedPool {
                 }
             }
         }
-        // we need to remove all transactions that need to be rechecked.
-        debug!(
-            target: LOG_TARGET,
-            "Removing {} transaction(s) from unconfirmed pool because they need re-evaluation",
-            transactions_to_remove_and_recheck.len()
-        );
+        if !transactions_to_remove_and_recheck.is_empty() {
+            // we need to remove all transactions that need to be rechecked.
+            debug!(
+                target: LOG_TARGET,
+                "Removing {} transaction(s) from unconfirmed pool because they need re-evaluation",
+                transactions_to_remove_and_recheck.len()
+            );
+        }
         for (tx_key, _) in &transactions_to_remove_and_recheck {
             self.remove_transaction(*tx_key);
         }
diff --git a/base_layer/core/src/validation/transaction_validators.rs b/base_layer/core/src/validation/transaction_validators.rs
index d1a0b0cd9e..1676ced387 100644
--- a/base_layer/core/src/validation/transaction_validators.rs
+++ b/base_layer/core/src/validation/transaction_validators.rs
@@ -63,8 +63,10 @@ impl TxInternalConsistencyValidator {

 impl MempoolTransactionValidation for TxInternalConsistencyValidator {
     fn validate(&self, tx: &Transaction) -> Result<(), ValidationError> {
-        let db = self.db.db_read_access()?;
-        let tip = db.fetch_chain_metadata()?;
+        let tip = {
+            let db = self.db.db_read_access()?;
+            db.fetch_chain_metadata()
+        }?;

         tx.validate_internal_consistency(
             self.bypass_range_proof_verification,
@@ -181,11 +183,13 @@ impl TxInputAndMaturityValidator {
 impl MempoolTransactionValidation for TxInputAndMaturityValidator {
     fn validate(&self, tx: &Transaction) -> Result<(), ValidationError> {
         let constants = self.db.consensus_constants()?;
-        let db = self.db.db_read_access()?;
-        check_inputs_are_utxos(&*db, tx.body())?;
-        check_outputs(&*db, constants, tx.body())?;
+        let tip_height = {
+            let db = self.db.db_read_access()?;
+            check_inputs_are_utxos(&*db, tx.body())?;
+            check_outputs(&*db, constants, tx.body())?;
+            db.fetch_chain_metadata()?.height_of_longest_chain()
+        };

-        let tip_height = db.fetch_chain_metadata()?.height_of_longest_chain();
         verify_timelocks(tx, tip_height)?;
         verify_no_duplicated_inputs_outputs(tx)?;
         Ok(())
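The validator hunks shrink how long the database read lock is held: the guard is confined to a block expression that yields only the data actually needed (chain metadata, tip height), so the more expensive verification runs after the guard is dropped. The shape of that idiom, sketched with stand-in types:

```rust
use std::sync::RwLock;

// Hedged sketch of the guard-scoping idiom from transaction_validators.rs.
fn validate_sketch(db: &RwLock<u64>) -> Result<(), String> {
    let tip_height = {
        let guard = db.read().map_err(|_| "poisoned".to_string())?;
        // Everything that needs the lock happens inside this block...
        *guard
    }; // ...and the guard is dropped here, before the expensive work begins.

    expensive_verification(tip_height)
}

fn expensive_verification(_tip_height: u64) -> Result<(), String> {
    Ok(())
}
```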
diff --git a/base_layer/core/tests/mempool.rs b/base_layer/core/tests/mempool.rs
index fdf3de1496..51d9fddcf5 100644
--- a/base_layer/core/tests/mempool.rs
+++ b/base_layer/core/tests/mempool.rs
@@ -115,37 +115,50 @@ async fn test_insert_and_process_published_block() {
     mempool.insert(tx2.clone()).await.unwrap();
     mempool.insert(tx3.clone()).await.unwrap();
     mempool.insert(tx5.clone()).await.unwrap();
-    mempool.process_published_block(blocks[1].block()).await.unwrap();
+    mempool.process_published_block(blocks[1].to_arc_block()).await.unwrap();

     assert_eq!(
         mempool
-            .has_tx_with_excess_sig(&orphan.body.kernels()[0].excess_sig)
-            .await,
+            .has_tx_with_excess_sig(orphan.body.kernels()[0].excess_sig.clone())
+            .await
+            .unwrap(),
         TxStorageResponse::NotStored
     );
     assert_eq!(
-        mempool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig).await,
+        mempool
+            .has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.clone())
+            .await
+            .unwrap(),
         TxStorageResponse::UnconfirmedPool
     );
     assert_eq!(
-        mempool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig).await,
+        mempool
+            .has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.clone())
+            .await
+            .unwrap(),
         TxStorageResponse::NotStored
     );
     assert_eq!(
-        mempool.has_tx_with_excess_sig(&tx5.body.kernels()[0].excess_sig).await,
+        mempool
+            .has_tx_with_excess_sig(tx5.body.kernels()[0].excess_sig.clone())
+            .await
+            .unwrap(),
         TxStorageResponse::NotStored
     );
     assert_eq!(
-        mempool.has_tx_with_excess_sig(&tx6.body.kernels()[0].excess_sig).await,
+        mempool
+            .has_tx_with_excess_sig(tx6.body.kernels()[0].excess_sig.clone())
+            .await
+            .unwrap(),
         TxStorageResponse::NotStored
     );

-    let snapshot_txs = mempool.snapshot().await;
+    let snapshot_txs = mempool.snapshot().await.unwrap();
     assert_eq!(snapshot_txs.len(), 1);
     assert!(snapshot_txs.contains(&tx2));

-    let stats = mempool.stats().await;
+    let stats = mempool.stats().await.unwrap();
     assert_eq!(stats.total_txs, 1);
     assert_eq!(stats.unconfirmed_txs, 1);
     assert_eq!(stats.reorg_txs, 0);
@@ -159,35 +172,48 @@ async fn test_insert_and_process_published_block() {

     // Spend tx2, so it goes in Reorg pool
     generate_block(&store, &mut blocks, vec![tx2.deref().clone()], &consensus_manager).unwrap();
-    mempool.process_published_block(blocks[2].block()).await.unwrap();
+    mempool.process_published_block(blocks[2].to_arc_block()).await.unwrap();

     assert_eq!(
         mempool
-            .has_tx_with_excess_sig(&orphan.body.kernels()[0].excess_sig)
-            .await,
+            .has_tx_with_excess_sig(orphan.body.kernels()[0].excess_sig.clone())
+            .await
+            .unwrap(),
         TxStorageResponse::NotStored
     );
     assert_eq!(
-        mempool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig).await,
+        mempool
+            .has_tx_with_excess_sig(tx2.body.kernels()[0].excess_sig.clone())
+            .await
+            .unwrap(),
         TxStorageResponse::ReorgPool
     );
     assert_eq!(
-        mempool.has_tx_with_excess_sig(&tx3.body.kernels()[0].excess_sig).await,
+        mempool
+            .has_tx_with_excess_sig(tx3.body.kernels()[0].excess_sig.clone())
+            .await
+            .unwrap(),
         TxStorageResponse::NotStored
     );
     assert_eq!(
-        mempool.has_tx_with_excess_sig(&tx5.body.kernels()[0].excess_sig).await,
+        mempool
+            .has_tx_with_excess_sig(tx5.body.kernels()[0].excess_sig.clone())
+            .await
+            .unwrap(),
         TxStorageResponse::NotStored
     );
     assert_eq!(
-        mempool.has_tx_with_excess_sig(&tx6.body.kernels()[0].excess_sig).await,
+        mempool
+            .has_tx_with_excess_sig(tx6.body.kernels()[0].excess_sig.clone())
+            .await
+            .unwrap(),
         TxStorageResponse::NotStored
     );

-    let snapshot_txs = mempool.snapshot().await;
+    let snapshot_txs = mempool.snapshot().await.unwrap();
     assert_eq!(snapshot_txs.len(), 0);

-    let stats = mempool.stats().await;
+    let stats = mempool.stats().await.unwrap();
     assert_eq!(stats.total_txs, 1);
     assert_eq!(stats.unconfirmed_txs, 0);
     assert_eq!(stats.reorg_txs, 1);
@@ -211,7 +237,7 @@ async fn test_time_locked() {
         to: vec![2 * T, 2 * T, 2 * T, 2 * T], fee: 5*uT, lock: 0, features: OutputFeatures::default()
     )];
     generate_new_block(&mut store, &mut blocks, &mut outputs, txs, &consensus_manager).unwrap();
-    mempool.process_published_block(blocks[1].block()).await.unwrap();
+    mempool.process_published_block(blocks[1].to_arc_block()).await.unwrap();
     // Block height should be 1
     let mut tx2 = txn_schema!(from: vec![outputs[1][0].clone()], to: vec![1*T], fee: 20*uT, lock: 0, features: OutputFeatures::default());
     tx2.lock_height = 3;
@@ -239,7 +265,7 @@ async fn test_time_locked() {

     // Spend tx3, so that the height of the chain will increase
     generate_block(&store, &mut blocks, vec![tx3.deref().clone()], &consensus_manager).unwrap();
-    mempool.process_published_block(blocks[2].block()).await.unwrap();
+    mempool.process_published_block(blocks[2].to_arc_block()).await.unwrap();

     // Block height increased, so tx2 should no go in.
     assert_eq!(mempool.insert(tx2).await.unwrap(), TxStorageResponse::UnconfirmedPool);
@@ -262,7 +288,7 @@ async fn test_retrieve() {
     )];
     // "Mine" Block 1
     generate_new_block(&mut store, &mut blocks, &mut outputs, txs, &consensus_manager).unwrap();
-    mempool.process_published_block(blocks[1].block()).await.unwrap();
+    mempool.process_published_block(blocks[1].to_arc_block()).await.unwrap();
     // 1-Block, 8 UTXOs, empty mempool
     let txs = vec![
         txn_schema!(from: vec![outputs[1][0].clone()], to: vec![], fee: 30*uT, lock: 0, features: OutputFeatures::default()),
@@ -291,7 +317,7 @@ async fn test_retrieve() {
     assert!(retrieved_txs.contains(&tx[6]));
     assert!(retrieved_txs.contains(&tx[2]));
     assert!(retrieved_txs.contains(&tx[3]));
-    let stats = mempool.stats().await;
+    let stats = mempool.stats().await.unwrap();
     assert_eq!(stats.unconfirmed_txs, 7);
     // assert_eq!(stats.timelocked_txs, 1);
     assert_eq!(stats.reorg_txs, 0);
@@ -306,9 +332,9 @@ async fn test_retrieve() {
     // "Mine" block 2
     generate_block(&store, &mut blocks, block2_txns, &consensus_manager).unwrap();
     outputs.push(utxos);
-    mempool.process_published_block(blocks[2].block()).await.unwrap();
+    mempool.process_published_block(blocks[2].to_arc_block()).await.unwrap();
     // 2-blocks, 2 unconfirmed txs in mempool
-    let stats = mempool.stats().await;
+    let stats = mempool.stats().await.unwrap();
     assert_eq!(stats.unconfirmed_txs, 2);
     // assert_eq!(stats.timelocked_txs, 0);
     assert_eq!(stats.reorg_txs, 5);
@@ -327,7 +353,7 @@ async fn test_retrieve() {
     // Top 2 txs are tx[3] (fee/g = 50) and tx2[1] (fee/g = 40). tx2[0] (fee/g = 80) is still not matured.
     let weight = tx[3].calculate_weight(weighting) + tx2[1].calculate_weight(weighting);
     let retrieved_txs = mempool.retrieve(weight).await.unwrap();
-    let stats = mempool.stats().await;
+    let stats = mempool.stats().await.unwrap();

     assert_eq!(stats.unconfirmed_txs, 3);
     // assert_eq!(stats.timelocked_txs, 1);
@@ -354,7 +380,7 @@ async fn test_zero_conf() {
     )];
     // "Mine" Block 1
     generate_new_block(&mut store, &mut blocks, &mut outputs, txs, &consensus_manager).unwrap();
-    mempool.process_published_block(blocks[1].block()).await.unwrap();
+    mempool.process_published_block(blocks[1].to_arc_block()).await.unwrap();

     // This transaction graph will be created, containing 3 levels of zero-conf transactions, inheriting dependent
     // outputs from multiple parents
@@ -551,7 +577,10 @@ async fn test_zero_conf() {
     );

     // Try to retrieve all transactions in the mempool (a couple of our transactions should be missing from retrieved)
-    let retrieved_txs = mempool.retrieve(mempool.stats().await.total_weight).await.unwrap();
+    let retrieved_txs = mempool
+        .retrieve(mempool.stats().await.unwrap().total_weight)
+        .await
+        .unwrap();
     assert_eq!(retrieved_txs.len(), 10);
     assert!(retrieved_txs.contains(&Arc::new(tx01.clone())));
     assert!(!retrieved_txs.contains(&Arc::new(tx02.clone()))); // Missing
@@ -600,7 +629,10 @@ async fn test_zero_conf() {
     );

     // Try to retrieve all transactions in the mempool (all transactions should be retrieved)
-    let retrieved_txs = mempool.retrieve(mempool.stats().await.total_weight).await.unwrap();
+    let retrieved_txs = mempool
+        .retrieve(mempool.stats().await.unwrap().total_weight)
+        .await
+        .unwrap();
     assert_eq!(retrieved_txs.len(), 16);
     assert!(retrieved_txs.contains(&Arc::new(tx01.clone())));
     assert!(retrieved_txs.contains(&Arc::new(tx02.clone())));
@@ -621,7 +653,7 @@ async fn test_zero_conf() {

     // Verify that a higher priority transaction is not retrieved due to its zero-conf dependency instead of the lowest
     // priority transaction
-    let weight = mempool.stats().await.total_weight - 1;
+    let weight = mempool.stats().await.unwrap().total_weight - 1;
     let retrieved_txs = mempool.retrieve(weight).await.unwrap();
     assert_eq!(retrieved_txs.len(), 15);
     assert!(retrieved_txs.contains(&Arc::new(tx01)));
@@ -659,7 +691,7 @@ async fn test_reorg() {
         txn_schema!(from: vec![outputs[0][0].clone()], to: vec![1 * T, 1 * T], fee: 25*uT, lock: 0, features: OutputFeatures::default()),
     ];
     generate_new_block(&mut db, &mut blocks, &mut outputs, txs, &consensus_manager).unwrap();
-    mempool.process_published_block(blocks[1].block()).await.unwrap();
+    mempool.process_published_block(blocks[1].to_arc_block()).await.unwrap();

     // "Mine" block 2
     let schemas = vec![
@@ -672,11 +704,11 @@ async fn test_reorg() {
     for tx in &txns2 {
         mempool.insert(tx.clone()).await.unwrap();
     }
-    let stats = mempool.stats().await;
+    let stats = mempool.stats().await.unwrap();
     assert_eq!(stats.unconfirmed_txs, 3);
     let txns2 = txns2.iter().map(|t| t.deref().clone()).collect();
     generate_block(&db, &mut blocks, txns2, &consensus_manager).unwrap();
-    mempool.process_published_block(blocks[2].block()).await.unwrap();
+    mempool.process_published_block(blocks[2].to_arc_block()).await.unwrap();

     // "Mine" block 3
     let schemas = vec![
@@ -698,9 +730,9 @@ async fn test_reorg() {
         &consensus_manager,
     )
     .unwrap();
-    mempool.process_published_block(blocks[3].block()).await.unwrap();
+    mempool.process_published_block(blocks[3].to_arc_block()).await.unwrap();

-    let stats = mempool.stats().await;
+    let stats = mempool.stats().await.unwrap();
     assert_eq!(stats.unconfirmed_txs, 0);
     // assert_eq!(stats.timelocked_txs, 1);
     assert_eq!(stats.reorg_txs, 5);
@@ -714,7 +746,7 @@ async fn test_reorg() {
         .process_reorg(vec![blocks[3].to_arc_block()], vec![reorg_block3.into()])
         .await
         .unwrap();
-    let stats = mempool.stats().await;
+    let stats = mempool.stats().await.unwrap();
     assert_eq!(stats.unconfirmed_txs, 2);
     // assert_eq!(stats.timelocked_txs, 1);
     assert_eq!(stats.reorg_txs, 3);
@@ -803,20 +835,32 @@ async fn receive_and_propagate_transaction() {
         .unwrap();

     async_assert_eventually!(
-        bob_node.mempool.has_tx_with_excess_sig(&tx_excess_sig).await,
+        bob_node
+            .mempool
+            .has_tx_with_excess_sig(tx_excess_sig.clone())
+            .await
+            .unwrap(),
         expect = TxStorageResponse::NotStored,
         max_attempts = 20,
         interval = Duration::from_millis(1000)
     );
     async_assert_eventually!(
-        carol_node.mempool.has_tx_with_excess_sig(&tx_excess_sig).await,
+        carol_node
+            .mempool
+            .has_tx_with_excess_sig(tx_excess_sig.clone())
+            .await
+            .unwrap(),
         expect = TxStorageResponse::NotStored,
         max_attempts = 10,
         interval = Duration::from_millis(1000)
     );
     // Carol got sent the orphan tx directly, so it will be in her mempool
     async_assert_eventually!(
-        carol_node.mempool.has_tx_with_excess_sig(&orphan_excess_sig).await,
+        carol_node
+            .mempool
+            .has_tx_with_excess_sig(orphan_excess_sig.clone())
+            .await
+            .unwrap(),
         expect = TxStorageResponse::NotStored,
         max_attempts = 10,
         interval = Duration::from_millis(1000)
@@ -824,7 +868,11 @@ async fn receive_and_propagate_transaction() {
     // It's difficult to test a negative here, but let's at least make sure that the orphan TX was not propagated
     // by the time we check it
     async_assert_eventually!(
-        bob_node.mempool.has_tx_with_excess_sig(&orphan_excess_sig).await,
+        bob_node
+            .mempool
+            .has_tx_with_excess_sig(orphan_excess_sig.clone())
+            .await
+            .unwrap(),
         expect = TxStorageResponse::NotStored,
     );
 }
@@ -1134,7 +1182,11 @@ async fn block_event_and_reorg_event_handling() {
     // Add Block1 - tx1 will be moved to the ReorgPool.
     assert!(bob.local_nci.submit_block(block1.clone(),).await.is_ok());
     async_assert_eventually!(
-        alice.mempool.has_tx_with_excess_sig(&tx1_excess_sig).await,
+        alice
+            .mempool
+            .has_tx_with_excess_sig(tx1_excess_sig.clone())
+            .await
+            .unwrap(),
         expect = TxStorageResponse::ReorgPool,
         max_attempts = 20,
         interval = Duration::from_millis(1000)
@@ -1164,27 +1216,46 @@ async fn block_event_and_reorg_event_handling() {
     assert!(bob.local_nci.submit_block(block2a.clone(),).await.is_ok());

     async_assert_eventually!(
-        bob.mempool.has_tx_with_excess_sig(&tx2a_excess_sig).await,
+        bob.mempool
+            .has_tx_with_excess_sig(tx2a_excess_sig.clone())
+            .await
+            .unwrap(),
         expect = TxStorageResponse::ReorgPool,
         max_attempts = 20,
         interval = Duration::from_millis(1000)
     );
     async_assert_eventually!(
-        alice.mempool.has_tx_with_excess_sig(&tx2a_excess_sig).await,
+        alice
+            .mempool
+            .has_tx_with_excess_sig(tx2a_excess_sig.clone())
+            .await
+            .unwrap(),
         expect = TxStorageResponse::ReorgPool,
         max_attempts = 20,
         interval = Duration::from_millis(1000)
     );
     assert_eq!(
-        alice.mempool.has_tx_with_excess_sig(&tx3a_excess_sig).await,
+        alice
+            .mempool
+            .has_tx_with_excess_sig(tx3a_excess_sig.clone())
+            .await
+            .unwrap(),
         TxStorageResponse::ReorgPool
     );
     assert_eq!(
-        alice.mempool.has_tx_with_excess_sig(&tx2b_excess_sig).await,
+        alice
+            .mempool
+            .has_tx_with_excess_sig(tx2b_excess_sig.clone())
+            .await
+            .unwrap(),
         TxStorageResponse::ReorgPool
     );
     assert_eq!(
-        alice.mempool.has_tx_with_excess_sig(&tx3b_excess_sig).await,
+        alice
+            .mempool
+            .has_tx_with_excess_sig(tx3b_excess_sig.clone())
+            .await
+            .unwrap(),
         TxStorageResponse::ReorgPool
     );
 }
diff --git a/base_layer/p2p/src/initialization.rs b/base_layer/p2p/src/initialization.rs
index 29e12178a4..565f9a34ad 100644
--- a/base_layer/p2p/src/initialization.rs
+++ b/base_layer/p2p/src/initialization.rs
@@ -121,6 +121,9 @@ pub struct P2pConfig {
     pub peer_database_name: String,
     /// The maximum number of concurrent Inbound tasks allowed before back-pressure is applied to peers
     pub max_concurrent_inbound_tasks: usize,
+    /// The maximum number of concurrent outbound tasks allowed before back-pressure is applied to the outbound
+    /// messaging queue
+    pub max_concurrent_outbound_tasks: usize,
     /// The size of the buffer (channel) which holds pending outbound message requests
     pub outbound_buffer_size: usize,
     /// Configuration for DHT
@@ -386,6 +389,7 @@ async fn configure_comms_and_dht(
             ServiceBuilder::new().layer(dht_outbound_layer).service(sink)
         })
         .max_concurrent_inbound_tasks(config.max_concurrent_inbound_tasks)
+        .max_concurrent_outbound_tasks(config.max_concurrent_outbound_tasks)
        .with_inbound_pipeline(
             ServiceBuilder::new()
                 .layer(dht.inbound_middleware_layer())
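With the new public field, every `P2pConfig` construction site in the workspace has to set `max_concurrent_outbound_tasks`, as the application diffs earlier in this PR show. A trimmed sketch showing only the concurrency knobs (the import path is assumed from the file location, and the rest of the config is presumed to be built elsewhere):

```rust
use tari_p2p::initialization::P2pConfig;

// Hedged sketch: assumes the remaining P2pConfig fields (transport, DHT,
// paths, ...) were filled in by some other helper; only the knobs touched by
// this PR are shown.
fn with_concurrency_limits(mut config: P2pConfig) -> P2pConfig {
    // Inbound: how many received messages may be processed at once before
    // back-pressure is applied to peers.
    config.max_concurrent_inbound_tasks = 50;
    // Outbound: new in this PR - bounds concurrent outbound pipeline tasks.
    config.max_concurrent_outbound_tasks = 100;
    // Channel capacity for pending outbound message requests.
    config.outbound_buffer_size = 100;
    config
}
```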
diff --git a/base_layer/wallet/tests/wallet.rs b/base_layer/wallet/tests/wallet.rs
index 6565bfc990..5adbc50215 100644
--- a/base_layer/wallet/tests/wallet.rs
+++ b/base_layer/wallet/tests/wallet.rs
@@ -133,7 +133,8 @@ async fn create_wallet(
         auxilary_tcp_listener_address: None,
         datastore_path: data_path.to_path_buf(),
         peer_database_name: random::string(8),
-        max_concurrent_inbound_tasks: 100,
+        max_concurrent_inbound_tasks: 10,
+        max_concurrent_outbound_tasks: 10,
         outbound_buffer_size: 100,
         dht: DhtConfig {
             discovery_request_timeout: Duration::from_secs(1),
@@ -708,8 +709,9 @@ async fn test_import_utxo() {
         auxilary_tcp_listener_address: None,
         datastore_path: temp_dir.path().to_path_buf(),
         peer_database_name: random::string(8),
-        max_concurrent_inbound_tasks: 100,
-        outbound_buffer_size: 100,
+        max_concurrent_inbound_tasks: 10,
+        max_concurrent_outbound_tasks: 10,
+        outbound_buffer_size: 10,
         dht: Default::default(),
         allow_test_addresses: true,
         listener_liveness_allowlist_cidrs: Vec::new(),
diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs
index 7265e7229b..91fc6d8e16 100644
--- a/base_layer/wallet_ffi/src/lib.rs
+++ b/base_layer/wallet_ffi/src/lib.rs
@@ -3010,8 +3010,9 @@ pub unsafe extern "C" fn comms_config_create(
         auxilary_tcp_listener_address: None,
         datastore_path,
         peer_database_name: database_name_string,
-        max_concurrent_inbound_tasks: 100,
-        outbound_buffer_size: 100,
+        max_concurrent_inbound_tasks: 25,
+        max_concurrent_outbound_tasks: 50,
+        outbound_buffer_size: 50,
         dht: DhtConfig {
             discovery_request_timeout: Duration::from_secs(discovery_timeout_in_secs),
             database_url: DbConnectionUrl::File(dht_database_path),
diff --git a/comms/src/bounded_executor.rs b/comms/src/bounded_executor.rs
index e1aa1a44f6..7558bfd610 100644
--- a/comms/src/bounded_executor.rs
+++ b/comms/src/bounded_executor.rs
@@ -227,6 +227,22 @@ impl OptionallyBoundedExecutor {
             Either::Right(exec) => exec.num_available(),
         }
     }
+
+    /// Returns the max number of tasks that can be performed concurrently
+    pub fn max_available(&self) -> Option<usize> {
+        match &self.inner {
+            Either::Left(_) => None,
+            Either::Right(exec) => Some(exec.max_available()),
+        }
+    }
+}
+
+impl From<runtime::Handle> for OptionallyBoundedExecutor {
+    fn from(handle: runtime::Handle) -> Self {
+        Self {
+            inner: Either::Left(handle),
+        }
+    }
 }

 #[cfg(test)]
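`OptionallyBoundedExecutor` wraps either a plain `runtime::Handle` (no limit) or a `BoundedExecutor`; the new `From<runtime::Handle>` impl and `max_available` accessor let callers treat both uniformly, with `None` signalling "unbounded". A hedged usage sketch (the `tari_comms::bounded_executor` import path is an assumption based on the file location):

```rust
use tari_comms::bounded_executor::OptionallyBoundedExecutor;
use tokio::runtime;

// None means no concurrency bound was configured.
fn describe(exec: &OptionallyBoundedExecutor) -> String {
    match exec.max_available() {
        Some(max) => format!("bounded to {} concurrent tasks", max),
        None => "unbounded".to_string(),
    }
}

fn make_unbounded() -> OptionallyBoundedExecutor {
    // The new From impl: a bare runtime handle becomes an unbounded executor.
    runtime::Handle::current().into()
}
```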
diff --git a/comms/src/multiplexing/yamux.rs b/comms/src/multiplexing/yamux.rs
index f17b48a9e4..f11da2172a 100644
--- a/comms/src/multiplexing/yamux.rs
+++ b/comms/src/multiplexing/yamux.rs
@@ -282,14 +282,14 @@ where TSocket: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static
                 biased;

                 _ = self.shutdown_signal.wait() => {
-                    debug!(
-                        target: LOG_TARGET,
-                        "{} Yamux connection shutdown", self.connection
-                    );
                     let mut control = self.connection.control();
                     if let Err(err) = control.close().await {
                         error!(target: LOG_TARGET, "Failed to close yamux connection: {}", err);
                     }
+                    debug!(
+                        target: LOG_TARGET,
+                        "{} Yamux connection has closed", self.connection
+                    );
                     break
                 }

@@ -300,7 +300,7 @@ where TSocket: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static
                         if self.sender.send(stream).await.is_err() {
                             debug!(
                                 target: LOG_TARGET,
-                                "{} Incoming peer substream task is shutting down because the internal stream sender channel \
+                                "{} Incoming peer substream task is stopping because the internal stream sender channel \
                                  was closed",
                                 self.connection
                             );
@@ -310,7 +310,7 @@ where TSocket: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static
                     Ok(None) =>{
                         debug!(
                             target: LOG_TARGET,
-                            "{} Incoming peer substream completed. IncomingWorker exiting",
+                            "{} Incoming peer substream ended.",
                             self.connection
                         );
                         break;
@@ -334,8 +334,6 @@ where TSocket: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static
                 }
             }
         }
-
-        debug!(target: LOG_TARGET, "Incoming peer substream task is shutting down");
     }
 }
diff --git a/comms/src/pipeline/builder.rs b/comms/src/pipeline/builder.rs
index 66c23661fc..c4963a5c71 100644
--- a/comms/src/pipeline/builder.rs
+++ b/comms/src/pipeline/builder.rs
@@ -37,6 +37,7 @@ type OutboundMessageSinkService = SinkService<mpsc::Sender<OutboundMessage>>;
 #[derive(Default)]
 pub struct Builder<TInSvc, TOutSvc, TOutRx> {
     max_concurrent_inbound_tasks: usize,
+    max_concurrent_outbound_tasks: Option<usize>,
     outbound_buffer_size: usize,
     inbound: Option<TInSvc>,
     outbound_rx: Option<mpsc::Receiver<OutboundMessage>>,
@@ -47,6 +48,7 @@ impl Builder<(), (), ()> {
     pub fn new() -> Self {
         Self {
             max_concurrent_inbound_tasks: DEFAULT_MAX_CONCURRENT_TASKS,
+            max_concurrent_outbound_tasks: None,
             outbound_buffer_size: DEFAULT_OUTBOUND_BUFFER_SIZE,
             inbound: None,
             outbound_rx: None,
@@ -61,6 +63,11 @@ impl<TInSvc, TOutSvc, TOutRx> Builder<TInSvc, TOutSvc, TOutRx> {
         self
     }

+    pub fn max_concurrent_outbound_tasks(mut self, max_tasks: usize) -> Self {
+        self.max_concurrent_outbound_tasks = Some(max_tasks);
+        self
+    }
+
     pub fn outbound_buffer_size(mut self, buf_size: usize) -> Self {
         self.outbound_buffer_size = buf_size;
         self
@@ -77,6 +84,7 @@ impl<TInSvc, TOutSvc, TOutRx> Builder<TInSvc, TOutSvc, TOutRx> {
             outbound_pipeline_factory: Some(Box::new(factory)),

             max_concurrent_inbound_tasks: self.max_concurrent_inbound_tasks,
+            max_concurrent_outbound_tasks: self.max_concurrent_outbound_tasks,
             inbound: self.inbound,
             outbound_buffer_size: self.outbound_buffer_size,
         }
@@ -88,6 +96,7 @@ impl<TInSvc, TOutSvc, TOutRx> Builder<TInSvc, TOutSvc, TOutRx> {
             inbound: Some(inbound),

             max_concurrent_inbound_tasks: self.max_concurrent_inbound_tasks,
+            max_concurrent_outbound_tasks: self.max_concurrent_outbound_tasks,
             outbound_rx: self.outbound_rx,
             outbound_pipeline_factory: self.outbound_pipeline_factory,
             outbound_buffer_size: self.outbound_buffer_size,
@@ -126,6 +135,7 @@ where
         Ok(Config {
             max_concurrent_inbound_tasks: self.max_concurrent_inbound_tasks,
+            max_concurrent_outbound_tasks: self.max_concurrent_outbound_tasks,
             inbound,
             outbound,
         })
@@ -147,6 +157,7 @@ pub struct OutboundPipelineConfig<TInStream, TPipeline> {

 pub struct Config<TInSvc, TOutSvc> {
     pub max_concurrent_inbound_tasks: usize,
+    pub max_concurrent_outbound_tasks: Option<usize>,
     pub inbound: TInSvc,
     pub outbound: OutboundPipelineConfig<mpsc::Receiver<OutboundMessage>, TOutSvc>,
 }
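On the builder side the outbound bound is deliberately an `Option<usize>` defaulting to `None`, so existing callers that never call the new setter keep the old unbounded behaviour. A minimal sketch of that defaulting pattern with a stand-in struct (not the real `tari_comms` type):

```rust
// Hedged sketch mirroring pipeline::Builder above: None = "unbounded".
struct PipelineBuilderSketch {
    max_concurrent_outbound_tasks: Option<usize>,
}

impl PipelineBuilderSketch {
    fn new() -> Self {
        Self {
            max_concurrent_outbound_tasks: None, // old behaviour preserved
        }
    }

    fn max_concurrent_outbound_tasks(mut self, max_tasks: usize) -> Self {
        self.max_concurrent_outbound_tasks = Some(max_tasks);
        self
    }
}

fn example() {
    let builder = PipelineBuilderSketch::new().max_concurrent_outbound_tasks(100);
    assert_eq!(builder.max_concurrent_outbound_tasks, Some(100));
}
```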
diff --git a/comms/src/pipeline/inbound.rs b/comms/src/pipeline/inbound.rs
index 2e762a52e8..35d910c8a1 100644
--- a/comms/src/pipeline/inbound.rs
+++ b/comms/src/pipeline/inbound.rs
@@ -20,7 +20,7 @@
 // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

-use std::fmt::Display;
+use std::{fmt::Display, time::Instant};

 use futures::future::FusedFuture;
 use log::*;
@@ -66,6 +66,7 @@ where
     }

     pub async fn run(mut self) {
+        let mut current_id = 0;
         while let Some(item) = self.stream.recv().await {
             // Check if the shutdown signal has been triggered.
             // If there are messages in the stream, drop them. Otherwise the stream is empty,
@@ -90,12 +91,24 @@ where
                     max_available
                 );
             }
+
+            let id = current_id;
+            current_id = (current_id + 1) % u64::MAX;
+
             // Call the service in it's own spawned task
             self.executor
                 .spawn(async move {
+                    let timer = Instant::now();
+                    trace!(target: LOG_TARGET, "Start inbound pipeline {}", id);
                     if let Err(err) = service.oneshot(item).await {
                         warn!(target: LOG_TARGET, "Inbound pipeline returned an error: '{}'", err);
                     }
+                    trace!(
+                        target: LOG_TARGET,
+                        "Finished inbound pipeline {} in {:.2?}",
+                        id,
+                        timer.elapsed()
+                    );
                 })
                 .await;
         }
diff --git a/comms/src/pipeline/outbound.rs b/comms/src/pipeline/outbound.rs
index 53de72191e..37fe074ec2 100644
--- a/comms/src/pipeline/outbound.rs
+++ b/comms/src/pipeline/outbound.rs
@@ -20,14 +20,15 @@
 // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
 // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

-use std::fmt::Display;
+use std::{fmt::Display, time::Instant};

 use futures::future::Either;
 use log::*;
-use tokio::{runtime, sync::mpsc};
+use tokio::sync::mpsc;
 use tower::{Service, ServiceExt};

 use crate::{
+    bounded_executor::OptionallyBoundedExecutor,
     message::OutboundMessage,
     pipeline::builder::OutboundPipelineConfig,
     protocol::messaging::MessagingRequest,
@@ -37,7 +38,7 @@ const LOG_TARGET: &str = "comms::pipeline::outbound";

 pub struct Outbound {
     /// Executor used to spawn a pipeline for each received item on the stream
-    executor: runtime::Handle,
+    executor: OptionallyBoundedExecutor,
     /// Outbound pipeline configuration containing the pipeline and it's in and out streams
     config: OutboundPipelineConfig,
     /// Request sender for Messaging
@@ -52,7 +53,7 @@ where
     TPipeline::Future: Send,
 {
     pub fn new(
-        executor: runtime::Handle,
+        executor: OptionallyBoundedExecutor,
         config: OutboundPipelineConfig,
         messaging_request_tx: mpsc::Sender<MessagingRequest>,
     ) -> Self {
@@ -64,6 +65,7 @@ where
     }

     pub async fn run(mut self) {
+        let mut current_id = 0;
         loop {
             let either = tokio::select! {
                 next = self.config.in_receiver.recv() => Either::Left(next),
@@ -72,12 +74,41 @@ where
             match either {
                 // Pipeline IN received a message. Spawn a new task for the pipeline
                 Either::Left(Some(msg)) => {
-                    let pipeline = self.config.pipeline.clone();
-                    self.executor.spawn(async move {
-                        if let Err(err) = pipeline.oneshot(msg).await {
-                            error!(target: LOG_TARGET, "Outbound pipeline returned an error: '{}'", err);
+                    let num_available = self.executor.num_available();
+                    if let Some(max_available) = self.executor.max_available() {
+                        // Only emit this message if there is any concurrent usage
+                        if num_available < max_available {
+                            debug!(
+                                target: LOG_TARGET,
+                                "Outbound pipeline usage: {}/{}",
+                                max_available - num_available,
+                                max_available
+                            );
                         }
-                    });
+                    }
+                    let pipeline = self.config.pipeline.clone();
+                    let id = current_id;
+                    current_id = (current_id + 1) % u64::MAX;
+
+                    self.executor
+                        .spawn(async move {
+                            let timer = Instant::now();
+                            trace!(target: LOG_TARGET, "Start outbound pipeline {}", id);
+                            if let Err(err) = pipeline.oneshot(msg).await {
+                                error!(
+                                    target: LOG_TARGET,
+                                    "Outbound pipeline {} returned an error: '{}'", id, err
+                                );
+                            }
+
+                            trace!(
+                                target: LOG_TARGET,
+                                "Finished outbound pipeline {} in {:.2?}",
+                                id,
+                                timer.elapsed()
+                            );
+                        })
+                        .await;
                 },
                 // Pipeline IN channel closed
                 Either::Left(None) => {
@@ -144,7 +175,7 @@ mod test {
         let executor = Handle::current();

         let pipeline = Outbound::new(
-            executor.clone(),
+            executor.clone().into(),
             OutboundPipelineConfig {
                 in_receiver,
                 out_receiver: out_rx,
diff --git a/comms/src/protocol/messaging/extension.rs b/comms/src/protocol/messaging/extension.rs
index fe656e8de3..f3c625c6f6 100644
--- a/comms/src/protocol/messaging/extension.rs
+++ b/comms/src/protocol/messaging/extension.rs
@@ -27,7 +27,7 @@ use tower::Service;

 use super::MessagingProtocol;
 use crate::{
-    bounded_executor::BoundedExecutor,
+    bounded_executor::{BoundedExecutor, OptionallyBoundedExecutor},
     message::InboundMessage,
     pipeline,
     protocol::{
@@ -36,7 +36,6 @@ use crate::{
         ProtocolExtensionContext,
         ProtocolExtensionError,
     },
-    runtime,
     runtime::task,
 };

@@ -104,8 +103,9 @@ where
         );
         task::spawn(inbound.run());

+        let executor = OptionallyBoundedExecutor::from_current(self.pipeline.max_concurrent_outbound_tasks);
         // Spawn outbound pipeline
-        let outbound = pipeline::Outbound::new(runtime::current(), self.pipeline.outbound, messaging_request_tx);
+        let outbound = pipeline::Outbound::new(executor, self.pipeline.outbound, messaging_request_tx);
         task::spawn(outbound.run());

         Ok(())
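extension.rs is where the configured limit finally takes effect: `OptionallyBoundedExecutor::from_current(...)` builds a bounded executor on the current runtime when a limit is set. Combined with the per-task id and timer added to both pipelines, each pipeline run becomes individually traceable in the logs. A hedged sketch of that instrumentation pattern (the loop body and messages are stand-ins):

```rust
use std::time::Instant;

use tokio::task;

// Hedged sketch of the pattern from pipeline/outbound.rs: a monotonically
// increasing id plus a timer wrapped around each spawned pipeline task.
async fn run_instrumented(mut current_id: u64) {
    for msg in ["a", "b", "c"] {
        let id = current_id;
        current_id = current_id.wrapping_add(1); // the PR uses `% u64::MAX`
        task::spawn(async move {
            let timer = Instant::now();
            println!("start outbound pipeline {}", id);
            // service.oneshot(msg).await would run here in the real pipeline
            let _ = msg;
            println!("finished outbound pipeline {} in {:.2?}", id, timer.elapsed());
        });
    }
}
```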
diff --git a/comms/src/protocol/rpc/client/mod.rs b/comms/src/protocol/rpc/client/mod.rs
index b4ae9ac68a..f62e8b9597 100644
--- a/comms/src/protocol/rpc/client/mod.rs
+++ b/comms/src/protocol/rpc/client/mod.rs
@@ -781,12 +781,21 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + StreamId
     }

     async fn read_response(&mut self, request_id: u16) -> Result<proto::rpc::RpcResponse, RpcError> {
-        let mut reader = RpcResponseReader::new(&mut self.framed, self.config, request_id);
+        let stream_id = self.stream_id();
+        let protocol_name = self.protocol_name().to_string();
+        let mut reader = RpcResponseReader::new(&mut self.framed, self.config, request_id);
         let mut num_ignored = 0;
         let resp = loop {
             match reader.read_response().await {
                 Ok(resp) => {
+                    debug!(
+                        target: LOG_TARGET,
+                        "(stream: {}, {}) Received body len = {}",
+                        stream_id,
+                        protocol_name,
+                        reader.bytes_read()
+                    );
                     metrics::inbound_response_bytes(&self.node_id, &self.protocol_id)
                         .observe(reader.bytes_read() as f64);
                     break resp;
@@ -879,6 +888,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin
         let mut chunk_count = 1;
         let mut last_chunk_flags = RpcMessageFlags::from_bits_truncate(resp.flags as u8);
         let mut last_chunk_size = resp.payload.len();
+        self.bytes_read += last_chunk_size;
         loop {
             trace!(
                 target: LOG_TARGET,