diff --git a/Cargo.lock b/Cargo.lock
index 513a18f56c2..fa07d1578ad 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5710,6 +5710,7 @@ dependencies = [
  "tempfile",
  "thiserror",
  "tokio",
+ "tower",
  "tracing",
  "uint",
  "zeroize",
diff --git a/base_layer/core/Cargo.toml b/base_layer/core/Cargo.toml
index b9d212d1e8c..7f66301c8ee 100644
--- a/base_layer/core/Cargo.toml
+++ b/base_layer/core/Cargo.toml
@@ -76,6 +76,7 @@ tokio = { version = "1.23", features = ["time", "sync", "macros"] }
 tracing = "0.1.26"
 uint = { version = "0.9", default-features = false }
 zeroize = "1"
+tower = "0.4.11"
 
 [dev-dependencies]
 tari_p2p = { path = "../../base_layer/p2p", features = ["test-mocks"] }
diff --git a/base_layer/core/src/base_node/state_machine_service/state_machine.rs b/base_layer/core/src/base_node/state_machine_service/state_machine.rs
index acebad629a2..4578ee60860 100644
--- a/base_layer/core/src/base_node/state_machine_service/state_machine.rs
+++ b/base_layer/core/src/base_node/state_machine_service/state_machine.rs
@@ -153,7 +153,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
                 db.set_disable_add_block_flag();
                 HeaderSync(HeaderSyncState::new(sync_peers, local_metadata))
             },
-            (HeaderSync(s), HeaderSyncFailed) => {
+            (HeaderSync(s), HeaderSyncFailed(_err)) => {
                 db.clear_disable_add_block_flag();
                 Waiting(s.into())
             },
@@ -161,7 +161,7 @@
                 db.clear_disable_add_block_flag();
                 Listening(s.into())
             },
-            (HeaderSync(s), HeadersSynchronized(_)) => DecideNextSync(s.into()),
+            (HeaderSync(s), HeadersSynchronized(..)) => DecideNextSync(s.into()),
 
             (DecideNextSync(_), ProceedToHorizonSync(peers)) => HorizonStateSync(peers.into()),
             (DecideNextSync(s), Continue) => {
diff --git a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs
index 2fea428f714..5f0ef9c9f55 100644
--- a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs
+++ b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs
@@ -37,7 +37,7 @@ use crate::base_node::{
         Starting,
         Waiting,
     },
-    sync::{HorizonSyncInfo, SyncPeer},
+    sync::{AttemptSyncResult, HorizonSyncInfo, SyncPeer},
 };
 
 #[derive(Debug)]
@@ -57,8 +57,8 @@ pub enum BaseNodeState {
 #[derive(Debug, Clone, PartialEq)]
 pub enum StateEvent {
     Initialized,
-    HeadersSynchronized(SyncPeer),
-    HeaderSyncFailed,
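+    // `HeadersSynchronized` now also carries the `AttemptSyncResult` of the successful attempt so that
+    // downstream states can inspect what the sync actually did; `HeaderSyncFailed` carries the error message.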
+    HeadersSynchronized(SyncPeer, AttemptSyncResult),
+    HeaderSyncFailed(String),
     ProceedToHorizonSync(Vec<SyncPeer>),
     ProceedToBlockSync(Vec<SyncPeer>),
     HorizonStateSynchronized,
@@ -145,8 +145,8 @@ impl Display for StateEvent {
         match self {
             Initialized => write!(f, "Initialized"),
             BlocksSynchronized => write!(f, "Synchronised Blocks"),
-            HeadersSynchronized(peer) => write!(f, "Headers Synchronized from peer `{}`", peer),
-            HeaderSyncFailed => write!(f, "Header Synchronization Failed"),
+            HeadersSynchronized(peer, result) => write!(f, "Headers Synchronized from peer `{}` ({:?})", peer, result),
+            HeaderSyncFailed(err) => write!(f, "Header Synchronization Failed ({})", err),
             ProceedToHorizonSync(_) => write!(f, "Proceed to horizon sync"),
             ProceedToBlockSync(_) => write!(f, "Proceed to block sync"),
             HorizonStateSynchronized => write!(f, "Horizon State Synchronized"),
diff --git a/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs b/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs
index d1e07e3e041..d13b19198e6 100644
--- a/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs
+++ b/base_layer/core/src/base_node/state_machine_service/states/header_sync.rs
@@ -36,7 +36,6 @@ use crate::{
     },
     chain_storage::BlockchainBackend,
 };
-
 const LOG_TARGET: &str = "c::bn::header_sync";
 
 #[derive(Clone, Debug)]
@@ -78,6 +77,7 @@ impl HeaderSyncState {
     }
 
     // converting u64 to i64 is okay as the future time limit is the hundreds so way below u32 even
+    #[allow(clippy::too_many_lines)]
     #[allow(clippy::cast_possible_wrap)]
     pub async fn next_event(
         &mut self,
@@ -158,7 +158,7 @@ impl HeaderSyncState {
         let mut mdc = vec![];
         log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
         match synchronizer.synchronize().await {
-            Ok(sync_peer) => {
+            Ok((sync_peer, sync_result)) => {
                 log_mdc::extend(mdc);
                 info!(
                     target: LOG_TARGET,
                     }
                 }
                 self.is_synced = true;
-                StateEvent::HeadersSynchronized(sync_peer)
+                StateEvent::HeadersSynchronized(sync_peer, sync_result)
             },
             Err(err) => {
                 let _ignore = shared.status_event_sender.send(StatusInfo {
                     bootstrapped,
                     state_info: StateInfo::SyncFailed("HeaderSyncFailed".to_string()),
@@ -193,7 +194,7 @@
                 _ => {
                     log_mdc::extend(mdc);
                     debug!(target: LOG_TARGET, "Header sync failed: {}", err);
-                    StateEvent::HeaderSyncFailed
+                    StateEvent::HeaderSyncFailed(err.to_string())
                 },
             },
diff --git a/base_layer/core/src/base_node/sync/header_sync/mod.rs b/base_layer/core/src/base_node/sync/header_sync/mod.rs
index d71c092a525..da4f5676645 100644
--- a/base_layer/core/src/base_node/sync/header_sync/mod.rs
+++ b/base_layer/core/src/base_node/sync/header_sync/mod.rs
@@ -26,4 +26,4 @@ pub use error::BlockHeaderSyncError;
 mod validator;
 
 mod synchronizer;
-pub use synchronizer::HeaderSynchronizer;
+pub use synchronizer::{AttemptSyncResult, HeaderSyncStatus, HeaderSynchronizer};
diff --git a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
index 7ed245666a2..c4f395be946 100644
--- a/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
+++ b/base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
@@ -106,7 +106,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
         self.hooks.add_on_rewind_hook(hook);
     }
 
-    pub async fn synchronize(&mut self) -> Result<SyncPeer, BlockHeaderSyncError> {
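+    /// Attempt to synchronize headers. On success, returns the sync peer that was used together with an
+    /// `AttemptSyncResult` describing the outcome of the attempt.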
+    pub async fn synchronize(&mut self) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
         debug!(target: LOG_TARGET, "Starting header sync.",);
 
         info!(
@@ -118,7 +118,7 @@
         let mut latency_increases_counter = 0;
         loop {
             match self.try_sync_from_all_peers(max_latency).await {
-                Ok(sync_peer) => break Ok(sync_peer),
+                Ok((peer, sync_result)) => break Ok((peer, sync_result)),
                 Err(err @ BlockHeaderSyncError::AllSyncPeersExceedLatency) => {
                     // If we have few sync peers, throw this out to be retried later
                     if self.sync_peers.len() < 2 {
@@ -136,7 +136,10 @@
     }
 
     #[allow(clippy::too_many_lines)]
-    pub async fn try_sync_from_all_peers(&mut self, max_latency: Duration) -> Result<SyncPeer, BlockHeaderSyncError> {
+    pub async fn try_sync_from_all_peers(
+        &mut self,
+        max_latency: Duration,
+    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
         let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
         info!(
             target: LOG_TARGET,
@@ -146,7 +149,7 @@
         let mut latency_counter = 0usize;
 
         for node_id in sync_peer_node_ids {
             match self.connect_and_attempt_sync(&node_id, max_latency).await {
-                Ok(peer) => return Ok(peer),
+                Ok((peer, sync_result)) => return Ok((peer, sync_result)),
                 Err(err) => {
                     let ban_reason = BlockHeaderSyncError::get_ban_reason(
                         &err,
@@ -181,7 +184,7 @@
         &mut self,
         node_id: &NodeId,
         max_latency: Duration,
-    ) -> Result<SyncPeer, BlockHeaderSyncError> {
+    ) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
         let peer_index = self
             .get_sync_peer_index(node_id)
             .ok_or(BlockHeaderSyncError::PeerNotFound)?;
@@ -215,8 +218,8 @@
         debug!(target: LOG_TARGET, "Sync peer latency is {:.2?}", latency);
         let sync_peer = self.sync_peers[peer_index].clone();
 
-        self.attempt_sync(&sync_peer, client, max_latency).await?;
-        Ok(sync_peer)
+        let sync_result = self.attempt_sync(&sync_peer, client, max_latency).await?;
+        Ok((sync_peer, sync_result))
     }
 
     async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, BlockHeaderSyncError> {
@@ -237,7 +240,7 @@
         sync_peer: &SyncPeer,
         mut client: rpc::BaseNodeSyncRpcClient,
         max_latency: Duration,
-    ) -> Result<(), BlockHeaderSyncError> {
+    ) -> Result<AttemptSyncResult, BlockHeaderSyncError> {
         let latency = client.get_last_request_latency();
         debug!(
             target: LOG_TARGET,
@@ -268,10 +271,10 @@
             .find_chain_split(sync_peer.node_id(), &mut client, NUM_INITIAL_HEADERS_TO_REQUEST as u64)
             .await?;
         let header_sync_status = self
-            .determine_sync_status(&sync_peer.to_string(), best_header, best_block, peer_response)
+            .determine_sync_status(&sync_peer.to_string(), best_header, best_block, peer_response.clone())
             .await?;
 
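+        // The status is matched on a clone because the original value is moved into the
+        // `AttemptSyncResult` returned from each branch below.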
-        match header_sync_status {
+        match header_sync_status.clone() {
             HeaderSyncStatus::InSyncOrAhead => {
                 if best_block_height < best_header_height {
                     debug!(
                         target: LOG_TARGET,
                         best_block_height
                     );
 
-                    Ok(())
+                    Ok(AttemptSyncResult {
+                        headers_returned: peer_response.headers.len() as u64,
+                        fork_hash_index: peer_response.fork_hash_index,
+                        header_sync_status,
+                    })
                 } else {
                     // We will only attempt sync if our accumulated difficulty is less than the peer's claimed
                     // accumulated difficulty, thus this is adverse behaviour from the peer.
@@ -313,7 +320,11 @@
                 );
                 self.synchronize_headers(sync_peer.clone(), &mut client, *split_info, max_latency)
                     .await?;
-                Ok(())
+                Ok(AttemptSyncResult {
+                    headers_returned: peer_response.headers.len() as u64,
+                    fork_hash_index: peer_response.fork_hash_index,
+                    header_sync_status,
+                })
             },
         }
     }
@@ -448,7 +459,7 @@
             .map(BlockHeader::try_from)
             .collect::<Result<Vec<_>, _>>()
             .map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
-        let num_new_headers = headers.len();
+        let num_new_headers = headers.len(); // Required for later use, no 'Copy' trait on 'BlockHeader'
 
         // NOTE: We can trust that the header associated with this hash exists because `block_hashes` was supplied by
         // this node. Bounds checking for fork_hash_index has been done above.
@@ -786,6 +797,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
     }
 }
 
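+// `Clone` is derived because the response is now cloned into `determine_sync_status` while its fields
+// are still needed afterwards to build the `AttemptSyncResult`.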
+#[derive(Debug, Clone)]
 struct PeerChainSplitResponse {
     block_hashes: Vec<HashOutput>,
     reorg_steps_back: u64,
@@ -794,14 +806,32 @@
     remote_tip_height: u64,
 }
 
-struct ChainSplitInfo {
-    best_block: ChainHeader,
-    remote_tip_height: u64,
-    reorg_steps_back: u64,
-    chain_split_hash: HashOutput,
+/// Information about the chain split from the remote node.
+#[derive(Debug, Clone, PartialEq)]
+pub struct ChainSplitInfo {
+    /// The best block on the local chain.
+    pub best_block: ChainHeader,
+    /// The height of the remote node's tip.
+    pub remote_tip_height: u64,
+    /// The number of blocks to reorg back to the fork.
+    pub reorg_steps_back: u64,
+    /// The hash of the block at the fork.
+    pub chain_split_hash: HashOutput,
+}
+
+/// The result of an attempt to synchronize headers with a peer.
+#[derive(Debug, Clone, PartialEq)]
+pub struct AttemptSyncResult {
+    /// The number of headers that were returned.
+    pub headers_returned: u64,
+    /// The fork hash index of the remote peer.
+    pub fork_hash_index: u64,
+    /// The header sync status.
+    pub header_sync_status: HeaderSyncStatus,
 }
 
-enum HeaderSyncStatus {
+#[derive(Debug, Clone, PartialEq)]
+pub enum HeaderSyncStatus {
     /// Local and remote node are in sync or ahead
     InSyncOrAhead,
     /// Local node is lagging behind remote node
diff --git a/base_layer/core/src/base_node/sync/mod.rs b/base_layer/core/src/base_node/sync/mod.rs
index 2193bb6482f..1c5ef806910 100644
--- a/base_layer/core/src/base_node/sync/mod.rs
+++ b/base_layer/core/src/base_node/sync/mod.rs
@@ -36,7 +36,7 @@ pub use block_sync::{BlockSyncError, BlockSynchronizer};
 #[cfg(feature = "base_node")]
 mod header_sync;
 #[cfg(feature = "base_node")]
-pub use header_sync::{BlockHeaderSyncError, HeaderSynchronizer};
+pub use header_sync::{AttemptSyncResult, BlockHeaderSyncError, HeaderSyncStatus, HeaderSynchronizer};
 
 #[cfg(feature = "base_node")]
 mod horizon_state_sync;
diff --git a/base_layer/core/tests/helpers/nodes.rs b/base_layer/core/tests/helpers/nodes.rs
index 908852faa20..c17f45d3769 100644
--- a/base_layer/core/tests/helpers/nodes.rs
+++ b/base_layer/core/tests/helpers/nodes.rs
@@ -26,12 +26,14 @@ use rand::rngs::OsRng;
 use tari_common::configuration::Network;
 use tari_comms::{
     peer_manager::{NodeIdentity, PeerFeatures},
-    protocol::messaging::MessagingEventSender,
+    protocol::{messaging::MessagingEventSender, rpc::RpcServer},
     transports::MemoryTransport,
     CommsNode,
+    UnspawnedCommsNode,
 };
 use tari_comms_dht::{outbound::OutboundMessageRequester, Dht};
 use tari_core::{
+    base_node,
     base_node::{
         chain_metadata_service::{ChainMetadataHandle, ChainMetadataServiceInitializer},
         comms_interface::OutboundNodeCommsInterface,
@@ -63,6 +65,7 @@ use tari_p2p::{
     comms_connector::{pubsub_connector, InboundDomainConnector},
     initialization::initialize_local_test_comms,
     services::liveness::{config::LivenessConfig, LivenessHandle, LivenessInitializer},
+    P2pConfig,
 };
 use tari_service_framework::{RegisterHandle, StackBuilder};
 use tari_shutdown::Shutdown;
@@ -105,6 +108,7 @@ pub struct BaseNodeBuilder {
     mempool_config: Option<MempoolConfig>,
     mempool_service_config: Option<MempoolServiceConfig>,
     liveness_service_config: Option<LivenessConfig>,
+    p2p_config: Option<P2pConfig>,
     validators: Option<Validators<TempDatabase>>,
     consensus_manager: Option<ConsensusManager>,
     network: NetworkConsensus,
@@ -120,6 +124,7 @@ impl BaseNodeBuilder {
             mempool_config: None,
             mempool_service_config: None,
             liveness_service_config: None,
+            p2p_config: None,
             validators: None,
             consensus_manager: None,
             network,
@@ -156,6 +161,12 @@ impl BaseNodeBuilder {
         self
     }
 
+    /// Set the p2p configuration
+    pub fn with_p2p_config(mut self, config: P2pConfig) -> Self {
+        self.p2p_config = Some(config);
+        self
+    }
+
    pub fn with_validators(
         mut self,
         block: impl CandidateBlockValidator + 'static,
@@ -203,6 +214,7 @@
             mempool,
             consensus_manager.clone(),
             self.liveness_service_config.unwrap_or_default(),
+            self.p2p_config.unwrap_or_default(),
             data_path,
         )
         .await;
@@ -251,6 +263,7 @@ pub async fn create_network_with_2_base_nodes(data_path: &str) -> (NodeInterface
 pub async fn create_network_with_2_base_nodes_with_config<P: AsRef<Path>>(
     mempool_service_config: MempoolServiceConfig,
     liveness_service_config: LivenessConfig,
+    p2p_config: P2pConfig,
     consensus_manager: ConsensusManager,
     data_path: P,
 ) -> (NodeInterfaces, NodeInterfaces, ConsensusManager) {
@@ -261,6 +274,7 @@
         .with_node_identity(alice_node_identity.clone())
         .with_mempool_service_config(mempool_service_config.clone())
         .with_liveness_service_config(liveness_service_config.clone())
+        .with_p2p_config(p2p_config.clone())
         .with_consensus_manager(consensus_manager)
         .start(data_path.as_ref().join("alice").as_os_str().to_str().unwrap())
         .await;
@@ -269,6 +283,7 @@
         .with_peers(vec![alice_node_identity])
         .with_mempool_service_config(mempool_service_config)
         .with_liveness_service_config(liveness_service_config)
+        .with_p2p_config(p2p_config.clone())
         .with_consensus_manager(consensus_manager)
         .start(data_path.as_ref().join("bob").as_os_str().to_str().unwrap())
         .await;
@@ -360,9 +375,9 @@ async fn setup_comms_services(
     peers: Vec<Arc<NodeIdentity>>,
     publisher: InboundDomainConnector,
     data_path: &str,
-) -> (CommsNode, Dht, MessagingEventSender, Shutdown) {
+    shutdown: &Shutdown,
+) -> (UnspawnedCommsNode, Dht, MessagingEventSender) {
     let peers = peers.into_iter().map(|p| p.to_peer()).collect();
-    let shutdown = Shutdown::new();
 
     let (comms, dht, messaging_events) = initialize_local_test_comms(
         node_identity,
@@ -375,7 +390,7 @@
     .await
     .unwrap();
 
-    (comms, dht, messaging_events, shutdown)
+    (comms, dht, messaging_events)
 }
 
 // Helper function for starting the services of the Base node.
@@ -386,12 +401,15 @@ async fn setup_base_node_services(
     mempool: Mempool,
     consensus_manager: ConsensusManager,
     liveness_service_config: LivenessConfig,
+    p2p_config: P2pConfig,
     data_path: &str,
 ) -> NodeInterfaces {
     let (publisher, subscription_factory) = pubsub_connector(100);
     let subscription_factory = Arc::new(subscription_factory);
-    let (comms, dht, messaging_events, shutdown) =
-        setup_comms_services(node_identity.clone(), peers, publisher, data_path).await;
+    let shutdown = Shutdown::new();
+
+    let (comms, dht, messaging_events) =
+        setup_comms_services(node_identity.clone(), peers, publisher.clone(), data_path, &shutdown).await;
 
     let mock_state_machine = MockBaseNodeStateMachine::new();
     let randomx_factory = RandomXFactory::new(2);
@@ -417,6 +435,25 @@
     .await
     .unwrap();
 
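+    // Test nodes now run a real base node sync RPC service: build an RPC server with the configured
+    // session limits, register the sync service backed by this node's blockchain db, then spawn comms.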
+    let base_node_service = handles.expect_handle::<LocalNodeCommsInterface>();
+    let rpc_server = RpcServer::builder()
+        .with_maximum_simultaneous_sessions(p2p_config.rpc_max_simultaneous_sessions)
+        .with_maximum_sessions_per_client(p2p_config.rpc_max_sessions_per_peer)
+        .finish();
+    let rpc_server = rpc_server.add_service(base_node::create_base_node_sync_rpc_service(
+        blockchain_db.clone().into(),
+        base_node_service,
+    ));
+    let comms = comms
+        .add_protocol_extension(rpc_server)
+        .spawn_with_transport(MemoryTransport)
+        .await
+        .unwrap();
+    // Set the public address for tests
+    comms
+        .node_identity()
+        .add_public_address(comms.listening_address().clone());
+
     let outbound_nci = handles.expect_handle::<OutboundNodeCommsInterface>();
     let local_nci = handles.expect_handle::<LocalNodeCommsInterface>();
     let outbound_mp_interface = handles.expect_handle::<OutboundMempoolServiceInterface>();
diff --git a/base_layer/core/tests/tests/header_sync.rs b/base_layer/core/tests/tests/header_sync.rs
new file mode 100644
index 00000000000..d5fa7c802a0
--- /dev/null
+++ b/base_layer/core/tests/tests/header_sync.rs
@@ -0,0 +1,299 @@
+// Copyright 2022. The Tari Project
+//
+// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
+// following conditions are met:
+//
+// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
+// disclaimer.
+//
+// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
+// following disclaimer in the documentation and/or other materials provided with the distribution.
+//
+// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
+// products derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
+// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
+// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
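+
+//! Integration-level checks of header sync: drives two in-memory base nodes through the `Lagging`,
+//! `InSyncOrAhead` and reorg paths of `HeaderSyncStatus`.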
+
+use std::time::Duration;
+
+use tari_common::configuration::Network;
+use tari_core::{
+    base_node::{
+        chain_metadata_service::PeerChainMetadata,
+        state_machine_service::{
+            states::{HeaderSyncState, StateEvent, StatusInfo},
+            BaseNodeStateMachine,
+            BaseNodeStateMachineConfig,
+        },
+        sync::{HeaderSyncStatus, SyncPeer},
+        SyncValidators,
+    },
+    consensus::{ConsensusConstantsBuilder, ConsensusManagerBuilder},
+    mempool::MempoolServiceConfig,
+    proof_of_work::{randomx_factory::RandomXFactory, Difficulty},
+    test_helpers::blockchain::TempDatabase,
+    transactions::test_helpers::create_test_core_key_manager_with_memory_db,
+    validation::mocks::MockValidator,
+};
+use tari_p2p::{services::liveness::config::LivenessConfig, P2pConfig};
+use tari_shutdown::Shutdown;
+use tempfile::tempdir;
+use tokio::sync::{broadcast, watch};
+
+use crate::helpers::{
+    block_builders::{append_block, create_genesis_block},
+    nodes::{create_network_with_2_base_nodes_with_config, NodeInterfaces},
+};
+
+static EMISSION: [u64; 2] = [10, 10];
+
+#[allow(clippy::too_many_lines)]
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+async fn test_header_sync() {
+    // Create the network with Alice node and Bob node
+    let network = Network::LocalNet;
+    let temp_dir = tempdir().unwrap();
+    let key_manager = create_test_core_key_manager_with_memory_db();
+    let consensus_constants = ConsensusConstantsBuilder::new(network)
+        .with_emission_amounts(100_000_000.into(), &EMISSION, 100.into())
+        .build();
+    let (initial_block, _) = create_genesis_block(&consensus_constants, &key_manager).await;
+    let consensus_manager = ConsensusManagerBuilder::new(network)
+        .add_consensus_constants(consensus_constants)
+        .with_block(initial_block.clone())
+        .build()
+        .unwrap();
+    let (alice_node, bob_node, consensus_manager) = create_network_with_2_base_nodes_with_config(
+        MempoolServiceConfig::default(),
+        LivenessConfig {
+            auto_ping_interval: Some(Duration::from_millis(100)),
+            ..Default::default()
+        },
+        P2pConfig::default(),
+        consensus_manager,
+        temp_dir.path().to_str().unwrap(),
+    )
+    .await;
+    let shutdown = Shutdown::new();
+    let (state_change_event_publisher, _) = broadcast::channel(10);
+    let (status_event_sender, _status_event_receiver) = watch::channel(StatusInfo::new());
+
+    // Alice needs a state machine for header sync
+    let mut alice_state_machine = BaseNodeStateMachine::new(
+        alice_node.blockchain_db.clone().into(),
+        alice_node.local_nci.clone(),
+        alice_node.comms.connectivity(),
+        alice_node.comms.peer_manager(),
+        alice_node.chain_metadata_handle.get_event_stream(),
+        BaseNodeStateMachineConfig::default(),
+        SyncValidators::new(MockValidator::new(true), MockValidator::new(true)),
+        status_event_sender,
+        state_change_event_publisher,
+        RandomXFactory::default(),
+        consensus_manager.clone(),
+        shutdown.to_signal(),
+    );
+
+    // Add 1 block to Bob's chain
+    let block_1_bob = append_block(
+        &bob_node.blockchain_db,
+        &initial_block,
+        vec![],
+        &consensus_manager,
+        Difficulty::from_u64(3).unwrap(),
+        &key_manager,
+    )
+    .await
+    .unwrap();
+    assert_eq!(block_1_bob.height(), 1);
+    assert_eq!(bob_node.blockchain_db.get_height().unwrap(), 1);
+
+    // Alice attempts header sync; she is still on the genesis block, so her headers will be lagging
+    let event = sync_headers(&mut alice_state_machine, &alice_node, &bob_node).await;
+    // "Lagging"
+    match event.clone() {
+        StateEvent::HeadersSynchronized(val, sync_result) => {
+            assert_eq!(val.claimed_chain_metadata().height_of_longest_chain(), 1);
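+            // Accumulated difficulty: genesis (1) + block_1 (3) = 4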
+            assert_eq!(val.claimed_chain_metadata().accumulated_difficulty(), 4);
+            assert_eq!(sync_result.headers_returned, 1);
+            assert_eq!(sync_result.fork_hash_index, 0);
+            if let HeaderSyncStatus::Lagging(val) = sync_result.header_sync_status {
+                assert_eq!(val.remote_tip_height, 1);
+                assert_eq!(val.best_block.height(), 0);
+                assert_eq!(val.reorg_steps_back, 0);
+            } else {
+                panic!("Should be 'Lagging'");
+            }
+        },
+        _ => panic!("Expected HeadersSynchronized event"),
+    }
+
+    // Alice attempts header sync again; she is still on the genesis block, but headers are now in sync
+    let event = sync_headers(&mut alice_state_machine, &alice_node, &bob_node).await;
+    // "InSyncOrAhead"
+    match event.clone() {
+        StateEvent::HeadersSynchronized(val, sync_result) => {
+            assert_eq!(val.claimed_chain_metadata().height_of_longest_chain(), 1);
+            assert_eq!(val.claimed_chain_metadata().accumulated_difficulty(), 4);
+            assert_eq!(sync_result.headers_returned, 0);
+            assert_eq!(sync_result.fork_hash_index, 0);
+            match sync_result.header_sync_status {
+                HeaderSyncStatus::InSyncOrAhead => {},
+                HeaderSyncStatus::Lagging(_) => panic!("Should be 'InSyncOrAhead'"),
+            }
+        },
+        _ => panic!("Expected StateEvent::HeadersSynchronized event"),
+    }
+
+    // Bob adds another block
+    let block_2_bob = append_block(
+        &bob_node.blockchain_db,
+        &block_1_bob,
+        vec![],
+        &consensus_manager,
+        Difficulty::from_u64(3).unwrap(),
+        &key_manager,
+    )
+    .await
+    .unwrap();
+    assert_eq!(block_2_bob.height(), 2);
+
+    // Alice attempts header sync; she is still on the genesis block, so her headers will be lagging
+    let event = sync_headers(&mut alice_state_machine, &alice_node, &bob_node).await;
+    // "Lagging"
+    match event.clone() {
+        StateEvent::HeadersSynchronized(val, sync_result) => {
+            assert_eq!(val.claimed_chain_metadata().height_of_longest_chain(), 2);
+            assert_eq!(val.claimed_chain_metadata().accumulated_difficulty(), 7);
+            assert_eq!(sync_result.headers_returned, 1);
+            assert_eq!(sync_result.fork_hash_index, 0);
+            if let HeaderSyncStatus::Lagging(val) = sync_result.header_sync_status {
+                assert_eq!(val.remote_tip_height, 2);
+                assert_eq!(val.best_block.height(), 0);
+                assert_eq!(val.reorg_steps_back, 0);
+            } else {
+                panic!("Should be 'Lagging'");
+            }
+        },
+        _ => panic!("Expected StateEvent::HeadersSynchronized event"),
+    }
+
+    // Alice adds 3 (different) blocks, with PoW on par with Bob's chain but with greater height
+    let block_1_alice = append_block(
+        &alice_node.blockchain_db,
+        &initial_block,
+        vec![],
+        &consensus_manager,
+        Difficulty::from_u64(3).unwrap(),
+        &key_manager,
+    )
+    .await
+    .unwrap();
+    let block_2_alice = append_block(
+        &alice_node.blockchain_db,
+        &block_1_alice,
+        vec![],
+        &consensus_manager,
+        Difficulty::from_u64(2).unwrap(),
+        &key_manager,
+    )
+    .await
+    .unwrap();
+    let block_3_alice = append_block(
+        &alice_node.blockchain_db,
+        &block_2_alice,
+        vec![],
+        &consensus_manager,
+        Difficulty::from_u64(1).unwrap(),
+        &key_manager,
+    )
+    .await
+    .unwrap();
+    assert_eq!(block_3_alice.height(), 3);
+    assert_eq!(block_3_alice.accumulated_data().total_accumulated_difficulty, 7);
+    assert_eq!(
+        block_3_alice.accumulated_data().total_accumulated_difficulty,
+        block_2_bob.accumulated_data().total_accumulated_difficulty
+    );
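+    // Alice: 1 (genesis) + 3 + 2 + 1 = 7; Bob: 1 (genesis) + 3 + 3 = 7. The chains have equal accumulated
+    // difficulty, so Bob does not have better PoW despite Alice's chain being taller.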
+
+    // Alice attempts header sync; Bob's claimed PoW is no better than hers, so sync is not attempted
+    let event = sync_headers(&mut alice_state_machine, &alice_node, &bob_node).await;
+    // "Header sync not attempted, sync peer does not have better POW"
+    match event.clone() {
+        StateEvent::Continue => {
+            // Good
+        },
+        _ => panic!("Expected StateEvent::Continue event"),
+    }
+
+    // Bob adds more blocks and draws ahead of Alice
+    let block_3_bob = append_block(
+        &bob_node.blockchain_db,
+        &block_2_bob,
+        vec![],
+        &consensus_manager,
+        Difficulty::from_u64(3).unwrap(),
+        &key_manager,
+    )
+    .await
+    .unwrap();
+    let block_4_bob = append_block(
+        &bob_node.blockchain_db,
+        &block_3_bob,
+        vec![],
+        &consensus_manager,
+        Difficulty::from_u64(3).unwrap(),
+        &key_manager,
+    )
+    .await
+    .unwrap();
+    assert_eq!(block_4_bob.height(), 4);
+
+    // Alice attempts header sync; she is on a taller chain with less PoW, so her headers will be lagging
+    // with reorg steps back to the fork
+    let event = sync_headers(&mut alice_state_machine, &alice_node, &bob_node).await;
+    // "Lagging"
+    match event {
+        StateEvent::HeadersSynchronized(val, sync_result) => {
+            assert_eq!(val.claimed_chain_metadata().height_of_longest_chain(), 4);
+            assert_eq!(val.claimed_chain_metadata().accumulated_difficulty(), 13);
+            assert_eq!(sync_result.headers_returned, 4);
+            assert_eq!(sync_result.fork_hash_index, 3);
+            if let HeaderSyncStatus::Lagging(val) = sync_result.header_sync_status {
+                assert_eq!(val.remote_tip_height, 4);
+                assert_eq!(val.best_block.height(), 3);
+                assert_eq!(val.reorg_steps_back, 3);
+            } else {
+                panic!("Should be 'Lagging'");
+            }
+        },
+        _ => panic!("Expected StateEvent::HeadersSynchronized event"),
+    }
+}
+
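+/// Builds a fresh `HeaderSyncState` with Bob as the only sync peer and runs a single header-sync round
+/// on Alice's state machine, returning the resulting `StateEvent`.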
+async fn sync_headers(
+    alice_state_machine: &mut BaseNodeStateMachine<TempDatabase>,
+    alice_node: &NodeInterfaces,
+    bob_node: &NodeInterfaces,
+) -> StateEvent {
+    let mut header_sync = HeaderSyncState::new(
+        vec![SyncPeer::from(PeerChainMetadata::new(
+            bob_node.node_identity.node_id().clone(),
+            bob_node.blockchain_db.get_chain_metadata().unwrap(),
+            None,
+        ))],
+        alice_node.blockchain_db.get_chain_metadata().unwrap(),
+    );
+    header_sync.next_event(alice_state_machine).await
+}
diff --git a/base_layer/core/tests/tests/mempool.rs b/base_layer/core/tests/tests/mempool.rs
index 026ee2a0219..04d9133e290 100644
--- a/base_layer/core/tests/tests/mempool.rs
+++ b/base_layer/core/tests/tests/mempool.rs
@@ -69,7 +69,7 @@ use tari_core::{
     },
 };
 use tari_key_manager::key_manager_service::KeyManagerInterface;
-use tari_p2p::{services::liveness::LivenessConfig, tari_message::TariMessageType};
+use tari_p2p::{services::liveness::LivenessConfig, tari_message::TariMessageType, P2pConfig};
 use tari_script::script;
 use tari_test_utils::async_assert_eventually;
 use tempfile::tempdir;
@@ -1721,6 +1721,7 @@ async fn block_event_and_reorg_event_handling() {
     let (mut alice, mut bob, consensus_manager) = create_network_with_2_base_nodes_with_config(
         MempoolServiceConfig::default(),
         LivenessConfig::default(),
+        P2pConfig::default(),
         consensus_manager,
         temp_dir.path().to_str().unwrap(),
     )
diff --git a/base_layer/core/tests/tests/mod.rs b/base_layer/core/tests/tests/mod.rs
index 7e69440d1fa..42f811db0f8 100644
--- a/base_layer/core/tests/tests/mod.rs
+++ b/base_layer/core/tests/tests/mod.rs
@@ -25,6 +25,7 @@ use tari_core::{blocks::ChainBlock, chain_storage::BlockAddResult};
 mod async_db;
 mod base_node_rpc;
 mod block_validation;
+mod header_sync;
 mod mempool;
 mod node_comms_interface;
 mod node_service;
diff --git a/base_layer/core/tests/tests/node_state_machine.rs b/base_layer/core/tests/tests/node_state_machine.rs
index ed4fd2d9508..08b42961844 100644
--- a/base_layer/core/tests/tests/node_state_machine.rs
+++ b/base_layer/core/tests/tests/node_state_machine.rs
@@ -43,7 +43,7 @@ use tari_core::{
     transactions::test_helpers::create_test_core_key_manager_with_memory_db,
     validation::mocks::MockValidator,
 };
-use tari_p2p::services::liveness::config::LivenessConfig;
+use tari_p2p::{services::liveness::config::LivenessConfig, P2pConfig};
 use tari_shutdown::Shutdown;
 use tari_test_utils::unpack_enum;
 use tari_utilities::ByteArray;
@@ -81,6 +81,7 @@ async fn test_listening_lagging() {
             auto_ping_interval: Some(Duration::from_millis(100)),
             ..Default::default()
         },
+        P2pConfig::default(),
         consensus_manager,
         temp_dir.path().to_str().unwrap(),
     )
diff --git a/base_layer/p2p/src/initialization.rs b/base_layer/p2p/src/initialization.rs
index f87eed31b0d..ac9ab9b6536 100644
--- a/base_layer/p2p/src/initialization.rs
+++ b/base_layer/p2p/src/initialization.rs
@@ -135,7 +135,7 @@ pub async fn initialize_local_test_comms<P: AsRef<Path>>(
     discovery_request_timeout: Duration,
     seed_peers: Vec<Peer>,
     shutdown_signal: ShutdownSignal,
-) -> Result<(CommsNode, Dht, MessagingEventSender), CommsInitializationError> {
+) -> Result<(UnspawnedCommsNode, Dht, MessagingEventSender), CommsInitializationError> {
     let peer_database_name = {
         let mut rng = thread_rng();
         iter::repeat(())
@@ -201,17 +201,10 @@
     )
     .build();
 
-    let comms = comms
-        .add_protocol_extension(
-            MessagingProtocolExtension::new(MESSAGING_PROTOCOL_ID.clone(), event_sender.clone(), pipeline)
-                .enable_message_received_event(),
-        )
-        .spawn_with_transport(MemoryTransport)
-        .await?;
-
-    comms
-        .node_identity()
-        .add_public_address(comms.listening_address().clone());
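+    // The node is now returned unspawned so that callers (e.g. tests) can register additional protocol
+    // extensions, such as RPC servers, before spawning comms with a transport.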
+    let comms = comms.add_protocol_extension(
+        MessagingProtocolExtension::new(MESSAGING_PROTOCOL_ID.clone(), event_sender.clone(), pipeline)
+            .enable_message_received_event(),
+    );
 
     Ok((comms, dht, event_sender))
 }
@@ -424,7 +417,7 @@ fn acquire_exclusive_file_lock(db_path: &Path) -> Result<File, CommsInitializationError> {
diff --git a/base_layer/p2p/tests/support/comms_and_services.rs b/base_layer/p2p/tests/support/comms_and_services.rs
index 490fcaece46..4bd2dca73f8 100644
--- a/base_layer/p2p/tests/support/comms_and_services.rs
+++ b/base_layer/p2p/tests/support/comms_and_services.rs
@@ -22,7 +22,12 @@
 
 use std::{sync::Arc, time::Duration};
 
-use tari_comms::{peer_manager::NodeIdentity, protocol::messaging::MessagingEventSender, CommsNode};
+use tari_comms::{
+    peer_manager::NodeIdentity,
+    protocol::messaging::MessagingEventSender,
+    transports::MemoryTransport,
+    CommsNode,
+};
 use tari_comms_dht::Dht;
 use tari_p2p::{comms_connector::InboundDomainConnector, initialization::initialize_local_test_comms};
 use tari_shutdown::ShutdownSignal;
@@ -46,5 +51,11 @@ pub async fn setup_comms_services(
     .await
     .unwrap();
 
+    let comms = comms.spawn_with_transport(MemoryTransport).await.unwrap();
+    // Set the public address for tests
+    comms
+        .node_identity()
+        .add_public_address(comms.listening_address().clone());
+
     (comms, dht, messaging_events)
 }
diff --git a/base_layer/wallet/tests/support/comms_and_services.rs b/base_layer/wallet/tests/support/comms_and_services.rs
index 296d1175074..b6c7344f0e0 100644
--- a/base_layer/wallet/tests/support/comms_and_services.rs
+++ b/base_layer/wallet/tests/support/comms_and_services.rs
@@ -26,6 +26,7 @@ use tari_comms::{
     message::MessageTag,
     net_address::MultiaddressesWithStats,
     peer_manager::{NodeId, NodeIdentity, Peer, PeerFeatures, PeerFlags},
+    transports::MemoryTransport,
     types::CommsPublicKey,
     CommsNode,
 };
@@ -57,6 +58,12 @@ pub async fn setup_comms_services(
     .await
     .unwrap();
 
+    let comms = comms.spawn_with_transport(MemoryTransport).await.unwrap();
+    // Set the public address for tests
+    comms
+        .node_identity()
+        .add_public_address(comms.listening_address().clone());
+
     (comms, dht)
 }