diff --git a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs index 5788b059a9..5e7671393a 100644 --- a/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs +++ b/base_layer/core/src/base_node/comms_interface/inbound_handlers.rs @@ -21,6 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use std::{ + collections::HashSet, convert::{TryFrom, TryInto}, sync::Arc, time::{Duration, Instant}, @@ -31,7 +32,7 @@ use strum_macros::Display; use tari_common_types::types::{BlockHash, FixedHash, HashOutput}; use tari_comms::{connectivity::ConnectivityRequester, peer_manager::NodeId}; use tari_utilities::hex::Hex; -use tokio::sync::Semaphore; +use tokio::sync::RwLock; use crate::{ base_node::{ @@ -81,7 +82,7 @@ pub struct InboundNodeCommsHandlers { blockchain_db: AsyncBlockchainDb, mempool: Mempool, consensus_manager: ConsensusManager, - new_block_request_semaphore: Arc<Semaphore>, + list_of_reconciling_blocks: Arc<RwLock<HashSet<HashOutput>>>, outbound_nci: OutboundNodeCommsInterface, connectivity: ConnectivityRequester, } @@ -103,7 +104,7 @@ where B: BlockchainBackend + 'static blockchain_db, mempool, consensus_manager, - new_block_request_semaphore: Arc::new(Semaphore::new(1)), + list_of_reconciling_blocks: Arc::new(RwLock::new(HashSet::new())), outbound_nci, connectivity, } @@ -436,14 +437,7 @@ where B: BlockchainBackend + 'static return Ok(()); } - // Only a single block request can complete at a time. - // As multiple NewBlock requests arrive from propagation, this semaphore prevents multiple requests to nodes for - // the same full block. The first request that succeeds will stop the node from requesting the block from any - // other node (block_exists is true). 
- // Arc clone to satisfy the borrow checker - let semaphore = self.new_block_request_semaphore.clone(); - let _permit = semaphore.acquire().await.unwrap(); - + // Let's check if the block exists before we try to ask for a complete block if self.blockchain_db.block_exists(block_hash).await? { debug!( target: LOG_TARGET, @@ -453,6 +447,39 @@ where B: BlockchainBackend + 'static return Ok(()); } + { + // We use a double lock to make sure we can only reconcile one unique block at a time. We may receive the + // same block from multiple peers nearly simultaneously. We should only reconcile each unique block once. + let read_lock = self.list_of_reconciling_blocks.read().await; + if read_lock.contains(&block_hash) { + debug!( + target: LOG_TARGET, + "Block with hash `{}` is already being reconciled", + block_hash.to_hex() + ); + return Ok(()); + } + } + { + let mut write_lock = self.list_of_reconciling_blocks.write().await; + if self.blockchain_db.block_exists(block_hash).await? { + debug!( + target: LOG_TARGET, + "Block with hash `{}` already stored", + block_hash.to_hex() + ); + return Ok(()); + } + if !write_lock.insert(block_hash) { + debug!( + target: LOG_TARGET, + "Block with hash `{}` is already being reconciled", + block_hash.to_hex() + ); + return Ok(()); + } + } + debug!( target: LOG_TARGET, "Block with hash `{}` is unknown. 
Constructing block from known mempool transactions / requesting missing \ @@ -461,9 +488,23 @@ where B: BlockchainBackend + 'static source_peer ); + let result = self.reconcile_and_add_block(source_peer.clone(), new_block).await; + + { + let mut write_lock = self.list_of_reconciling_blocks.write().await; + write_lock.remove(&block_hash); + } + result?; + Ok(()) + } + + async fn reconcile_and_add_block( + &mut self, + source_peer: NodeId, + new_block: NewBlock, + ) -> Result<(), CommsInterfaceError> { let block = self.reconcile_block(source_peer.clone(), new_block).await?; self.handle_block(block, Some(source_peer)).await?; - Ok(()) } @@ -890,7 +931,7 @@ impl Clone for InboundNodeCommsHandlers { blockchain_db: self.blockchain_db.clone(), mempool: self.mempool.clone(), consensus_manager: self.consensus_manager.clone(), - new_block_request_semaphore: self.new_block_request_semaphore.clone(), + list_of_reconciling_blocks: self.list_of_reconciling_blocks.clone(), outbound_nci: self.outbound_nci.clone(), connectivity: self.connectivity.clone(), }