diff --git a/comms/dht/src/network_discovery/error.rs b/comms/dht/src/network_discovery/error.rs index db92cfc805..0b9a227908 100644 --- a/comms/dht/src/network_discovery/error.rs +++ b/comms/dht/src/network_discovery/error.rs @@ -34,6 +34,6 @@ pub enum NetworkDiscoveryError { ConnectivityError(#[from] ConnectivityError), #[error("No sync peers available")] NoSyncPeers, - #[error("Sync peer sent invalid peer")] + #[error("Sync peer sent invalid peer: {0}")] PeerValidationError(#[from] PeerValidatorError), } diff --git a/comms/dht/src/peer_validator.rs b/comms/dht/src/peer_validator.rs index 5e03a2f7bc..6bec6e4f00 100644 --- a/comms/dht/src/peer_validator.rs +++ b/comms/dht/src/peer_validator.rs @@ -34,12 +34,12 @@ const LOG_TARGET: &str = "dht::network_discovery::peer_validator"; #[derive(Debug, thiserror::Error)] pub enum PeerValidatorError { - #[error("Node ID was invalid")] - InvalidNodeId, - #[error("Peer signature was invalid")] - InvalidPeerSignature, - #[error("One or more peer addresses were invalid")] - InvalidPeerAddresses, + #[error("Node ID was invalid for peer '{peer}'")] + InvalidNodeId { peer: NodeId }, + #[error("Peer signature was invalid for peer '{peer}'")] + InvalidPeerSignature { peer: NodeId }, + #[error("One or more peer addresses were invalid for '{peer}'")] + InvalidPeerAddresses { peer: NodeId }, #[error("Peer manager error: {0}")] PeerManagerError(#[from] PeerManagerError), } @@ -61,13 +61,13 @@ impl<'a> PeerValidator<'a> { if let Err(err) = validate_peer_addresses(new_peer.addresses.iter(), self.config.allow_test_addresses) { warn!(target: LOG_TARGET, "Invalid peer address: {}", err); - return Err(PeerValidatorError::InvalidPeerAddresses); + return Err(PeerValidatorError::InvalidPeerAddresses { peer: new_peer.node_id }); } let can_update = match new_peer.is_valid_identity_signature() { // Update/insert peer Some(true) => true, - Some(false) => return Err(PeerValidatorError::InvalidPeerSignature), + Some(false) => return Err(PeerValidatorError::InvalidPeerSignature { peer: new_peer.node_id }), // Insert new peer if it doesn't exist, do not update None => false, }; @@ -124,6 +124,6 @@ fn validate_node_id(public_key: &CommsPublicKey, node_id: &NodeId) -> Result for IdentitySignature { let signature = CommsSecretKey::from_bytes(&value.signature)?; let updated_at = NaiveDateTime::from_timestamp_opt(value.updated_at, 0) .ok_or_else(|| anyhow::anyhow!("updated_at overflowed"))?; + let updated_at = DateTime::::from_utc(updated_at, Utc); Ok(Self::new(version, Signature::new(public_nonce, signature), updated_at)) } diff --git a/comms/src/net_address/mutliaddresses_with_stats.rs b/comms/src/net_address/mutliaddresses_with_stats.rs index 1cb9cfaa62..f640e5e3a7 100644 --- a/comms/src/net_address/mutliaddresses_with_stats.rs +++ b/comms/src/net_address/mutliaddresses_with_stats.rs @@ -73,12 +73,12 @@ impl MultiaddressesWithStats { .collect(); let to_add = addresses - .iter() - .filter(|addr| !self.addresses.iter().any(|a| a.address == **addr)) + .into_iter() + .filter(|addr| !self.addresses.iter().any(|a| a.address == *addr)) .collect::>(); for address in to_add { - self.addresses.push(address.clone().into()); + self.addresses.push(address.into()); } self.addresses.sort(); @@ -90,6 +90,16 @@ impl MultiaddressesWithStats { self.addresses.iter().map(|addr| &addr.address) } + pub fn to_lexicographically_sorted(&self) -> Vec { + let mut addresses = self.iter().cloned().collect::>(); + addresses.sort_by(|a, b| { + let bytes_a = a.as_ref(); + let bytes_b = b.as_ref(); + bytes_a.cmp(bytes_b) + }); + addresses + } + /// Finds the specified address in the set and allow updating of its variables such as its usage stats fn find_address_mut(&mut self, address: &Multiaddr) -> Option<&mut MutliaddrWithStats> { self.addresses.iter_mut().find(|a| &a.address == address) diff --git a/comms/src/peer_manager/identity_signature.rs b/comms/src/peer_manager/identity_signature.rs index 902fb59cf8..9643517a45 100644 --- a/comms/src/peer_manager/identity_signature.rs +++ b/comms/src/peer_manager/identity_signature.rs @@ -22,7 +22,7 @@ use std::convert::{TryFrom, TryInto}; -use chrono::{NaiveDateTime, Utc}; +use chrono::{DateTime, NaiveDateTime, Utc}; use digest::Digest; use prost::Message; use rand::rngs::OsRng; @@ -42,13 +42,13 @@ use crate::{ pub struct IdentitySignature { version: u8, signature: Signature, - updated_at: NaiveDateTime, + updated_at: DateTime, } impl IdentitySignature { pub const LATEST_VERSION: u8 = 0; - pub fn new(version: u8, signature: Signature, updated_at: NaiveDateTime) -> Self { + pub fn new(version: u8, signature: Signature, updated_at: DateTime) -> Self { Self { version, signature, @@ -56,11 +56,11 @@ impl IdentitySignature { } } - pub fn sign_new<'a, I: IntoIterator>( + pub(crate) fn sign_new<'a, I: IntoIterator>( secret_key: &CommsSecretKey, features: PeerFeatures, addresses: I, - updated_at: NaiveDateTime, + updated_at: DateTime, ) -> Self { let challenge = Self::construct_challenge(Self::LATEST_VERSION, features, addresses, updated_at); let nonce = CommsSecretKey::random(&mut OsRng); @@ -77,7 +77,7 @@ impl IdentitySignature { &self.signature } - pub fn updated_at(&self) -> NaiveDateTime { + pub fn updated_at(&self) -> DateTime { self.updated_at } @@ -86,7 +86,11 @@ impl IdentitySignature { } pub fn is_valid_for_peer(&self, peer: &Peer) -> bool { - self.is_valid(&peer.public_key, peer.features, peer.addresses.iter()) + self.is_valid( + &peer.public_key, + peer.features, + peer.addresses.to_lexicographically_sorted().iter(), + ) } pub fn is_valid<'a, I: IntoIterator>( @@ -100,7 +104,7 @@ impl IdentitySignature { return false; } // Do not accept timestamp more than 1 day in the future - if self.updated_at.timestamp() > (Utc::now().timestamp() + 24 * 60 * 60) { + if self.updated_at > Utc::now() + chrono::Duration::days(1) { return false; } @@ -112,11 +116,11 @@ impl IdentitySignature { version: u8, features: PeerFeatures, addresses: I, - updated_at: NaiveDateTime, + updated_at: DateTime, ) -> Challenge { let challenge = Challenge::new() .chain(version.to_le_bytes()) - .chain(u64::try_from(updated_at.timestamp()).unwrap_or(0).to_le_bytes()) + .chain((updated_at.timestamp() as u64).to_le_bytes()) .chain(features.bits().to_le_bytes()); addresses .into_iter() @@ -146,6 +150,7 @@ impl TryFrom for IdentitySignature { CommsSecretKey::from_bytes(&value.signature).map_err(|_| PeerManagerError::InvalidIdentitySignature)?; let updated_at = NaiveDateTime::from_timestamp_opt(value.updated_at, 0).ok_or(PeerManagerError::InvalidIdentitySignature)?; + let updated_at = DateTime::::from_utc(updated_at, Utc); Ok(Self { version, @@ -183,7 +188,7 @@ mod test { let secret = CommsSecretKey::random(&mut OsRng); let public_key = CommsPublicKey::from_secret_key(&secret); let address = Multiaddr::from_str("/ip4/127.0.0.1/tcp/1234").unwrap(); - let updated_at = Utc::now().naive_utc(); + let updated_at = Utc::now(); let identity = IdentitySignature::sign_new(&secret, PeerFeatures::COMMUNICATION_NODE, [&address], updated_at); let node_id = NodeId::from_public_key(&public_key); @@ -205,7 +210,7 @@ mod test { let secret = CommsSecretKey::random(&mut OsRng); let public_key = CommsPublicKey::from_secret_key(&secret); let address = Multiaddr::from_str("/ip4/127.0.0.1/tcp/1234").unwrap(); - let updated_at = Utc::now().naive_utc(); + let updated_at = Utc::now(); let identity = IdentitySignature::sign_new(&secret, PeerFeatures::COMMUNICATION_NODE, [&address], updated_at); let node_id = NodeId::from_public_key(&public_key); @@ -229,7 +234,7 @@ mod test { let secret = CommsSecretKey::random(&mut OsRng); let public_key = CommsPublicKey::from_secret_key(&secret); let address = Multiaddr::from_str("/ip4/127.0.0.1/tcp/1234").unwrap(); - let updated_at = Utc::now().naive_utc(); + let updated_at = Utc::now(); let identity = IdentitySignature::sign_new(&secret, PeerFeatures::COMMUNICATION_NODE, [&address], updated_at); let node_id = NodeId::from_public_key(&public_key); diff --git a/comms/src/peer_manager/migrations.rs b/comms/src/peer_manager/migrations.rs index c05ae4dfe9..9118874bc8 100644 --- a/comms/src/peer_manager/migrations.rs +++ b/comms/src/peer_manager/migrations.rs @@ -20,8 +20,8 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -mod v4; mod v5; +mod v6; use log::*; use tari_storage::lmdb_store::{LMDBDatabase, LMDBError}; @@ -32,7 +32,7 @@ pub(super) const MIGRATION_VERSION_KEY: u64 = u64::MAX; pub fn migrate(database: &LMDBDatabase) -> Result<(), LMDBError> { // Add migrations here in version order - let migrations = vec![v4::Migration.boxed(), v5::Migration.boxed()]; + let migrations = vec![v5::Migration.boxed(), v6::Migration.boxed()]; if migrations.is_empty() { return Ok(()); } diff --git a/comms/src/peer_manager/migrations/v4.rs b/comms/src/peer_manager/migrations/v6.rs similarity index 78% rename from comms/src/peer_manager/migrations/v4.rs rename to comms/src/peer_manager/migrations/v6.rs index eaddc1385a..cf874fdbce 100644 --- a/comms/src/peer_manager/migrations/v4.rs +++ b/comms/src/peer_manager/migrations/v6.rs @@ -37,6 +37,7 @@ use crate::{ connection_stats::PeerConnectionStats, migrations::MIGRATION_VERSION_KEY, node_id::deserialize_node_id_from_hex, + IdentitySignature, NodeId, PeerFeatures, PeerFlags, @@ -46,30 +47,10 @@ use crate::{ types::CommsPublicKey, }; -const LOG_TARGET: &str = "comms::peer_manager::migrations::v4"; +const LOG_TARGET: &str = "comms::peer_manager::migrations::v6"; #[derive(Debug, Deserialize, Serialize)] -pub struct PeerV3 { - pub(super) id: Option, - pub public_key: CommsPublicKey, - #[serde(serialize_with = "serialize_to_hex")] - #[serde(deserialize_with = "deserialize_node_id_from_hex")] - pub node_id: NodeId, - pub addresses: MultiaddressesWithStats, - pub flags: PeerFlags, - pub banned_until: Option, - pub banned_reason: String, - pub offline_at: Option, - pub features: PeerFeatures, - pub connection_stats: PeerConnectionStats, - pub supported_protocols: Vec, - pub added_at: NaiveDateTime, - pub user_agent: String, - pub metadata: HashMap>, -} - -#[derive(Debug, Deserialize, Serialize)] -pub struct PeerV4 { +pub struct PeerV5 { pub(super) id: Option, pub public_key: CommsPublicKey, #[serde(serialize_with = "serialize_to_hex")] @@ -87,41 +68,44 @@ pub struct PeerV4 { pub added_at: NaiveDateTime, pub user_agent: String, pub metadata: HashMap>, + pub identity_signature: Option, } +/// No structural changes, just clears the identity signatures pub struct Migration; impl super::Migration for Migration { type Error = LMDBError; fn get_version(&self) -> u32 { - 4 + 6 } fn migrate(&self, db: &LMDBDatabase) -> Result<(), Self::Error> { - let result = db.for_each::(|old_peer| { + db.for_each::(|old_peer| { let result = old_peer.and_then(|(key, peer)| { if key == MIGRATION_VERSION_KEY { return Ok(()); } debug!(target: LOG_TARGET, "Migrating peer `{}`", peer.node_id.short_str()); - db.insert(&key, &PeerV4 { + db.insert(&key, &PeerV5 { id: peer.id, public_key: peer.public_key, node_id: peer.node_id, - last_seen: peer.addresses.last_seen().map(|ts| ts.naive_utc()), addresses: peer.addresses, flags: peer.flags, banned_until: peer.banned_until, banned_reason: peer.banned_reason, offline_at: peer.offline_at, + last_seen: peer.last_seen, features: peer.features, connection_stats: peer.connection_stats, supported_protocols: peer.supported_protocols, added_at: peer.added_at, user_agent: peer.user_agent, metadata: peer.metadata, + identity_signature: None, }) .map_err(Into::into) }); @@ -133,14 +117,7 @@ impl super::Migration for Migration { ); } IterationResult::Continue - }); - - if let Err(err) = result { - error!( - target: LOG_TARGET, - "Error reading peer pd: {} ** Database may be corrupt **", err - ); - } + })?; Ok(()) } diff --git a/comms/src/peer_manager/node_identity.rs b/comms/src/peer_manager/node_identity.rs index 7d72aa0713..b319326b63 100644 --- a/comms/src/peer_manager/node_identity.rs +++ b/comms/src/peer_manager/node_identity.rs @@ -165,7 +165,7 @@ impl NodeIdentity { self.secret_key(), self.features, Some(&*acquire_read_lock!(self.public_address)), - Utc::now().naive_utc(), + Utc::now(), ); *acquire_write_lock!(self.identity_signature) = Some(identity_sig); diff --git a/comms/src/stream_id.rs b/comms/src/stream_id.rs index 4363d29d1f..4f314340a6 100644 --- a/comms/src/stream_id.rs +++ b/comms/src/stream_id.rs @@ -41,6 +41,6 @@ impl Id { impl fmt::Display for Id { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.0) + write!(f, "{}", self.as_u32()) } }