diff --git a/applications/tari_base_node/src/commands/command/get_peer.rs b/applications/tari_base_node/src/commands/command/get_peer.rs index 46fe8d9c49..0b1e3c2274 100644 --- a/applications/tari_base_node/src/commands/command/get_peer.rs +++ b/applications/tari_base_node/src/commands/command/get_peer.rs @@ -90,6 +90,7 @@ impl CommandContext { }); println!("User agent: {}", peer.user_agent); println!("Features: {:?}", peer.features); + println!("Flags: {:?}", peer.flags); println!("Supported protocols:"); peer.supported_protocols.iter().for_each(|p| { println!("- {}", String::from_utf8_lossy(p)); diff --git a/applications/tari_base_node/src/commands/command/list_peers.rs b/applications/tari_base_node/src/commands/command/list_peers.rs index 1a0685a777..7587b28e3e 100644 --- a/applications/tari_base_node/src/commands/command/list_peers.rs +++ b/applications/tari_base_node/src/commands/command/list_peers.rs @@ -63,7 +63,9 @@ impl CommandContext { for peer in peers { let info_str = { let mut s = vec![]; - + if peer.is_seed() { + s.push("SEED".to_string()); + } if peer.is_offline() { if !peer.is_banned() { s.push("OFFLINE".to_string()); diff --git a/base_layer/p2p/src/initialization.rs b/base_layer/p2p/src/initialization.rs index f17e4afde3..648ae58419 100644 --- a/base_layer/p2p/src/initialization.rs +++ b/base_layer/p2p/src/initialization.rs @@ -39,7 +39,7 @@ use rand::{distributions::Alphanumeric, thread_rng, Rng}; use tari_common::configuration::Network; use tari_comms::{ backoff::ConstantBackoff, - peer_manager::{NodeIdentity, Peer, PeerFeatures, PeerManagerError}, + peer_manager::{NodeIdentity, Peer, PeerFeatures, PeerFlags, PeerManagerError}, pipeline, protocol::{ messaging::{MessagingEventSender, MessagingProtocolExtension}, @@ -143,7 +143,7 @@ pub async fn initialize_local_test_comms>( .with_shutdown_signal(shutdown_signal) .build()?; - add_all_peers(&comms.peer_manager(), &comms.node_identity(), seed_peers).await?; + add_seed_peers(&comms.peer_manager(), &comms.node_identity(), seed_peers).await?; // Create outbound channel let (outbound_tx, outbound_rx) = mpsc::channel(10); @@ -384,12 +384,12 @@ fn acquire_exclusive_file_lock(db_path: &Path) -> Result, ) -> Result<(), CommsInitializationError> { - for peer in peers { + for mut peer in peers { if &peer.public_key == node_identity.public_key() { debug!( target: LOG_TARGET, @@ -397,6 +397,7 @@ async fn add_all_peers( ); continue; } + peer.add_flags(PeerFlags::SEED); debug!(target: LOG_TARGET, "Adding seed peer [{}]", peer); peer_manager @@ -544,12 +545,12 @@ impl ServiceInitializer for P2pInitializer { Vec::new() }, }; - add_all_peers(&peer_manager, &node_identity, peers).await?; + add_seed_peers(&peer_manager, &node_identity, peers).await?; // TODO: Use serde let peers = Self::try_parse_seed_peers(&self.seed_config.peer_seeds)?; - add_all_peers(&peer_manager, &node_identity, peers).await?; + add_seed_peers(&peer_manager, &node_identity, peers).await?; context.register_handle(comms.connectivity()); context.register_handle(peer_manager); diff --git a/comms/core/Cargo.toml b/comms/core/Cargo.toml index 675a70c380..79d496dd84 100644 --- a/comms/core/Cargo.toml +++ b/comms/core/Cargo.toml @@ -21,7 +21,7 @@ async-trait = "0.1.36" bitflags = "1.0.4" blake2 = "0.9.0" bytes = { version = "1", features = ["serde"] } -chrono = { version = "0.4.19", default-features = false, features = ["serde"] } +chrono = { version = "0.4.19", default-features = false, features = ["serde", "clock"] } cidr = "0.1.0" clear_on_drop = "=0.2.4" data-encoding = "2.2.0" diff --git a/comms/core/src/peer_manager/peer.rs b/comms/core/src/peer_manager/peer.rs index 6dabf87e17..208f215229 100644 --- a/comms/core/src/peer_manager/peer.rs +++ b/comms/core/src/peer_manager/peer.rs @@ -53,6 +53,7 @@ bitflags! { #[derive(Default, Deserialize, Serialize)] pub struct PeerFlags: u8 { const NONE = 0x00; + const SEED = 0x01; } } @@ -159,11 +160,6 @@ impl Peer { .map(|since| Duration::from_millis(u64::try_from(since.num_milliseconds()).unwrap())) } - /// TODO: Remove once we don't have to sync wallet and base node db - pub fn unset_id(&mut self) { - self.id = None; - } - pub(super) fn set_id(&mut self, id: PeerId) { self.id = Some(id); } @@ -174,7 +170,6 @@ impl Peer { } #[allow(clippy::option_option)] - pub fn update( &mut self, net_addresses: Option>, @@ -305,6 +300,15 @@ impl Peer { self } + pub fn add_flags(&mut self, flags: PeerFlags) -> &mut Self { + self.flags |= flags; + self + } + + pub fn is_seed(&self) -> bool { + self.flags.contains(PeerFlags::SEED) + } + pub fn to_short_string(&self) -> String { format!( "{}::{}", diff --git a/comms/dht/src/peer_validator.rs b/comms/dht/src/peer_validator.rs index d9e1d71c42..a2fe111ad8 100644 --- a/comms/dht/src/peer_validator.rs +++ b/comms/dht/src/peer_validator.rs @@ -88,11 +88,14 @@ impl<'a> PeerValidator<'a> { .map(|i| i.updated_at()) .expect("unreachable panic"); - // Update if new_peer has newer timestamp than current_peer + // Update if new_peer has newer timestamp than current_peer, and if the newer timestamp is after the + // added date current_peer .identity_signature .as_ref() - .map(|i| i.updated_at() < new_dt) + .map(|i| i.updated_at() < new_dt && ( + !current_peer.is_seed() || + current_peer.added_at < new_dt.naive_utc())) // If None, update to peer with valid signature .unwrap_or(true) }; @@ -134,3 +137,114 @@ fn validate_node_id(public_key: &CommsPublicKey, node_id: &NodeId) -> Result { const peers = await client.getPeers(); - expect(peers.length).to.be.greaterThanOrEqual(peerCount); - } -); + return peers.length >= peerCount; + }, 10 * 1000); +}); When("I print the world", function () { console.log(this);