From b5adc656744a082466fa539f6999a0621d7f5ff6 Mon Sep 17 00:00:00 2001
From: SW van Heerden
Date: Fri, 8 Sep 2023 17:00:23 +0200
Subject: [PATCH] Add consistency to header sync

This PR fixes an edge case in header sync where the local node has a
higher header chain, but the remote node has a higher actual blockchain.
It also simplifies the header sync attempt logic and the error handling
when a peer does not return headers, and introduces consistent naming,
variable re-use and use of information.

When doing header sync and determining which peer to sync from, we need
to consider our actual blockchain state when comparing chains. It is
possible that our node has valid block headers that would translate into
a higher proof of work than the remote sync peer's once fully synced,
but lacks the blocks to back those headers, so the remote peer's actual
blockchain currently has the higher proof of work.

With the current implementation, race conditions can exist when
determining whether a peer is misbehaving, i.e. lying about its proof of
work or refusing to supply block headers, due to a mismatch in the tip
height used for the check. This PR fixes these race conditions by always
using the latest local chain metadata and block headers, ignoring all
sync peers that do not exceed our own accumulated difficulty, and using
consistent information in all the checks.

Adds a header sync integration test.
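In essence, the fix is two pre-sync consistency checks. The sketch below
is a simplification for illustration only: `sync_peers`,
`best_block_metadata`, `best_header_height` and `best_block_height`
refer to the names used in the diff (the real code removes peers by node
id in `HeaderSyncState::next_event` and performs the header/block check
in `HeaderSynchronizer::attempt_sync`):

    // 1. Re-fetch the local chain metadata and drop every sync peer
    //    whose claimed accumulated difficulty does not exceed our own.
    sync_peers.retain(|peer| {
        peer.claimed_chain_metadata().accumulated_difficulty() >
            best_block_metadata.accumulated_difficulty()
    });
    if sync_peers.is_empty() {
        // No peer claims a stronger chain; go back to Listening.
        return StateEvent::Continue;
    }

    // 2. Headers may run ahead of blocks, but never behind them;
    //    anything else indicates inconsistent local state.
    if best_header_height < best_block_height {
        return Err(BlockHeaderSyncError::ChainStorageError(
            ChainStorageError::CorruptedDatabase(
                "Inconsistent block and header data".to_string(),
            ),
        ));
    }

Because both checks use freshly fetched metadata rather than a cached
copy, a block arriving through propagation mid-sync can no longer make
an honest peer look like it lied about its proof of work.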
---
 Cargo.lock                                    |   1 +
 .../state_machine_service/state_machine.rs    |   4 +-
 .../states/events_and_states.rs               |  10 +-
 .../states/header_sync.rs                     |  38 ++-
 .../src/base_node/sync/header_sync/mod.rs     |   2 +-
 .../sync/header_sync/synchronizer.rs          | 263 ++++++++-------
 base_layer/core/src/base_node/sync/mod.rs     |   2 +-
 base_layer/core/tests/helpers/nodes.rs        |  49 ++-
 base_layer/core/tests/tests/header_sync.rs    | 299 ++++++++++++++++++
 base_layer/core/tests/tests/mempool.rs        |   3 +-
 base_layer/core/tests/tests/mod.rs            |   1 +
 .../core/tests/tests/node_state_machine.rs    |   3 +-
 base_layer/p2p/src/initialization.rs          |  19 +-
 .../p2p/tests/support/comms_and_services.rs   |  13 +-
 .../tests/support/comms_and_services.rs       |   7 +
 15 files changed, 568 insertions(+), 146 deletions(-)
 create mode 100644 base_layer/core/tests/tests/header_sync.rs

diff --git a/Cargo.lock b/Cargo.lock
index 513a18f56c2..fa07d1578ad 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5710,6 +5710,7 @@ dependencies = [
  "tempfile",
  "thiserror",
  "tokio",
+ "tower",
  "tracing",
  "uint",
  "zeroize",
diff --git a/base_layer/core/src/base_node/state_machine_service/state_machine.rs b/base_layer/core/src/base_node/state_machine_service/state_machine.rs
index acebad629a2..4578ee60860 100644
--- a/base_layer/core/src/base_node/state_machine_service/state_machine.rs
+++ b/base_layer/core/src/base_node/state_machine_service/state_machine.rs
@@ -153,7 +153,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
                 db.set_disable_add_block_flag();
                 HeaderSync(HeaderSyncState::new(sync_peers, local_metadata))
             },
-            (HeaderSync(s), HeaderSyncFailed) => {
+            (HeaderSync(s), HeaderSyncFailed(_err)) => {
                 db.clear_disable_add_block_flag();
                 Waiting(s.into())
             },
@@ -161,7 +161,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
                 db.clear_disable_add_block_flag();
                 Listening(s.into())
             },
-            (HeaderSync(s), HeadersSynchronized(_)) => DecideNextSync(s.into()),
+            (HeaderSync(s), HeadersSynchronized(..)) => DecideNextSync(s.into()),
 
             (DecideNextSync(_), ProceedToHorizonSync(peers)) => HorizonStateSync(peers.into()),
             (DecideNextSync(s), Continue) => {
diff --git a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs
index 2fea428f714..5f0ef9c9f55 100644
--- a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs
+++ b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs
@@ -37,7 +37,7 @@ use crate::base_node::{
         Starting,
         Waiting,
     },
-    sync::{HorizonSyncInfo, SyncPeer},
+    sync::{AttemptSyncResult, HorizonSyncInfo, SyncPeer},
 };
 
 #[derive(Debug)]
@@ -57,8 +57,8 @@ pub enum BaseNodeState {
 #[derive(Debug, Clone, PartialEq)]
 pub enum StateEvent {
     Initialized,
-    HeadersSynchronized(SyncPeer),
-    HeaderSyncFailed,
+    HeadersSynchronized(SyncPeer, AttemptSyncResult),
+    HeaderSyncFailed(String),
     ProceedToHorizonSync(Vec<SyncPeer>),
     ProceedToBlockSync(Vec<SyncPeer>),
     HorizonStateSynchronized,
@@ -145,8 +145,8 @@ impl Display for StateEvent {
         match self {
             Initialized => write!(f, "Initialized"),
             BlocksSynchronized => write!(f, "Synchronised Blocks"),
-            HeadersSynchronized(peer) => write!(f, "Headers Synchronized from peer `{}`", peer),
-            HeaderSyncFailed => write!(f, "Header Synchronization Failed"),
+            HeadersSynchronized(peer, result) => write!(f, "Headers Synchronized from peer `{}` ({:?})", peer, result),
+            HeaderSyncFailed(err) => write!(f, "Header Synchronization Failed ({})", err),
             ProceedToHorizonSync(_) => write!(f, "Proceed to horizon sync"),
             ProceedToBlockSync(_) => write!(f, "Proceed to block sync"),
             HorizonStateSynchronized => write!(f, "Horizon State Synchronized"),
diff --git a/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs
index 1c51066ea19..d13b19198e6 100644
--- a/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs
+++ b/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs
@@ -24,6 +24,7 @@ use std::{cmp::Ordering, time::Instant};
 
 use log::*;
 use tari_common_types::chain_metadata::ChainMetadata;
+use tari_comms::peer_manager::NodeId;
 
 use crate::{
     base_node::{
@@ -35,7 +36,6 @@ use crate::{
     },
     chain_storage::BlockchainBackend,
 };
-
 const LOG_TARGET: &str = "c::bn::header_sync";
 
 #[derive(Clone, Debug)]
@@ -70,12 +70,42 @@ impl HeaderSyncState {
         self.sync_peers
     }
 
+    fn remove_sync_peer(&mut self, node_id: &NodeId) {
+        if let Some(pos) = self.sync_peers.iter().position(|p| p.node_id() == node_id) {
+            self.sync_peers.remove(pos);
+        }
+    }
+
     // converting u64 to i64 is okay as the future time limit is in the hundreds, so well below even u32::MAX
+    #[allow(clippy::too_many_lines)]
    #[allow(clippy::cast_possible_wrap)]
     pub async fn next_event<B: BlockchainBackend + 'static>(
         &mut self,
         shared: &mut BaseNodeStateMachine<B>,
     ) -> StateEvent {
+        // Only sync to peers with a better claimed accumulated difficulty than the local chain; peers that do not
+        // qualify may still be in the list at this stage due to read-write lock race conditions in the database
+        match shared.db.get_chain_metadata().await {
+            Ok(best_block_metadata) => {
+                let mut remove = Vec::new();
+                for sync_peer in &self.sync_peers {
+                    if sync_peer.claimed_chain_metadata().accumulated_difficulty() <=
+                        best_block_metadata.accumulated_difficulty()
+                    {
+                        remove.push(sync_peer.node_id().clone());
+                    }
+                }
+                for node_id in remove {
+                    self.remove_sync_peer(&node_id);
+                }
+                if self.sync_peers.is_empty() {
+                    // Go back to Listening state
+                    return StateEvent::Continue;
+                }
+            },
+            Err(e) => return StateEvent::FatalError(format!("{}", e)),
+        }
+
         let mut synchronizer = HeaderSynchronizer::new(
             shared.config.blockchain_sync_config.clone(),
             shared.db.clone(),
@@ -128,7 +158,7 @@ impl HeaderSyncState {
         let mut mdc = vec![];
         log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
         match synchronizer.synchronize().await {
-            Ok(sync_peer) => {
+            Ok((sync_peer, sync_result)) => {
                 log_mdc::extend(mdc);
                 info!(
                     target: LOG_TARGET,
@@ -144,9 +174,9 @@ impl HeaderSyncState {
                     }
                 }
                 self.is_synced = true;
-                StateEvent::HeadersSynchronized(sync_peer)
+                StateEvent::HeadersSynchronized(sync_peer, sync_result)
             },
             Err(err) => {
                 let _ignore = shared.status_event_sender.send(StatusInfo {
                     bootstrapped,
                     state_info: StateInfo::SyncFailed("HeaderSyncFailed".to_string()),
@@ -163,7 +193,7 @@ impl HeaderSyncState {
                 _ => {
                     log_mdc::extend(mdc);
                     debug!(target: LOG_TARGET, "Header sync failed: {}", err);
-                    StateEvent::HeaderSyncFailed
+                    StateEvent::HeaderSyncFailed(err.to_string())
                 },
             }
         },
diff --git a/base_layer/core/src/base_node/sync/header_sync/mod.rs b/base_layer/core/src/base_node/sync/header_sync/mod.rs
index d71c092a525..da4f5676645 100644
--- a/base_layer/core/src/base_node/sync/header_sync/mod.rs
+++ b/base_layer/core/src/base_node/sync/header_sync/mod.rs
@@ -26,4 +26,4 @@ pub use error::BlockHeaderSyncError;
 mod validator;
 
 mod synchronizer;
-pub use synchronizer::HeaderSynchronizer;
+pub use synchronizer::{AttemptSyncResult, HeaderSyncStatus, HeaderSynchronizer};
diff --git a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
index c29abc3ff2a..c4f395be946 100644
--- a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
+++ b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
@@ -40,13 +40,14 @@ use super::{validator::BlockHeaderSyncValidator, BlockHeaderSyncError};
 use crate::{
     base_node::sync::{ban::PeerBanManager, hooks::Hooks, rpc, BlockchainSyncConfig, SyncPeer},
     blocks::{BlockHeader, ChainBlock, ChainHeader},
-    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
+    chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError},
     common::rolling_avg::RollingAverageTime,
     consensus::ConsensusManager,
     proof_of_work::randomx_factory::RandomXFactory,
     proto::{
         base_node as proto,
         base_node::{FindChainSplitRequest, SyncHeadersRequest},
+        core::BlockHeader as ProtoBlockHeader,
     },
 };
@@ -63,7 +64,7 @@ pub struct HeaderSynchronizer<'a, B> {
     connectivity: ConnectivityRequester,
     sync_peers: &'a mut Vec<SyncPeer>,
     hooks: Hooks,
-    local_metadata: &'a ChainMetadata,
+    local_cached_metadata: &'a ChainMetadata,
     peer_ban_manager: PeerBanManager,
 }
 
@@ -85,7 +86,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
             connectivity,
             sync_peers,
             hooks: Default::default(),
-            local_metadata,
+            local_cached_metadata: local_metadata,
             peer_ban_manager,
         }
     }
@@ -105,7 +106,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
         self.hooks.add_on_rewind_hook(hook);
     }
 
-    pub async fn synchronize(&mut self) -> Result<SyncPeer, BlockHeaderSyncError> {
+    pub async fn synchronize(&mut self) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
         debug!(target: LOG_TARGET, "Starting header sync.",);
 
         info!(
@@ -117,7 +118,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
         let mut latency_increases_counter = 0;
         loop {
             match self.try_sync_from_all_peers(max_latency).await {
-                Ok(sync_peer) => break Ok(sync_peer),
+                Ok((peer, sync_result)) => break Ok((peer, sync_result)),
                 Err(err @ BlockHeaderSyncError::AllSyncPeersExceedLatency) => {
                     // If we have few sync peers, throw this out to be retried later
                    if self.sync_peers.len() < 2 {
@@ -135,7 +136,10 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
     }
 
     #[allow(clippy::too_many_lines)]
-    pub async fn try_sync_from_all_peers(&mut self, max_latency: Duration) -> Result<SyncPeer, BlockHeaderSyncError> {
+    pub async fn try_sync_from_all_peers(
+        &mut self,
+        max_latency: Duration,
+    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
         let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
         info!(
             target: LOG_TARGET,
@@ -145,7 +149,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
         let mut latency_counter = 0usize;
         for node_id in sync_peer_node_ids {
             match self.connect_and_attempt_sync(&node_id, max_latency).await {
-                Ok(peer) => return Ok(peer),
+                Ok((peer, sync_result)) => return Ok((peer, sync_result)),
                 Err(err) => {
                     let ban_reason = BlockHeaderSyncError::get_ban_reason(
                         &err,
@@ -180,7 +184,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
         &mut self,
         node_id: &NodeId,
         max_latency: Duration,
-    ) -> Result<SyncPeer, BlockHeaderSyncError> {
+    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
         let peer_index = self
             .get_sync_peer_index(node_id)
             .ok_or(BlockHeaderSyncError::PeerNotFound)?;
@@ -214,8 +218,8 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
         debug!(target: LOG_TARGET, "Sync peer latency is {:.2?}", latency);
         let sync_peer = self.sync_peers[peer_index].clone();
 
-        self.attempt_sync(&sync_peer, client, max_latency).await?;
-        Ok(sync_peer)
+        let sync_result = self.attempt_sync(&sync_peer, client, max_latency).await?;
+        Ok((sync_peer, sync_result))
     }
 
     async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, BlockHeaderSyncError> {
@@ -236,7 +240,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
         sync_peer: &SyncPeer,
         mut client: rpc::BaseNodeSyncRpcClient,
         max_latency: Duration,
-    ) -> Result<(), BlockHeaderSyncError> {
+    ) -> Result<AttemptSyncResult, BlockHeaderSyncError> {
         let latency = client.get_last_request_latency();
         debug!(
             target: LOG_TARGET,
@@ -245,59 +249,69 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
             latency.unwrap_or_default().as_millis()
         );
 
-        // Fetch the local tip header at the beginning of the sync process
-        let local_tip_header = self.db.fetch_last_chain_header().await?;
-        let local_total_accumulated_difficulty = local_tip_header.accumulated_data().total_accumulated_difficulty;
-        let header_tip_height = local_tip_header.height();
-        let sync_status = self
-            .determine_sync_status(sync_peer, local_tip_header, &mut client)
+        // Fetch best local data at the beginning of the sync process
+        let best_block_metadata = self.db.get_chain_metadata().await?;
+        let best_header = self.db.fetch_last_chain_header().await?;
+        let best_block = self
+            .db
+            .fetch_chain_header(best_block_metadata.height_of_longest_chain())
+            .await?;
+        let best_header_height = best_header.height();
+        let best_block_height = best_block.height();
+
+        if best_header_height < best_block_height ||
+            best_block_height < self.local_cached_metadata.height_of_longest_chain()
+        {
+            return Err(BlockHeaderSyncError::ChainStorageError(
+                ChainStorageError::CorruptedDatabase("Inconsistent block and header data".to_string()),
+            ));
+        }
+
+        let peer_response = self
+            .find_chain_split(sync_peer.node_id(), &mut client, NUM_INITIAL_HEADERS_TO_REQUEST as u64)
             .await?;
 
-        match sync_status {
-            SyncStatus::InSync | SyncStatus::WereAhead => {
-                let metadata = self.db.get_chain_metadata().await?;
-                if metadata.height_of_longest_chain() < header_tip_height {
+        let header_sync_status = self
+            .determine_sync_status(&sync_peer.to_string(), best_header, best_block, peer_response.clone())
+            .await?;
+
+        match header_sync_status.clone() {
+            HeaderSyncStatus::InSyncOrAhead => {
+                if best_block_height < best_header_height {
                     debug!(
                         target: LOG_TARGET,
                         "Headers are in sync at height {} but tip is {}. Proceeding to archival/pruned block sync",
-                        header_tip_height,
-                        metadata.height_of_longest_chain()
+                        best_header_height,
+                        best_block_height
                     );
-                    Ok(())
+
+                    Ok(AttemptSyncResult {
+                        headers_returned: peer_response.headers.len() as u64,
+                        fork_hash_index: peer_response.fork_hash_index,
+                        header_sync_status,
+                    })
                 } else {
-                    // Check if the metadata that we had when we decided to enter header sync is behind the peer's
-                    // claimed one. If so, our chain has updated in the meantime and the sync peer
-                    // is behaving.
-                    if self.local_metadata.accumulated_difficulty() <=
-                        sync_peer.claimed_chain_metadata().accumulated_difficulty()
-                    {
-                        debug!(
-                            target: LOG_TARGET,
-                            "Local blockchain received a better block through propagation at height {} (was: {}). \
-                             Proceeding to archival/pruned block sync",
-                            metadata.height_of_longest_chain(),
-                            self.local_metadata.height_of_longest_chain()
-                        );
-                        return Ok(());
-                    }
-                    debug!(
+                    // We will only attempt sync if our accumulated difficulty is less than the peer's claimed
+                    // accumulated difficulty, thus this is adverse behaviour from the peer.
+                    warn!(
                         target: LOG_TARGET,
                         "Headers and block state are already in-sync (Header Tip: {}, Block tip: {}, Peer's height: \
-                         {})",
-                        header_tip_height,
-                        metadata.height_of_longest_chain(),
+                         {}), peer has lied about chain metadata or did not want to provide headers",
+                        best_header_height,
+                        best_block_height,
                         sync_peer.claimed_chain_metadata().height_of_longest_chain(),
                     );
+
                     Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                         claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                         actual: None,
-                        local: local_total_accumulated_difficulty,
+                        local: self.local_cached_metadata.accumulated_difficulty(),
                     })
                 }
             },
-            SyncStatus::Lagging(split_info) => {
+            HeaderSyncStatus::Lagging(split_info) => {
                 self.hooks.call_on_progress_header_hooks(
                     split_info
-                        .local_tip_header
+                        .best_block
                         .height()
                         .checked_sub(split_info.reorg_steps_back)
                         .unwrap_or_default(),
@@ -306,7 +320,11 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
                 );
                 self.synchronize_headers(sync_peer.clone(), &mut client, *split_info, max_latency)
                     .await?;
-                Ok(())
+                Ok(AttemptSyncResult {
+                    headers_returned: peer_response.headers.len() as u64,
+                    fork_hash_index: peer_response.fork_hash_index,
+                    header_sync_status,
+                })
             },
         }
     }
 
@@ -316,7 +334,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
         peer: &NodeId,
         client: &mut rpc::BaseNodeSyncRpcClient,
         header_count: u64,
-    ) -> Result<(proto::FindChainSplitResponse, Vec<HashOutput>, u64), BlockHeaderSyncError> {
+    ) -> Result<PeerChainSplitResponse, BlockHeaderSyncError> {
         const NUM_CHAIN_SPLIT_HEADERS: usize = 500;
         // Limit how far back we're willing to go. A peer might just say it does not have a chain split
         // and keep us busy going back until the genesis.
@@ -380,69 +398,76 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
                 ));
             }
 
-            let steps_back = resp.fork_hash_index.saturating_add(offset as u64);
-            return Ok((resp, block_hashes, steps_back));
+            let reorg_steps_back = resp.fork_hash_index.saturating_add(offset as u64);
+            let proto::FindChainSplitResponse {
+                headers,
+                fork_hash_index,
+                tip_height: remote_tip_height,
+            } = resp;
+            return Ok(PeerChainSplitResponse {
+                block_hashes,
+                reorg_steps_back,
+                headers,
+                fork_hash_index,
+                remote_tip_height,
+            });
         }
     }
 
     /// Attempt to determine the point at which the remote and local chain diverge, returning the relevant information
-    /// of the chain split (see [SyncStatus]).
+    /// of the chain split (see [HeaderSyncStatus]).
     ///
-    /// If the local node is behind the remote chain (i.e. `SyncStatus::Lagging`), the appropriate `ChainSplitInfo` is
-    /// returned, the header validator is initialized and the preliminary headers are validated.
+    /// If the local node is behind the remote chain (i.e. `HeaderSyncStatus::Lagging`), the appropriate
+    /// `ChainSplitInfo` is returned, the header validator is initialized and the preliminary headers are validated.
     async fn determine_sync_status(
         &mut self,
-        sync_peer: &SyncPeer,
-        local_tip_header: ChainHeader,
-        client: &mut rpc::BaseNodeSyncRpcClient,
-    ) -> Result<SyncStatus, BlockHeaderSyncError> {
-        let (resp, block_hashes, steps_back) = self
-            .find_chain_split(sync_peer.node_id(), client, NUM_INITIAL_HEADERS_TO_REQUEST as u64)
-            .await?;
-        let proto::FindChainSplitResponse {
-            headers,
-            fork_hash_index,
-            tip_height: remote_tip_height,
-        } = resp;
-
-        if steps_back > 0 {
+        sync_peer: &str,
+        best_header: ChainHeader,
+        best_block: ChainHeader,
+        peer_response: PeerChainSplitResponse,
+    ) -> Result<HeaderSyncStatus, BlockHeaderSyncError> {
+        if peer_response.reorg_steps_back > 0 {
             debug!(
                 target: LOG_TARGET,
                 "Found chain split {} blocks back, received {} headers from peer `{}`",
-                steps_back,
-                headers.len(),
+                peer_response.reorg_steps_back,
+                peer_response.headers.len(),
                 sync_peer
             );
         }
 
-        // If the peer returned no new headers, this means header sync is done.
-        if headers.is_empty() {
-            if fork_hash_index > 0 {
+        // If the peer returned no new headers, they have no new headers for us, but they may still have more
+        // blocks than we have, and thus a higher accumulated difficulty.
+        if peer_response.headers.is_empty() {
+            if peer_response.fork_hash_index > 0 {
                 debug!(
                     target: LOG_TARGET,
-                    "Peer `{}` has sent no headers but forked_hash_index is {}. The peer is behind our chain.",
+                    "Peer `{}` has sent no headers with forked_hash_index {}. The peer has fewer headers than we have.",
                     sync_peer,
-                    fork_hash_index
+                    peer_response.fork_hash_index
                 );
-
-                return Ok(SyncStatus::WereAhead);
+            } else {
+                debug!(target: LOG_TARGET, "Headers already in sync with peer `{}`.", sync_peer);
             }
-            debug!(target: LOG_TARGET, "Already in sync with peer `{}`.", sync_peer);
-            return Ok(SyncStatus::InSync);
+
+            return Ok(HeaderSyncStatus::InSyncOrAhead);
         }
 
-        let headers = headers
+        let headers = peer_response
+            .headers
             .into_iter()
             .map(BlockHeader::try_from)
             .collect::<Result<Vec<_>, _>>()
             .map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
-        let num_new_headers = headers.len();
+        let num_new_headers = headers.len(); // Required for later use, no 'Copy' trait on 'BlockHeader'
 
         // NOTE: We can trust that the header associated with this hash exists because `block_hashes` was supplied by
         // this node. Bounds checking for fork_hash_index has been done above.
         #[allow(clippy::cast_possible_truncation)]
-        let chain_split_hash = block_hashes.get(fork_hash_index as usize).unwrap();
+        let chain_split_hash = peer_response
+            .block_hashes
+            .get(peer_response.fork_hash_index as usize)
+            .unwrap();
 
         self.header_validator.initialize_state(chain_split_hash).await?;
         for header in headers {
@@ -462,8 +487,8 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
         );
 
         // Basic sanity check that the peer sent tip height greater than the split.
-        let split_height = local_tip_header.height().saturating_sub(steps_back);
-        if remote_tip_height < split_height {
+        let split_height = best_header.height().saturating_sub(peer_response.reorg_steps_back);
+        if peer_response.remote_tip_height < split_height {
             return Err(BlockHeaderSyncError::InvalidProtocolResponse(format!(
                 "Peer {} sent invalid remote tip height",
                 sync_peer
@@ -471,12 +496,12 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
         }
 
         let chain_split_info = ChainSplitInfo {
-            local_tip_header,
-            remote_tip_height,
-            reorg_steps_back: steps_back,
+            best_block,
+            remote_tip_height: peer_response.remote_tip_height,
+            reorg_steps_back: peer_response.reorg_steps_back,
             chain_split_hash: *chain_split_hash,
         };
-        Ok(SyncStatus::Lagging(Box::new(chain_split_info)))
+        Ok(HeaderSyncStatus::Lagging(Box::new(chain_split_info)))
     }
 
     async fn rewind_blockchain(&self, split_hash: HashOutput) -> Result<Vec<Arc<ChainBlock>>, BlockHeaderSyncError> {
@@ -520,7 +545,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
 
         // If we already have a stronger chain at this point, switch over to it.
         // just in case we happen to be exactly NUM_INITIAL_HEADERS_TO_REQUEST headers behind.
-        let has_better_pow = self.pending_chain_has_higher_pow(&split_info.local_tip_header);
+        let has_better_pow = self.pending_chain_has_higher_pow(&split_info.best_block);
 
         if has_better_pow {
             debug!(
@@ -541,10 +566,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
             return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                 claimed: sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                 actual: Some(total_accumulated_difficulty),
-                local: split_info
-                    .local_tip_header
-                    .accumulated_data()
-                    .total_accumulated_difficulty,
+                local: split_info.best_block.accumulated_data().total_accumulated_difficulty,
             });
         }
         // The pow is higher, we swapped to the higher chain, we have all the better chain headers, we can move on
@@ -625,7 +647,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
         // The remote chain has not (yet) been accepted.
         // We check the tip difficulties, switching over to the new chain if a higher accumulated difficulty is
         // achieved.
-        if self.pending_chain_has_higher_pow(&split_info.local_tip_header) {
+        if self.pending_chain_has_higher_pow(&split_info.best_block) {
             self.switch_to_pending_chain(&split_info).await?;
             has_switched_to_new_chain = true;
         }
@@ -665,10 +687,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
                     .header_validator
                     .current_valid_chain_tip_header()
                     .map(|h| h.accumulated_data().total_accumulated_difficulty),
-                local: split_info
-                    .local_tip_header
-                    .accumulated_data()
-                    .total_accumulated_difficulty,
+                local: split_info.best_block.accumulated_data().total_accumulated_difficulty,
             });
         } else {
             warn!(
@@ -677,7 +696,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
                 swapped. Ignoring",
                 sync_peer.claimed_chain_metadata().accumulated_difficulty(),
                 split_info
-                    .local_tip_header
+                    .best_block
                     .accumulated_data()
                     .total_accumulated_difficulty
             );
@@ -697,10 +716,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
             return Err(BlockHeaderSyncError::PeerSentInaccurateChainMetadata {
                 claimed: claimed_total_accumulated_diff,
                 actual: Some(last_total_accumulated_difficulty),
-                local: split_info
-                    .local_tip_header
-                    .accumulated_data()
-                    .total_accumulated_difficulty,
+                local: split_info.best_block.accumulated_data().total_accumulated_difficulty,
             });
         }
 
@@ -781,18 +797,43 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
     }
 }
 
-struct ChainSplitInfo {
-    local_tip_header: ChainHeader,
-    remote_tip_height: u64,
+#[derive(Debug, Clone)]
+struct PeerChainSplitResponse {
+    block_hashes: Vec<HashOutput>,
     reorg_steps_back: u64,
-    chain_split_hash: HashOutput,
+    headers: Vec<ProtoBlockHeader>,
+    fork_hash_index: u64,
+    remote_tip_height: u64,
+}
+
+/// Information about the chain split from the remote node.
+#[derive(Debug, Clone, PartialEq)]
+pub struct ChainSplitInfo {
+    /// The best block on the local chain.
+    pub best_block: ChainHeader,
+    /// The height of the remote node's tip.
+    pub remote_tip_height: u64,
+    /// The number of blocks to reorg back to the fork.
+    pub reorg_steps_back: u64,
+    /// The hash of the block at the fork.
+    pub chain_split_hash: HashOutput,
+}
+
+/// The result of an attempt to synchronize headers with a peer.
+#[derive(Debug, Clone, PartialEq)]
+pub struct AttemptSyncResult {
+    /// The number of headers that were returned.
+    pub headers_returned: u64,
+    /// The fork hash index of the remote peer.
+    pub fork_hash_index: u64,
+    /// The header sync status.
+    pub header_sync_status: HeaderSyncStatus,
 }
 
-enum SyncStatus {
-    /// Local and remote node are in sync
-    InSync,
-    /// Local node is ahead of the remote node
-    WereAhead,
+#[derive(Debug, Clone, PartialEq)]
+pub enum HeaderSyncStatus {
+    /// Local and remote nodes are in sync, or the local node is ahead
+    InSyncOrAhead,
     /// Local node is lagging behind remote node
     Lagging(Box<ChainSplitInfo>),
 }
diff --git a/base_layer/core/src/base_node/sync/mod.rs b/base_layer/core/src/base_node/sync/mod.rs
index 2193bb6482f..1c5ef806910 100644
--- a/base_layer/core/src/base_node/sync/mod.rs
+++ b/base_layer/core/src/base_node/sync/mod.rs
@@ -36,7 +36,7 @@ pub use block_sync::{BlockSyncError, BlockSynchronizer};
 #[cfg(feature = "base_node")]
 mod header_sync;
 #[cfg(feature = "base_node")]
-pub use header_sync::{BlockHeaderSyncError, HeaderSynchronizer};
+pub use header_sync::{AttemptSyncResult, BlockHeaderSyncError, HeaderSyncStatus, HeaderSynchronizer};
 
 #[cfg(feature = "base_node")]
 mod horizon_state_sync;
diff --git a/base_layer/core/tests/helpers/nodes.rs b/base_layer/core/tests/helpers/nodes.rs
index 908852faa20..c17f45d3769 100644
--- a/base_layer/core/tests/helpers/nodes.rs
+++ b/base_layer/core/tests/helpers/nodes.rs
@@ -26,12 +26,14 @@ use rand::rngs::OsRng;
 use tari_common::configuration::Network;
 use tari_comms::{
     peer_manager::{NodeIdentity, PeerFeatures},
-    protocol::messaging::MessagingEventSender,
+    protocol::{messaging::MessagingEventSender, rpc::RpcServer},
     transports::MemoryTransport,
     CommsNode,
+    UnspawnedCommsNode,
 };
 use tari_comms_dht::{outbound::OutboundMessageRequester, Dht};
 use tari_core::{
+    base_node,
     base_node::{
         chain_metadata_service::{ChainMetadataHandle, ChainMetadataServiceInitializer},
         comms_interface::OutboundNodeCommsInterface,
@@ -63,6 +65,7 @@ use tari_p2p::{
     comms_connector::{pubsub_connector, InboundDomainConnector},
     initialization::initialize_local_test_comms,
     services::liveness::{config::LivenessConfig, LivenessHandle, LivenessInitializer},
+    P2pConfig,
 };
 use tari_service_framework::{RegisterHandle, StackBuilder};
 use tari_shutdown::Shutdown;
@@ -105,6 +108,7 @@ pub struct BaseNodeBuilder {
     mempool_config: Option<MempoolConfig>,
     mempool_service_config: Option<MempoolServiceConfig>,
     liveness_service_config: Option<LivenessConfig>,
+    p2p_config: Option<P2pConfig>,
     validators: Option<Validators<B>>,
     consensus_manager: Option<ConsensusManager>,
     network: NetworkConsensus,
@@ -120,6 +124,7 @@ impl BaseNodeBuilder {
             mempool_config: None,
             mempool_service_config: None,
             liveness_service_config: None,
+            p2p_config: None,
             validators: None,
             consensus_manager: None,
             network,
@@ -156,6 +161,12 @@ impl BaseNodeBuilder {
         self
     }
 
+    /// Set the p2p configuration
+    pub fn with_p2p_config(mut self, config: P2pConfig) -> Self {
+        self.p2p_config = Some(config);
+        self
+    }
+
     pub fn with_validators(
         mut self,
         block: impl CandidateBlockValidator<B> + 'static,
@@ -203,6 +214,7 @@ impl BaseNodeBuilder {
             mempool,
             consensus_manager.clone(),
             self.liveness_service_config.unwrap_or_default(),
+            self.p2p_config.unwrap_or_default(),
             data_path,
         )
         .await;
@@ -251,6 +263,7 @@ pub async fn create_network_with_2_base_nodes(data_path: &str) -> (NodeInterfaces,
 pub async fn create_network_with_2_base_nodes_with_config<P: AsRef<Path>>(
     mempool_service_config: MempoolServiceConfig,
     liveness_service_config: LivenessConfig,
+    p2p_config: P2pConfig,
     consensus_manager: ConsensusManager,
     data_path: P,
 ) -> (NodeInterfaces, NodeInterfaces, ConsensusManager) {
@@ -261,6 +274,7 @@ pub async fn create_network_with_2_base_nodes_with_config<P: AsRef<Path>>(
         .with_node_identity(alice_node_identity.clone())
         .with_mempool_service_config(mempool_service_config.clone())
         .with_liveness_service_config(liveness_service_config.clone())
+        .with_p2p_config(p2p_config.clone())
         .with_consensus_manager(consensus_manager)
         .start(data_path.as_ref().join("alice").as_os_str().to_str().unwrap())
         .await;
@@ -269,6 +283,7 @@ pub async fn create_network_with_2_base_nodes_with_config<P: AsRef<Path>>(
         .with_peers(vec![alice_node_identity])
         .with_mempool_service_config(mempool_service_config)
         .with_liveness_service_config(liveness_service_config)
+        .with_p2p_config(p2p_config.clone())
         .with_consensus_manager(consensus_manager)
         .start(data_path.as_ref().join("bob").as_os_str().to_str().unwrap())
         .await;
@@ -360,9 +375,9 @@ async fn setup_comms_services(
     peers: Vec<Arc<NodeIdentity>>,
     publisher: InboundDomainConnector,
     data_path: &str,
-) -> (CommsNode, Dht, MessagingEventSender, Shutdown) {
+    shutdown: &Shutdown,
+) -> (UnspawnedCommsNode, Dht, MessagingEventSender) {
     let peers = peers.into_iter().map(|p| p.to_peer()).collect();
-    let shutdown = Shutdown::new();
 
     let (comms, dht, messaging_events) = initialize_local_test_comms(
         node_identity,
@@ -375,7 +390,7 @@ async fn setup_comms_services(
     .await
     .unwrap();
 
-    (comms, dht, messaging_events, shutdown)
+    (comms, dht, messaging_events)
 }
 
 // Helper function for starting the services of the Base node.
@@ -386,12 +401,15 @@ async fn setup_base_node_services(
     mempool: Mempool,
     consensus_manager: ConsensusManager,
     liveness_service_config: LivenessConfig,
+    p2p_config: P2pConfig,
     data_path: &str,
 ) -> NodeInterfaces {
     let (publisher, subscription_factory) = pubsub_connector(100);
     let subscription_factory = Arc::new(subscription_factory);
-    let (comms, dht, messaging_events, shutdown) =
-        setup_comms_services(node_identity.clone(), peers, publisher, data_path).await;
+    let shutdown = Shutdown::new();
+
+    let (comms, dht, messaging_events) =
+        setup_comms_services(node_identity.clone(), peers, publisher.clone(), data_path, &shutdown).await;
 
     let mock_state_machine = MockBaseNodeStateMachine::new();
     let randomx_factory = RandomXFactory::new(2);
@@ -417,6 +435,25 @@ async fn setup_base_node_services(
     .await
     .unwrap();
 
+    let base_node_service = handles.expect_handle::<LocalNodeCommsInterface>();
+    let rpc_server = RpcServer::builder()
+        .with_maximum_simultaneous_sessions(p2p_config.rpc_max_simultaneous_sessions)
+        .with_maximum_sessions_per_client(p2p_config.rpc_max_sessions_per_peer)
+        .finish();
+    let rpc_server = rpc_server.add_service(base_node::create_base_node_sync_rpc_service(
+        blockchain_db.clone().into(),
+        base_node_service,
+    ));
+    let comms = comms
+        .add_protocol_extension(rpc_server)
+        .spawn_with_transport(MemoryTransport)
+        .await
+        .unwrap();
+    // Set the public address for tests
+    comms
+        .node_identity()
+        .add_public_address(comms.listening_address().clone());
+
     let outbound_nci = handles.expect_handle::<OutboundNodeCommsInterface>();
     let local_nci = handles.expect_handle::<LocalNodeCommsInterface>();
     let outbound_mp_interface = handles.expect_handle::<OutboundMempoolServiceInterface>();
diff --git a/base_layer/core/tests/tests/header_sync.rs b/base_layer/core/tests/tests/header_sync.rs
new file mode 100644
index 00000000000..d5fa7c802a0
--- /dev/null
+++ b/base_layer/core/tests/tests/header_sync.rs
@@ -0,0 +1,299 @@
+// Copyright 2022. The Tari Project
+//
+// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
+// following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
+// disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
+// following disclaimer in the documentation and/or other materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
+// products derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+use std::time::Duration;
+
+use tari_common::configuration::Network;
+use tari_core::{
+    base_node::{
+        chain_metadata_service::PeerChainMetadata,
+        state_machine_service::{
+            states::{HeaderSyncState, StateEvent, StatusInfo},
+            BaseNodeStateMachine,
+            BaseNodeStateMachineConfig,
+        },
+        sync::{HeaderSyncStatus, SyncPeer},
+        SyncValidators,
+    },
+    consensus::{ConsensusConstantsBuilder, ConsensusManagerBuilder},
+    mempool::MempoolServiceConfig,
+    proof_of_work::{randomx_factory::RandomXFactory, Difficulty},
+    test_helpers::blockchain::TempDatabase,
+    transactions::test_helpers::create_test_core_key_manager_with_memory_db,
+    validation::mocks::MockValidator,
+};
+use tari_p2p::{services::liveness::config::LivenessConfig, P2pConfig};
+use tari_shutdown::Shutdown;
+use tempfile::tempdir;
+use tokio::sync::{broadcast, watch};
+
+use crate::helpers::{
+    block_builders::{append_block, create_genesis_block},
+    nodes::{create_network_with_2_base_nodes_with_config, NodeInterfaces},
+};
+
+static EMISSION: [u64; 2] = [10, 10];
+
+#[allow(clippy::too_many_lines)]
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+async fn test_header_sync() {
+    // Create the network with alice node and bob node
+    let network = Network::LocalNet;
+    let temp_dir = tempdir().unwrap();
+    let key_manager = create_test_core_key_manager_with_memory_db();
+    let consensus_constants = ConsensusConstantsBuilder::new(network)
+        .with_emission_amounts(100_000_000.into(), &EMISSION, 100.into())
+        .build();
+    let (initial_block, _) = create_genesis_block(&consensus_constants, &key_manager).await;
+    let consensus_manager = ConsensusManagerBuilder::new(network)
+        .add_consensus_constants(consensus_constants)
+        .with_block(initial_block.clone())
+        .build()
+        .unwrap();
+    let (alice_node, bob_node, consensus_manager) = create_network_with_2_base_nodes_with_config(
+        MempoolServiceConfig::default(),
+        LivenessConfig {
+            auto_ping_interval: Some(Duration::from_millis(100)),
+            ..Default::default()
+        },
+        P2pConfig::default(),
+        consensus_manager,
+        temp_dir.path().to_str().unwrap(),
+    )
+    .await;
+    let shutdown = Shutdown::new();
+    let (state_change_event_publisher, _) = broadcast::channel(10);
+    let (status_event_sender, _status_event_receiver) = watch::channel(StatusInfo::new());
+
+    // Alice needs a state machine for header sync
+    let mut alice_state_machine = BaseNodeStateMachine::new(
+        alice_node.blockchain_db.clone().into(),
+        alice_node.local_nci.clone(),
+        alice_node.comms.connectivity(),
+        alice_node.comms.peer_manager(),
+        alice_node.chain_metadata_handle.get_event_stream(),
+        BaseNodeStateMachineConfig::default(),
+        SyncValidators::new(MockValidator::new(true), MockValidator::new(true)),
+        status_event_sender,
+        state_change_event_publisher,
+        RandomXFactory::default(),
+        consensus_manager.clone(),
+        shutdown.to_signal(),
+    );
+
+    // Add 1 block to bob's chain
+    let block_1_bob = append_block(
+        &bob_node.blockchain_db,
+        &initial_block,
+        vec![],
+        &consensus_manager,
+        Difficulty::from_u64(3).unwrap(),
+        &key_manager,
+    )
+    .await
+    .unwrap();
+    assert_eq!(block_1_bob.height(), 1);
+    assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 1);
+
+    // Alice attempts header sync; she is still on the genesis block, so her headers will be lagging
+    let event = sync_headers(&mut alice_state_machine, &alice_node, &bob_node).await;
+    // "Lagging"
+    match event.clone() {
+        StateEvent::HeadersSynchronized(val, sync_result) => {
+            assert_eq!(val.claimed_chain_metadata().height_of_longest_chain(), 1);
+            assert_eq!(val.claimed_chain_metadata().accumulated_difficulty(), 4);
+            assert_eq!(sync_result.headers_returned, 1);
+            assert_eq!(sync_result.fork_hash_index, 0);
+            if let HeaderSyncStatus::Lagging(val) = sync_result.header_sync_status {
+                assert_eq!(val.remote_tip_height, 1);
+                assert_eq!(val.best_block.height(), 0);
+                assert_eq!(val.reorg_steps_back, 0);
+            } else {
+                panic!("Should be 'Lagging'");
+            }
+        },
+        _ => panic!("Expected HeadersSynchronized event"),
+    }
+
+    // Alice attempts header sync again, still on the genesis block; headers will now be in sync
+    let event = sync_headers(&mut alice_state_machine, &alice_node, &bob_node).await;
+    // "InSyncOrAhead"
+    match event.clone() {
+        StateEvent::HeadersSynchronized(val, sync_result) => {
+            assert_eq!(val.claimed_chain_metadata().height_of_longest_chain(), 1);
+            assert_eq!(val.claimed_chain_metadata().accumulated_difficulty(), 4);
+            assert_eq!(sync_result.headers_returned, 0);
+            assert_eq!(sync_result.fork_hash_index, 0);
+            match sync_result.header_sync_status {
+                HeaderSyncStatus::InSyncOrAhead => {},
+                HeaderSyncStatus::Lagging(_) => panic!("Should be 'InSyncOrAhead'"),
+            }
+        },
+        _ => panic!("Expected StateEvent::HeadersSynchronized event"),
+    }
+
+    // Bob adds another block
+    let block_2_bob = append_block(
+        &bob_node.blockchain_db,
+        &block_1_bob,
+        vec![],
+        &consensus_manager,
+        Difficulty::from_u64(3).unwrap(),
+        &key_manager,
+    )
+    .await
+    .unwrap();
+    assert_eq!(block_2_bob.height(), 2);
+
+    // Alice attempts header sync; her chain is still on the genesis block, so her headers will be lagging
+    let event = sync_headers(&mut alice_state_machine, &alice_node, &bob_node).await;
+    // "Lagging"
+    match event.clone() {
+        StateEvent::HeadersSynchronized(val, sync_result) => {
+            assert_eq!(val.claimed_chain_metadata().height_of_longest_chain(), 2);
+            assert_eq!(val.claimed_chain_metadata().accumulated_difficulty(), 7);
+            assert_eq!(sync_result.headers_returned, 1);
+            assert_eq!(sync_result.fork_hash_index, 0);
+            if let HeaderSyncStatus::Lagging(val) = sync_result.header_sync_status {
+                assert_eq!(val.remote_tip_height, 2);
+                assert_eq!(val.best_block.height(), 0);
+                assert_eq!(val.reorg_steps_back, 0);
+            } else {
+                panic!("Should be 'Lagging'");
+            }
+        },
+        _ => panic!("Expected StateEvent::HeadersSynchronized event"),
+    }
+
+    // Alice adds 3 (different) blocks, with POW on par with bob's chain, but with greater height
+    let block_1_alice = append_block(
+        &alice_node.blockchain_db,
+        &initial_block,
+        vec![],
+        &consensus_manager,
+        Difficulty::from_u64(3).unwrap(),
+        &key_manager,
+    )
+    .await
+    .unwrap();
+    let block_2_alice = append_block(
+        &alice_node.blockchain_db,
+        &block_1_alice,
+        vec![],
+        &consensus_manager,
+        Difficulty::from_u64(2).unwrap(),
+        &key_manager,
+    )
+    .await
+    .unwrap();
+    // Alice adds a third block, completing her higher but equal-POW chain
+    let block_3_alice = append_block(
+        &alice_node.blockchain_db,
+        &block_2_alice,
+        vec![],
+        &consensus_manager,
+        Difficulty::from_u64(1).unwrap(),
+        &key_manager,
+    )
+    .await
+    .unwrap();
+    assert_eq!(block_3_alice.height(), 3);
+    assert_eq!(block_3_alice.accumulated_data().total_accumulated_difficulty, 7);
+    assert_eq!(
+        block_3_alice.accumulated_data().total_accumulated_difficulty,
+        block_2_bob.accumulated_data().total_accumulated_difficulty
+    );
+
+    // Alice attempts header sync; bob's claimed POW is no better than hers, so sync is not attempted
+    let event = sync_headers(&mut alice_state_machine, &alice_node, &bob_node).await;
+    // "Header sync not attempted, sync peer does not have better POW"
+    match event.clone() {
+        StateEvent::Continue => {
+            // Good
+        },
+        _ => panic!("Expected StateEvent::Continue event"),
+    }
+
+    // Bob adds more blocks and draws ahead of Alice
+    let block_3_bob = append_block(
+        &bob_node.blockchain_db,
+        &block_2_bob,
+        vec![],
+        &consensus_manager,
+        Difficulty::from_u64(3).unwrap(),
+        &key_manager,
+    )
+    .await
+    .unwrap();
+    let block_4_bob = append_block(
+        &bob_node.blockchain_db,
+        &block_3_bob,
+        vec![],
+        &consensus_manager,
+        Difficulty::from_u64(3).unwrap(),
+        &key_manager,
+    )
+    .await
+    .unwrap();
+    assert_eq!(block_4_bob.height(), 4);
+
+    // Alice attempts header sync; she is on a higher but weaker chain, so headers will be lagging with reorg steps
+    let event = sync_headers(&mut alice_state_machine, &alice_node, &bob_node).await;
+    // "Lagging"
+    match event {
+        StateEvent::HeadersSynchronized(val, sync_result) => {
+            assert_eq!(val.claimed_chain_metadata().height_of_longest_chain(), 4);
+            assert_eq!(val.claimed_chain_metadata().accumulated_difficulty(), 13);
+            assert_eq!(sync_result.headers_returned, 4);
+            assert_eq!(sync_result.fork_hash_index, 3);
+            if let HeaderSyncStatus::Lagging(val) = sync_result.header_sync_status {
+                assert_eq!(val.remote_tip_height, 4);
+                assert_eq!(val.best_block.height(), 3);
+                assert_eq!(val.reorg_steps_back, 3);
+            } else {
+                panic!("Should be 'Lagging'");
+            }
+        },
+        _ => panic!("Expected StateEvent::HeadersSynchronized event"),
+    }
+}
+
+async fn sync_headers(
+    alice_state_machine: &mut BaseNodeStateMachine<TempDatabase>,
+    alice_node: &NodeInterfaces,
+    bob_node: &NodeInterfaces,
+) -> StateEvent {
+    let mut header_sync = HeaderSyncState::new(
+        vec![SyncPeer::from(PeerChainMetadata::new(
+            bob_node.node_identity.node_id().clone(),
+            bob_node.blockchain_db.get_chain_metadata().unwrap(),
+            None,
+        ))],
+        alice_node.blockchain_db.get_chain_metadata().unwrap(),
+    );
+    header_sync.next_event(alice_state_machine).await
+}
diff --git a/base_layer/core/tests/tests/mempool.rs b/base_layer/core/tests/tests/mempool.rs
index 026ee2a0219..04d9133e290 100644
--- a/base_layer/core/tests/tests/mempool.rs
+++ b/base_layer/core/tests/tests/mempool.rs
@@ -69,7 +69,7 @@ use tari_core::{
     },
 };
 use tari_key_manager::key_manager_service::KeyManagerInterface;
-use tari_p2p::{services::liveness::LivenessConfig, tari_message::TariMessageType};
+use tari_p2p::{services::liveness::LivenessConfig, tari_message::TariMessageType, P2pConfig};
 use tari_script::script;
 use tari_test_utils::async_assert_eventually;
 use tempfile::tempdir;
@@ -1721,6 +1721,7 @@ async fn block_event_and_reorg_event_handling() {
     let (mut alice, mut bob, consensus_manager) = create_network_with_2_base_nodes_with_config(
         MempoolServiceConfig::default(),
         LivenessConfig::default(),
+        P2pConfig::default(),
         consensus_manager,
         temp_dir.path().to_str().unwrap(),
     )
diff --git a/base_layer/core/tests/tests/mod.rs b/base_layer/core/tests/tests/mod.rs
index 7e69440d1fa..42f811db0f8 100644
--- a/base_layer/core/tests/tests/mod.rs
+++ b/base_layer/core/tests/tests/mod.rs
@@ -25,6 +25,7 @@ use tari_core::{blocks::ChainBlock, chain_storage::BlockAddResult};
 mod async_db;
 mod base_node_rpc;
 mod block_validation;
+mod header_sync;
 mod mempool;
 mod node_comms_interface;
 mod node_service;
diff --git a/base_layer/core/tests/tests/node_state_machine.rs b/base_layer/core/tests/tests/node_state_machine.rs
index ed4fd2d9508..08b42961844 100644
--- a/base_layer/core/tests/tests/node_state_machine.rs
+++ b/base_layer/core/tests/tests/node_state_machine.rs
@@ -43,7 +43,7 @@ use tari_core::{
     transactions::test_helpers::create_test_core_key_manager_with_memory_db,
     validation::mocks::MockValidator,
 };
-use tari_p2p::services::liveness::config::LivenessConfig;
+use tari_p2p::{services::liveness::config::LivenessConfig, P2pConfig};
 use tari_shutdown::Shutdown;
 use tari_test_utils::unpack_enum;
 use tari_utilities::ByteArray;
@@ -81,6 +81,7 @@ async fn test_listening_lagging() {
             auto_ping_interval: Some(Duration::from_millis(100)),
             ..Default::default()
         },
+        P2pConfig::default(),
         consensus_manager,
         temp_dir.path().to_str().unwrap(),
     )
diff --git a/base_layer/p2p/src/initialization.rs b/base_layer/p2p/src/initialization.rs
index f87eed31b0d..ac9ab9b6536 100644
--- a/base_layer/p2p/src/initialization.rs
+++ b/base_layer/p2p/src/initialization.rs
@@ -135,7 +135,7 @@ pub async fn initialize_local_test_comms<P: AsRef<Path>>(
     discovery_request_timeout: Duration,
     seed_peers: Vec<Peer>,
     shutdown_signal: ShutdownSignal,
-) -> Result<(CommsNode, Dht, MessagingEventSender), CommsInitializationError> {
+) -> Result<(UnspawnedCommsNode, Dht, MessagingEventSender), CommsInitializationError> {
     let peer_database_name = {
         let mut rng = thread_rng();
         iter::repeat(())
@@ -201,17 +201,10 @@ pub async fn initialize_local_test_comms<P: AsRef<Path>>(
         )
         .build();
 
-    let comms = comms
-        .add_protocol_extension(
-            MessagingProtocolExtension::new(MESSAGING_PROTOCOL_ID.clone(), event_sender.clone(), pipeline)
-                .enable_message_received_event(),
-        )
-        .spawn_with_transport(MemoryTransport)
-        .await?;
-
-    comms
-        .node_identity()
-        .add_public_address(comms.listening_address().clone());
+    let comms = comms.add_protocol_extension(
+        MessagingProtocolExtension::new(MESSAGING_PROTOCOL_ID.clone(), event_sender.clone(), pipeline)
+            .enable_message_received_event(),
+    );
 
     Ok((comms, dht, event_sender))
 }
@@ -424,7 +417,7 @@ fn acquire_exclusive_file_lock(db_path: &Path) -> Result<File,
diff --git a/base_layer/p2p/tests/support/comms_and_services.rs b/base_layer/p2p/tests/support/comms_and_services.rs
index 490fcaece46..4bd2dca73f8 100644
--- a/base_layer/p2p/tests/support/comms_and_services.rs
+++ b/base_layer/p2p/tests/support/comms_and_services.rs
@@ -22,7 +22,12 @@
 
 use std::{sync::Arc, time::Duration};
 
-use tari_comms::{peer_manager::NodeIdentity, protocol::messaging::MessagingEventSender, CommsNode};
+use tari_comms::{
+    peer_manager::NodeIdentity,
+    protocol::messaging::MessagingEventSender,
+    transports::MemoryTransport,
+    CommsNode,
+};
 use tari_comms_dht::Dht;
 use tari_p2p::{comms_connector::InboundDomainConnector, initialization::initialize_local_test_comms};
 use tari_shutdown::ShutdownSignal;
@@ -46,5 +51,11 @@ pub async fn setup_comms_services(
     .await
     .unwrap();
 
+    let comms = comms.spawn_with_transport(MemoryTransport).await.unwrap();
+    // Set the public address for tests
+    comms
+        .node_identity()
+        .add_public_address(comms.listening_address().clone());
+
     (comms, dht, messaging_events)
 }
diff --git a/base_layer/wallet/tests/support/comms_and_services.rs b/base_layer/wallet/tests/support/comms_and_services.rs
index 296d1175074..b6c7344f0e0 100644
--- a/base_layer/wallet/tests/support/comms_and_services.rs
+++ b/base_layer/wallet/tests/support/comms_and_services.rs
@@ -26,6 +26,7 @@ use tari_comms::{
     message::MessageTag,
     net_address::MultiaddressesWithStats,
     peer_manager::{NodeId, NodeIdentity, Peer, PeerFeatures, PeerFlags},
+    transports::MemoryTransport,
     types::CommsPublicKey,
     CommsNode,
 };
@@ -57,6 +58,12 @@ pub async fn setup_comms_services(
     .await
     .unwrap();
 
+    let comms = comms.spawn_with_transport(MemoryTransport).await.unwrap();
+    // Set the public address for tests
+    comms
+        .node_identity()
+        .add_public_address(comms.listening_address().clone());
+
     (comms, dht)
 }