diff --git a/applications/minotari_app_grpc/proto/base_node.proto b/applications/minotari_app_grpc/proto/base_node.proto index d27fb38d40..1ea4428757 100644 --- a/applications/minotari_app_grpc/proto/base_node.proto +++ b/applications/minotari_app_grpc/proto/base_node.proto @@ -357,6 +357,7 @@ message SyncProgressResponse { uint64 local_height = 2; SyncState state = 3; string short_desc = 4; + uint64 initial_connected_peers = 5; } enum SyncState { diff --git a/applications/minotari_node/src/grpc/base_node_grpc_server.rs b/applications/minotari_node/src/grpc/base_node_grpc_server.rs index 96fae8d189..8738da2433 100644 --- a/applications/minotari_node/src/grpc/base_node_grpc_server.rs +++ b/applications/minotari_node/src/grpc/base_node_grpc_server.rs @@ -2012,24 +2012,28 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { local_height: 0, state: tari_rpc::SyncState::HeaderStarting.into(), short_desc, + initial_connected_peers: 0, }, StateInfo::HeaderSync(Some(info)) => tari_rpc::SyncProgressResponse { tip_height: info.tip_height, local_height: info.local_height, state: tari_rpc::SyncState::Header.into(), short_desc, + initial_connected_peers: 0, }, StateInfo::Connecting(_) => tari_rpc::SyncProgressResponse { tip_height: 0, local_height: 0, state: tari_rpc::SyncState::BlockStarting.into(), short_desc, + initial_connected_peers: 0, }, StateInfo::BlockSync(info) => tari_rpc::SyncProgressResponse { tip_height: info.tip_height, local_height: info.local_height, state: tari_rpc::SyncState::Block.into(), short_desc, + initial_connected_peers: 0, }, _ => tari_rpc::SyncProgressResponse { tip_height: 0, @@ -2040,6 +2044,7 @@ impl tari_rpc::base_node_server::BaseNode for BaseNodeGrpcServer { tari_rpc::SyncState::Startup.into() }, short_desc, + initial_connected_peers: state.get_initial_connected_peers(), }, }; Ok(Response::new(response)) diff --git a/base_layer/core/src/base_node/state_machine_service/state_machine.rs b/base_layer/core/src/base_node/state_machine_service/state_machine.rs index 4578ee6086..9d3547ba76 100644 --- a/base_layer/core/src/base_node/state_machine_service/state_machine.rs +++ b/base_layer/core/src/base_node/state_machine_service/state_machine.rs @@ -62,6 +62,9 @@ pub struct BaseNodeStateMachineConfig { /// to always be behind the network #[serde(with = "serializers::seconds")] pub time_before_considered_lagging: Duration, + /// This is the amount of metadata events that a node will wait for before decide to start syncing for a peer, + /// choosing the best peer out of the list + pub initial_sync_peer_count: u64, } #[allow(clippy::derivable_impls)] @@ -71,6 +74,7 @@ impl Default for BaseNodeStateMachineConfig { blockchain_sync_config: Default::default(), blocks_behind_before_considered_lagging: 1, time_before_considered_lagging: Duration::from_secs(10), + initial_sync_peer_count: 5, } } } diff --git a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs index 19c88d1278..fce4dc9a88 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/events_and_states.rs @@ -210,7 +210,17 @@ impl StateInfo { HorizonSync(info) => info.to_progress_string(), BlockSync(info) => format!("Syncing blocks: {}", info.sync_progress_string_blocks()), - Listening(_) => "Listening".to_string(), + Listening(info) => { + if info.is_synced() { + "Listening".to_string() + } else { + format!( + "Waiting for peer data: {}/{}", + info.initial_delay_connected_count(), + info.initial_sync_peer_wait_count() + ) + } + }, SyncFailed(details) => format!("Sync failed: {}", details), } } @@ -230,6 +240,13 @@ impl StateInfo { Listening(info) => info.is_synced(), } } + + pub fn get_initial_connected_peers(&self) -> u64 { + match self { + StateInfo::Listening(info) => info.initial_delay_connected_count(), + _ => 0, + } + } } impl Display for StateInfo { diff --git a/base_layer/core/src/base_node/state_machine_service/states/listening.rs b/base_layer/core/src/base_node/state_machine_service/states/listening.rs index 4575434bb6..699a6f370e 100644 --- a/base_layer/core/src/base_node/state_machine_service/states/listening.rs +++ b/base_layer/core/src/base_node/state_machine_service/states/listening.rs @@ -55,7 +55,6 @@ use crate::{ }; const LOG_TARGET: &str = "c::bn::state_machine_service::states::listening"; -const INITIAL_SYNC_PEER_COUNT: usize = 5; /// This struct contains the info of the peer, and is used to serialised and deserialised. #[derive(Serialize, Deserialize)] @@ -78,6 +77,8 @@ impl PeerMetadata { /// This struct contains info that is use full for external viewing of state info pub struct ListeningInfo { synced: bool, + initial_delay_connected_count: u64, + initial_sync_peer_wait_count: u64, } impl Display for ListeningInfo { @@ -88,13 +89,25 @@ impl Display for ListeningInfo { impl ListeningInfo { /// Creates a new ListeningInfo - pub const fn new(is_synced: bool) -> Self { - Self { synced: is_synced } + pub const fn new(is_synced: bool, initial_delay_connected_count: u64, initial_sync_peer_wait_count: u64) -> Self { + Self { + synced: is_synced, + initial_delay_connected_count, + initial_sync_peer_wait_count, + } } pub fn is_synced(&self) -> bool { self.synced } + + pub fn initial_delay_connected_count(&self) -> u64 { + self.initial_delay_connected_count + } + + pub fn initial_sync_peer_wait_count(&self) -> u64 { + self.initial_sync_peer_wait_count + } } /// This state listens for chain metadata events received from the liveness and chain metadata service. Based on the @@ -103,6 +116,7 @@ impl ListeningInfo { #[derive(Clone, Debug, PartialEq, Eq, Default)] pub struct Listening { is_synced: bool, + initial_delay_count: u64, } impl Listening { @@ -116,7 +130,11 @@ impl Listening { shared: &mut BaseNodeStateMachine, ) -> StateEvent { info!(target: LOG_TARGET, "Listening for chain metadata updates"); - shared.set_state_info(StateInfo::Listening(ListeningInfo::new(self.is_synced))); + shared.set_state_info(StateInfo::Listening(ListeningInfo::new( + self.is_synced, + self.initial_delay_count, + shared.config.initial_sync_peer_count, + ))); let mut time_since_better_block = None; let mut initial_sync_counter = 0; let mut initial_sync_peer_list = Vec::new(); @@ -130,11 +148,25 @@ impl Listening { debug!("NetworkSilence event received"); if !self.is_synced { self.is_synced = true; - shared.set_state_info(StateInfo::Listening(ListeningInfo::new(true))); + self.initial_delay_count = 0; + shared.set_state_info(StateInfo::Listening(ListeningInfo::new( + true, + 0, + shared.config.initial_sync_peer_count, + ))); debug!(target: LOG_TARGET, "Initial sync achieved"); } }, Ok(ChainMetadataEvent::PeerChainMetadataReceived(peer_metadata)) => { + // if we are not yet synced, we wait for the initial delay of ping/pongs, so let's propagate the + // updated info + if !self.is_synced { + shared.set_state_info(StateInfo::Listening(ListeningInfo::new( + self.is_synced, + self.initial_delay_count, + shared.config.initial_sync_peer_count, + ))); + } // We already ban the peer based on some previous logic, but this message was already in the // pipeline before the ban went into effect. match shared.peer_manager.is_peer_banned(peer_metadata.node_id()).await { @@ -224,7 +256,12 @@ impl Listening { if !self.is_synced && sync_mode.is_up_to_date() { self.is_synced = true; - shared.set_state_info(StateInfo::Listening(ListeningInfo::new(true))); + self.initial_delay_count = 0; + shared.set_state_info(StateInfo::Listening(ListeningInfo::new( + true, + 0, + shared.config.initial_sync_peer_count, + ))); debug!(target: LOG_TARGET, "Initial sync achieved"); } @@ -243,6 +280,7 @@ impl Listening { } = sync_mode { initial_sync_counter += 1; + self.initial_delay_count = initial_sync_counter; for peer in sync_peers { let mut found = false; // lets search the list list to ensure we only have unique peers in the list with the latest @@ -263,7 +301,7 @@ impl Listening { } // We use a list here to ensure that we dont wait for even for INITIAL_SYNC_PEER_COUNT different // peers - if initial_sync_counter >= INITIAL_SYNC_PEER_COUNT { + if initial_sync_counter >= shared.config.initial_sync_peer_count { // lets return now that we have enough peers to chose from return StateEvent::FallenBehind(SyncStatus::Lagging { local, @@ -293,7 +331,10 @@ impl Listening { impl From for Listening { fn from(_: Waiting) -> Self { - Self { is_synced: false } + Self { + is_synced: false, + initial_delay_count: 0, + } } } @@ -301,6 +342,7 @@ impl From for Listening { fn from(sync: HeaderSyncState) -> Self { Self { is_synced: sync.is_synced(), + initial_delay_count: 0, } } } @@ -309,6 +351,7 @@ impl From for Listening { fn from(sync: BlockSync) -> Self { Self { is_synced: sync.is_synced(), + initial_delay_count: 0, } } } @@ -317,6 +360,7 @@ impl From for Listening { fn from(sync: DecideNextSync) -> Self { Self { is_synced: sync.is_synced(), + initial_delay_count: 0, } } } diff --git a/base_layer/core/tests/tests/base_node_rpc.rs b/base_layer/core/tests/tests/base_node_rpc.rs index b78ae3014b..9cff63d807 100644 --- a/base_layer/core/tests/tests/base_node_rpc.rs +++ b/base_layer/core/tests/tests/base_node_rpc.rs @@ -99,7 +99,7 @@ async fn setup() -> ( .await; base_node.mock_base_node_state_machine.publish_status(StatusInfo { bootstrapped: true, - state_info: StateInfo::Listening(ListeningInfo::new(true)), + state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)), randomx_vm_cnt: 0, randomx_vm_flags: RandomXFlag::FLAG_DEFAULT, }); diff --git a/base_layer/core/tests/tests/mempool.rs b/base_layer/core/tests/tests/mempool.rs index 023fa5faf2..87ebcd9618 100644 --- a/base_layer/core/tests/tests/mempool.rs +++ b/base_layer/core/tests/tests/mempool.rs @@ -1067,19 +1067,19 @@ async fn receive_and_propagate_transaction() { alice_node.mock_base_node_state_machine.publish_status(StatusInfo { bootstrapped: true, - state_info: StateInfo::Listening(ListeningInfo::new(true)), + state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)), randomx_vm_cnt: 0, randomx_vm_flags: RandomXFlag::FLAG_DEFAULT, }); bob_node.mock_base_node_state_machine.publish_status(StatusInfo { bootstrapped: true, - state_info: StateInfo::Listening(ListeningInfo::new(true)), + state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)), randomx_vm_cnt: 0, randomx_vm_flags: RandomXFlag::FLAG_DEFAULT, }); carol_node.mock_base_node_state_machine.publish_status(StatusInfo { bootstrapped: true, - state_info: StateInfo::Listening(ListeningInfo::new(true)), + state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)), randomx_vm_cnt: 0, randomx_vm_flags: RandomXFlag::FLAG_DEFAULT, }); @@ -1738,7 +1738,7 @@ async fn block_event_and_reorg_event_handling() { alice.mock_base_node_state_machine.publish_status(StatusInfo { bootstrapped: true, - state_info: StateInfo::Listening(ListeningInfo::new(true)), + state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)), randomx_vm_cnt: 0, randomx_vm_flags: RandomXFlag::FLAG_DEFAULT, }); diff --git a/base_layer/core/tests/tests/node_service.rs b/base_layer/core/tests/tests/node_service.rs index 8f3abcf78e..f45703d2dd 100644 --- a/base_layer/core/tests/tests/node_service.rs +++ b/base_layer/core/tests/tests/node_service.rs @@ -139,25 +139,25 @@ async fn propagate_and_forward_many_valid_blocks() { wait_until_online(&[&alice_node, &bob_node, &carol_node, &dan_node]).await; alice_node.mock_base_node_state_machine.publish_status(StatusInfo { bootstrapped: true, - state_info: StateInfo::Listening(ListeningInfo::new(true)), + state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)), randomx_vm_cnt: 0, randomx_vm_flags: RandomXFlag::FLAG_DEFAULT, }); bob_node.mock_base_node_state_machine.publish_status(StatusInfo { bootstrapped: true, - state_info: StateInfo::Listening(ListeningInfo::new(true)), + state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)), randomx_vm_cnt: 0, randomx_vm_flags: RandomXFlag::FLAG_DEFAULT, }); carol_node.mock_base_node_state_machine.publish_status(StatusInfo { bootstrapped: true, - state_info: StateInfo::Listening(ListeningInfo::new(true)), + state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)), randomx_vm_cnt: 0, randomx_vm_flags: RandomXFlag::FLAG_DEFAULT, }); dan_node.mock_base_node_state_machine.publish_status(StatusInfo { bootstrapped: true, - state_info: StateInfo::Listening(ListeningInfo::new(true)), + state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)), randomx_vm_cnt: 0, randomx_vm_flags: RandomXFlag::FLAG_DEFAULT, }); @@ -271,19 +271,19 @@ async fn propagate_and_forward_invalid_block_hash() { wait_until_online(&[&alice_node, &bob_node, &carol_node]).await; alice_node.mock_base_node_state_machine.publish_status(StatusInfo { bootstrapped: true, - state_info: StateInfo::Listening(ListeningInfo::new(true)), + state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)), randomx_vm_cnt: 0, randomx_vm_flags: RandomXFlag::FLAG_DEFAULT, }); bob_node.mock_base_node_state_machine.publish_status(StatusInfo { bootstrapped: true, - state_info: StateInfo::Listening(ListeningInfo::new(true)), + state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)), randomx_vm_cnt: 0, randomx_vm_flags: RandomXFlag::FLAG_DEFAULT, }); carol_node.mock_base_node_state_machine.publish_status(StatusInfo { bootstrapped: true, - state_info: StateInfo::Listening(ListeningInfo::new(true)), + state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)), randomx_vm_cnt: 0, randomx_vm_flags: RandomXFlag::FLAG_DEFAULT, }); @@ -427,25 +427,25 @@ async fn propagate_and_forward_invalid_block() { alice_node.mock_base_node_state_machine.publish_status(StatusInfo { bootstrapped: true, - state_info: StateInfo::Listening(ListeningInfo::new(true)), + state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)), randomx_vm_cnt: 0, randomx_vm_flags: RandomXFlag::FLAG_DEFAULT, }); bob_node.mock_base_node_state_machine.publish_status(StatusInfo { bootstrapped: true, - state_info: StateInfo::Listening(ListeningInfo::new(true)), + state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)), randomx_vm_cnt: 0, randomx_vm_flags: RandomXFlag::FLAG_DEFAULT, }); carol_node.mock_base_node_state_machine.publish_status(StatusInfo { bootstrapped: true, - state_info: StateInfo::Listening(ListeningInfo::new(true)), + state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)), randomx_vm_cnt: 0, randomx_vm_flags: RandomXFlag::FLAG_DEFAULT, }); dan_node.mock_base_node_state_machine.publish_status(StatusInfo { bootstrapped: true, - state_info: StateInfo::Listening(ListeningInfo::new(true)), + state_info: StateInfo::Listening(ListeningInfo::new(true, 0, 0)), randomx_vm_cnt: 0, randomx_vm_flags: RandomXFlag::FLAG_DEFAULT, }); diff --git a/common/config/presets/c_base_node_c.toml b/common/config/presets/c_base_node_c.toml index 536ab3a6b7..36d9c6b5fc 100644 --- a/common/config/presets/c_base_node_c.toml +++ b/common/config/presets/c_base_node_c.toml @@ -117,6 +117,8 @@ blockchain_sync_config.rpc_deadline = 240 # intensive. Be careful of setting this higher than the block time, which would potentially cause it # to always be behind the network (default = 10) (in seconds) #time_before_considered_lagging = 10 +#This is the amount of metadata events that a node will wait for before decide to start syncing for a peer, choosing the best peer out of the list +#initial_sync_peer_count = 5, [base_node.p2p] # The node's publicly-accessible hostname. This is the host name that is advertised on the network so that