From 594e03eada389c1a131d5877f42f8c43b85a9fbe Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Fri, 1 Sep 2023 16:40:53 +0400 Subject: [PATCH] fix(dht): add SAF bans (#5711) Description --- Added bans for deviant peer behaviour when processing SAF messages. Ban if a peer sends more than a maximum number of SAF messages. Specify the locally configured message limit over the wire to ensure that peers can comply with this limit. Motivation and Context --- Implement bans for bad peer responses from SAF messages. A peer could send many tiny messages in order to stay within the max message size. Another buffering vec is allocated per message, which allows the SAF responder to allocate a large vec in the recipient. The addition of a limit to the SAF request is not a breaking change (0 == no limit, which implies using the remote peer's configured limit). This only matters if peers have non-default values. How Has This Been Tested? --- New unit test for bad message semantics. Existing SAF unit tests provide adequate coverage, including bad paths. What process can a PR reviewer use to test or verify this change? 
--- Test SAF Breaking Changes --- - [x] None - [ ] Requires data directory on base node to be deleted - [ ] Requires hard fork - [ ] Other - Please specify --- comms/dht/src/discovery/error.rs | 2 + comms/dht/src/discovery/service.rs | 5 + comms/dht/src/envelope.rs | 32 +- comms/dht/src/inbound/decryption.rs | 37 +- comms/dht/src/inbound/forward.rs | 18 +- comms/dht/src/inbound/message.rs | 16 + comms/dht/src/message_signature.rs | 2 +- comms/dht/src/proto/store_forward.proto | 1 + comms/dht/src/store_forward/error.rs | 38 +- comms/dht/src/store_forward/message.rs | 3 +- .../dht/src/store_forward/saf_handler/task.rs | 461 +++++++++++++----- comms/dht/src/store_forward/service.rs | 31 +- comms/dht/src/store_forward/store.rs | 63 ++- comms/dht/tests/attacks.rs | 2 +- 14 files changed, 487 insertions(+), 224 deletions(-) diff --git a/comms/dht/src/discovery/error.rs b/comms/dht/src/discovery/error.rs index f5951873e7..50c7777f9f 100644 --- a/comms/dht/src/discovery/error.rs +++ b/comms/dht/src/discovery/error.rs @@ -55,6 +55,8 @@ pub enum DhtDiscoveryError { InvalidDiscoveryResponse { details: anyhow::Error }, #[error("DHT peer validator error: {0}")] PeerValidatorError(#[from] DhtPeerValidatorError), + #[error("Cannot send discovery for this node")] + CannotDiscoverThisNode, } impl DhtDiscoveryError { diff --git a/comms/dht/src/discovery/service.rs b/comms/dht/src/discovery/service.rs index fe1319dc82..469520322a 100644 --- a/comms/dht/src/discovery/service.rs +++ b/comms/dht/src/discovery/service.rs @@ -303,6 +303,11 @@ impl DhtDiscoveryService { reply_tx: oneshot::Sender>, ) -> Result<(), DhtDiscoveryError> { let nonce = OsRng.next_u64(); + if *dest_pubkey == *self.node_identity.public_key() { + let _result = reply_tx.send(Err(DhtDiscoveryError::CannotDiscoverThisNode)); + return Ok(()); + } + if let Err(err) = self.send_discover(nonce, destination, dest_pubkey.clone()).await { let _result = reply_tx.send(Err(err)); return Ok(()); diff --git 
a/comms/dht/src/envelope.rs b/comms/dht/src/envelope.rs index e507cfc00a..28c87e5be1 100644 --- a/comms/dht/src/envelope.rs +++ b/comms/dht/src/envelope.rs @@ -67,6 +67,7 @@ pub(crate) fn epochtime_to_datetime(datetime: EpochTime) -> DateTime { DateTime::from_utc(dt, Utc) } +/// Message errors that should be verified by every node #[derive(Debug, Error)] pub enum DhtMessageError { #[error("Invalid node destination")] @@ -83,8 +84,10 @@ pub enum DhtMessageError { InvalidMessageFlags, #[error("Invalid ephemeral public key")] InvalidEphemeralPublicKey, - #[error("Header was omitted from the message")] + #[error("Header is omitted from the message")] HeaderOmitted, + #[error("Message Body is empty")] + BodyEmpty, } impl fmt::Display for DhtMessageType { @@ -157,12 +160,31 @@ pub struct DhtMessageHeader { } impl DhtMessageHeader { - pub fn is_valid(&self) -> bool { + /// Checks if the DHT header is semantically valid. For example, if the message is flagged as encrypted, but sets a + /// empty signature or provides no ephemeral public key, this returns false. 
+ pub fn is_semantically_valid(&self) -> bool { + // If the message is encrypted: + // - it needs a destination + // - it needs an ephemeral public key + // - it needs a signature if self.flags.is_encrypted() { - !self.message_signature.is_empty() && self.ephemeral_public_key.is_some() - } else { - true + // Must have a destination + if self.destination.is_unknown() { + return false; + } + + // Must have an ephemeral public key + if self.ephemeral_public_key.is_none() { + return false; + } + + // Must have a signature + if self.message_signature.is_empty() { + return false; + } } + + true } } diff --git a/comms/dht/src/inbound/decryption.rs b/comms/dht/src/inbound/decryption.rs index d08185dc50..f54843b634 100644 --- a/comms/dht/src/inbound/decryption.rs +++ b/comms/dht/src/inbound/decryption.rs @@ -56,6 +56,8 @@ enum DecryptionError { MessageRejectDecryptionFailed, #[error("Failed to decode envelope body")] EnvelopeBodyDecodeFailed, + #[error("Bad clear-text message semantics")] + BadClearTextMessageSemantics, } /// This layer is responsible for attempting to decrypt inbound messages. @@ -294,36 +296,17 @@ where S: Service /// /// These failure modes are detectable by any node, so it is generally safe to ban an offending peer. 
fn initial_validation(message: DhtInboundMessage) -> Result { - // If an unencrypted message has no signature, it passes this validation automatically - if !message.dht_header.flags.is_encrypted() && message.dht_header.message_signature.is_empty() { - return Ok(ValidatedDhtInboundMessage::new(message, None)); - } - - // If the message is encrypted: - // - it must be nonempty - // - it needs a destination - // - it needs an ephemeral public key - // - it needs a signature - if message.dht_header.flags.is_encrypted() { - // Must be nonempty - if message.body.is_empty() { - return Err(DecryptionError::BadEncryptedMessageSemantics); - } - - // Must have a destination - if message.dht_header.destination.is_unknown() { - return Err(DecryptionError::BadEncryptedMessageSemantics); - } - - // Must have an ephemeral public key - if message.dht_header.ephemeral_public_key.is_none() { + if !message.is_semantically_valid() { + if message.dht_header.flags.is_encrypted() { return Err(DecryptionError::BadEncryptedMessageSemantics); + } else { + return Err(DecryptionError::BadClearTextMessageSemantics); } + } - // Must have a signature - if message.dht_header.message_signature.is_empty() { - return Err(DecryptionError::BadEncryptedMessageSemantics); - } + // If a signature is not present, the message is valid at this point + if message.dht_header.message_signature.is_empty() { + return Ok(ValidatedDhtInboundMessage::new(message, None)); } // If a signature is present, it must be valid diff --git a/comms/dht/src/inbound/forward.rs b/comms/dht/src/inbound/forward.rs index 37e04129d2..0b34f76785 100644 --- a/comms/dht/src/inbound/forward.rs +++ b/comms/dht/src/inbound/forward.rs @@ -186,26 +186,12 @@ where S: Service dht_header, is_saf_stored, is_already_forwarded, - authenticated_origin, .. } = message; if self.destination_matches_source(&dht_header.destination, source_peer) { - // #banheuristic - the origin of this message was the destination. Two things are wrong here: - // 1. 
The origin/destination should not have forwarded this (the destination node didnt do this - // destination_matches_source check) - // 1. The origin sent a message that the destination could not decrypt - // The authenticated source should be banned (malicious), and origin should be temporarily banned - // (bug?) - if let Some(authenticated_origin) = authenticated_origin { - self.dht - .ban_peer( - authenticated_origin.clone(), - OffenceSeverity::High, - "Received message from peer that is destined for that peer. This peer originally sent it.", - ) - .await; - } + // The origin/destination should not have forwarded this (the source node didnt do this + // destination_matches_source check) self.dht .ban_peer( source_peer.public_key.clone(), diff --git a/comms/dht/src/inbound/message.rs b/comms/dht/src/inbound/message.rs index 5d89f98e98..0d811b2557 100644 --- a/comms/dht/src/inbound/message.rs +++ b/comms/dht/src/inbound/message.rs @@ -91,6 +91,22 @@ impl DhtInboundMessage { body, } } + + pub fn is_semantically_valid(&self) -> bool { + if !self.dht_header.is_semantically_valid() { + return false; + } + + // If the message is encrypted: + // - it must be nonempty + if self.dht_header.flags.is_encrypted() { + // Body must be nonempty + if self.body.is_empty() { + return false; + } + } + true + } } impl Display for DhtInboundMessage { diff --git a/comms/dht/src/message_signature.rs b/comms/dht/src/message_signature.rs index 975b5d9208..fa71033e93 100644 --- a/comms/dht/src/message_signature.rs +++ b/comms/dht/src/message_signature.rs @@ -123,7 +123,7 @@ pub struct ProtoMessageSignature { #[derive(Debug, thiserror::Error, PartialEq)] pub enum MessageSignatureError { - #[error("Failed to validate message signature")] + #[error("Message signature does not contain valid scalar bytes")] InvalidSignatureBytes, #[error("Message signature contained an invalid public nonce")] InvalidPublicNonceBytes, diff --git a/comms/dht/src/proto/store_forward.proto 
b/comms/dht/src/proto/store_forward.proto index be5d0dbe76..830004e5ea 100644 --- a/comms/dht/src/proto/store_forward.proto +++ b/comms/dht/src/proto/store_forward.proto @@ -15,6 +15,7 @@ package tari.dht.store_forward; message StoredMessagesRequest { google.protobuf.Timestamp since = 1; uint32 request_id = 2; + uint32 limit = 3; } // Storage for a single message envelope, including the date and time when the element was stored diff --git a/comms/dht/src/store_forward/error.rs b/comms/dht/src/store_forward/error.rs index 6d202cd6e5..d7f47ff225 100644 --- a/comms/dht/src/store_forward/error.rs +++ b/comms/dht/src/store_forward/error.rs @@ -27,14 +27,12 @@ use tari_comms::{ message::MessageError, peer_manager::{NodeId, PeerManagerError}, }; -use tari_utilities::{byte_array::ByteArrayError, epoch_time::EpochTime}; use thiserror::Error; use crate::{ actor::DhtActorError, envelope::DhtMessageError, error::DhtEncryptError, - inbound::DhtInboundError, message_signature::MessageSignatureError, outbound::DhtOutboundError, storage::StorageError, @@ -55,14 +53,12 @@ pub enum StoreAndForwardError { DhtEncryptError(#[from] DhtEncryptError), #[error("Received stored message has an invalid destination")] InvalidDestination, - #[error("DhtInboundError: {0}")] - DhtInboundError(#[from] DhtInboundError), #[error("Received stored message has an invalid origin signature: {0}")] InvalidMessageSignature(#[from] MessageSignatureError), - #[error("Invalid envelope body")] - InvalidEnvelopeBody, - #[error("DHT header is invalid")] - InvalidDhtHeader, + #[error("Envelope body is missing a required message part")] + EnvelopeBodyMissingMessagePart, + #[error("DHT header did not pass semantic validation rules")] + BadDhtHeaderSemanticallyInvalid, #[error("Unable to decrypt received stored message")] DecryptionFailed, #[error("DhtActorError: {0}")] @@ -71,10 +67,8 @@ pub enum StoreAndForwardError { DuplicateMessage, #[error("Unable to decode message: {0}")] DecodeError(#[from] DecodeError), - 
#[error("Dht header was not provided")] - DhtHeaderNotProvided, - #[error("The message was malformed")] - MalformedMessage, + #[error("The message envelope was malformed: {0}")] + MalformedEnvelopeBody(DecodeError), #[error("StorageError: {0}")] StorageError(#[from] StorageError), #[error("The store and forward service requester channel closed")] @@ -83,24 +77,16 @@ pub enum StoreAndForwardError { RequestCancelled, #[error("The {field} field was not valid, discarding SAF response: {details}")] InvalidSafResponseMessage { field: &'static str, details: String }, - #[error("The message has expired, not storing message in SAF db (expiry: {expired}, now: {now})")] - NotStoringExpiredMessage { expired: EpochTime, now: EpochTime }, - #[error("MalformedNodeId: {0}")] - MalformedNodeId(String), - #[error("DHT message type should not have been forwarded")] - InvalidDhtMessageType, - #[error("Failed to send request for store and forward messages: {0}")] - RequestMessagesFailed(DhtOutboundError), + #[error("DHT message type should not have been stored/forwarded")] + PeerSentDhtMessageViaSaf, + #[error("SAF message type should not have been stored/forwarded")] + PeerSentSafMessageViaSaf, #[error("Received SAF messages that were not requested")] ReceivedUnrequestedSafMessages, #[error("SAF messages received from peer {peer} after deadline. 
Received after {message_age:.2?}")] SafMessagesReceivedAfterDeadline { peer: NodeId, message_age: Duration }, #[error("Invalid SAF request: `stored_at` cannot be in the future")] StoredAtWasInFuture, -} - -impl From for StoreAndForwardError { - fn from(e: ByteArrayError) -> Self { - StoreAndForwardError::MalformedNodeId(e.to_string()) - } + #[error("Invariant error (POSSIBLE BUG): {0}")] + InvariantError(String), } diff --git a/comms/dht/src/store_forward/message.rs b/comms/dht/src/store_forward/message.rs index f74af32c61..bdce545204 100644 --- a/comms/dht/src/store_forward/message.rs +++ b/comms/dht/src/store_forward/message.rs @@ -40,14 +40,15 @@ impl StoredMessagesRequest { Self { since: None, request_id: OsRng.next_u32(), + limit: 0, } } - #[allow(unused)] pub fn since(since: DateTime) -> Self { Self { since: Some(datetime_to_timestamp(since)), request_id: OsRng.next_u32(), + limit: 0, } } } diff --git a/comms/dht/src/store_forward/saf_handler/task.rs b/comms/dht/src/store_forward/saf_handler/task.rs index 1247cc8236..4c987c5111 100644 --- a/comms/dht/src/store_forward/saf_handler/task.rs +++ b/comms/dht/src/store_forward/saf_handler/task.rs @@ -21,6 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use std::{ + cmp, convert::{TryFrom, TryInto}, sync::Arc, }; @@ -41,10 +42,10 @@ use tokio::sync::mpsc; use tower::{Service, ServiceExt}; use crate::{ - actor::DhtRequester, + actor::{DhtRequester, OffenceSeverity}, crypt, dedup, - envelope::{timestamp_to_datetime, DhtMessageHeader, NodeDestination}, + envelope::{timestamp_to_datetime, DhtMessageError, DhtMessageHeader, NodeDestination}, inbound::{DecryptedDhtMessage, DhtInboundMessage}, message_signature::{MessageSignature, MessageSignatureError, ProtoMessageSignature}, outbound::{OutboundMessageRequester, SendMessageParams}, @@ -185,7 +186,7 @@ where S: Service let retrieve_msgs = msg .decode_part::(0)? 
- .ok_or(StoreAndForwardError::InvalidEnvelopeBody)?; + .ok_or(StoreAndForwardError::EnvelopeBodyMissingMessagePart)?; let source_pubkey = Box::new(message.source_peer.public_key.clone()); let source_node_id = Box::new(message.source_peer.node_id.clone()); @@ -193,6 +194,19 @@ where S: Service // Compile a set of stored messages for the requesting peer let mut query = FetchStoredMessageQuery::new(source_pubkey, source_node_id.clone()); + let max = u32::try_from(self.config.max_returned_messages).unwrap_or_else(|_| { + warn!(target: LOG_TARGET,"Your node is configured with an extremely high number for max_returned_messages. This will likely be disregarded by peers."); + u32::MAX + }); + // limit of 0 means no hard limit, though we still limit to our configured limit + if retrieve_msgs.limit == 0 { + query.with_limit(max); + } else { + // Return up to the limit. The limit cannot exceed our locally configured max_returned_messages setting. + // Returning less than requested is completely expected. + query.with_limit(cmp::min(retrieve_msgs.limit, max)); + } + let since = match retrieve_msgs.since.and_then(timestamp_to_datetime) { Some(since) => { debug!( @@ -273,7 +287,8 @@ where S: Service let message_tag = message.dht_header.message_tag; if let Err(err) = self.check_saf_messages_were_requested(&source_node_id).await { - // Peer sent SAF messages we didn't request?? #banheuristics + // Peer sent SAF messages we didn't request, it was cancelled locally or sent it more than 4 to 10 minutes + // late?? #banheuristics warn!(target: LOG_TARGET, "SAF response check failed: {}", err); return Ok(()); } @@ -283,7 +298,31 @@ where S: Service .expect("already checked that this message decrypted successfully"); let response = msg .decode_part::(0)? 
- .ok_or(StoreAndForwardError::InvalidEnvelopeBody)?; + .ok_or(StoreAndForwardError::EnvelopeBodyMissingMessagePart)?; + + if response.messages.len() > self.config.max_returned_messages { + warn!( + target: LOG_TARGET, + "Peer '{}' sent {} stored messages which is more than the maximum allowed of {}. Discarding \ + messages.", + source_node_id.short_str(), + response.messages.len(), + self.config.max_returned_messages + ); + self.dht_requester + .ban_peer( + message.source_peer.public_key.clone(), + OffenceSeverity::High, + format!( + "Peer sent too many stored messages ({} of {})", + response.messages.len(), + self.config.max_returned_messages + ), + ) + .await; + return Ok(()); + } + let source_peer = message.source_peer.clone(); debug!( @@ -298,65 +337,10 @@ where S: Service message_tag ); - let results = self + let successful_messages = self .process_incoming_stored_messages(source_peer.clone(), response.messages) .await?; - let successful_msgs_iter = results - .into_iter() - .map(|result| { - match &result { - Ok(msg) => { - trace!(target: LOG_TARGET, "Recv SAF message: {}", msg); - }, - // Failed decryption is acceptable, the message wasn't for this node so we - // simply discard the message. - Err(err @ StoreAndForwardError::DecryptionFailed) => { - debug!( - target: LOG_TARGET, - "Unable to decrypt stored message sent by {}: {}", - source_peer.node_id.short_str(), - err - ); - }, - // The peer that originally sent this message is not known to us. - Err(StoreAndForwardError::PeerManagerError(PeerManagerError::PeerNotFoundError)) => { - debug!(target: LOG_TARGET, "Origin peer not found. Discarding stored message."); - }, - - // Failed to send request to Dht Actor, something has gone very wrong - Err(StoreAndForwardError::DhtActorError(err)) => { - error!( - target: LOG_TARGET, - "DhtActor returned an error. {}. This could indicate a system malfunction.", err - ); - }, - // Duplicate message detected, no problem it happens. 
- Err(StoreAndForwardError::DuplicateMessage) => { - debug!( - target: LOG_TARGET, - "Store and forward received a duplicate message. Message discarded." - ); - }, - - // Every other error shouldn't happen if the sending node is behaving - Err(err) => { - // #banheuristics - warn!( - target: LOG_TARGET, - "SECURITY: invalid store and forward message was discarded from NodeId={}. Reason: {}. \ - These messages should never have been forwarded. This is a sign of a badly behaving node.", - source_peer.node_id.short_str(), - err - ); - }, - } - - result - }) - .filter(Result::is_ok) - .map(Result::unwrap); - // Let the SAF Service know we got a SAF response. let _ = self .saf_response_signal_sender @@ -365,7 +349,7 @@ where S: Service .map_err(|e| warn!(target: LOG_TARGET, "Error sending SAF response signal; {:?}", e)); self.next_service - .call_all(stream::iter(successful_msgs_iter)) + .call_all(stream::iter(successful_messages)) .unordered() .for_each(|service_result| { if let Err(err) = service_result { @@ -382,25 +366,33 @@ where S: Service &mut self, source_peer: Arc, messages: Vec, - ) -> Result>, StoreAndForwardError> { + ) -> Result, StoreAndForwardError> { let mut last_saf_received = self .dht_requester .get_metadata::>(DhtMetadataKey::LastSafMessageReceived) .await?; + // Allocations: the number of messages has already been bounds checked to be <= + // self.config.max_returned_messages let mut results = Vec::with_capacity(messages.len()); for msg in messages { let result = self .validate_and_decrypt_incoming_stored_message(Arc::clone(&source_peer), msg) .await; - if let Ok((_, stored_at)) = result.as_ref() { - if last_saf_received.as_ref().map(|dt| stored_at > dt).unwrap_or(true) { - last_saf_received = Some(*stored_at); - } + let Some(result) = self.process_saf_message_validation_result(&source_peer.public_key, result).await else { + // Logging of problems and banning are done inside process_saf_message. 
We can simply continue + continue; + }; + + // If the messages should no longer be processed because we banned the peer, we exit here on Err + let (msg, stored_at) = result?; + + if last_saf_received.as_ref().map(|dt| stored_at > *dt).unwrap_or(true) { + last_saf_received = Some(stored_at); } - results.push(result.map(|(msg, _)| msg)); + results.push(msg); } if let Some(last_saf_received) = last_saf_received { @@ -418,8 +410,12 @@ where S: Service message: ProtoStoredMessage, ) -> Result<(DecryptedDhtMessage, DateTime), StoreAndForwardError> { let node_identity = &self.node_identity; - if message.dht_header.is_none() { - return Err(StoreAndForwardError::DhtHeaderNotProvided); + let Some(dht_header) = message.dht_header else { + return Err(StoreAndForwardError::DhtMessageError(DhtMessageError::HeaderOmitted)); + }; + + if message.body.is_empty() { + return Err(StoreAndForwardError::DhtMessageError(DhtMessageError::BodyEmpty)); } let stored_at = message @@ -429,7 +425,7 @@ where S: Service NaiveDateTime::from_timestamp_opt(t.seconds, 0).ok_or_else(|| { StoreAndForwardError::InvalidSafResponseMessage { field: "stored_at", - details: "number of seconds provided represents more days than can fit in a u32" + details: "number of seconds provided represents more days than can fit in a NaiveDateTime" .to_string(), } })?, @@ -443,48 +439,37 @@ where S: Service return Err(StoreAndForwardError::StoredAtWasInFuture); } - let msg_hash = dedup::create_message_hash( - message - .dht_header - .as_ref() - .map(|h| h.message_signature.as_slice()) - .unwrap_or(&[]), - &message.body, - ); + let msg_hash = dedup::create_message_hash(&dht_header.message_signature, &message.body); - let dht_header: DhtMessageHeader = message - .dht_header - .expect("previously checked") - .try_into() - .map_err(StoreAndForwardError::DhtMessageError)?; + let dht_header: DhtMessageHeader = dht_header.try_into().map_err(StoreAndForwardError::DhtMessageError)?; - if !dht_header.is_valid() { - return 
Err(StoreAndForwardError::InvalidDhtHeader); + if !dht_header.is_semantically_valid() { + return Err(StoreAndForwardError::BadDhtHeaderSemanticallyInvalid); } let message_type = dht_header.message_type; if message_type.is_dht_message() { - if !message_type.is_dht_discovery() { - debug!( - target: LOG_TARGET, - "Discarding {} message from peer '{}'", - message_type, - source_peer.node_id.short_str() - ); - return Err(StoreAndForwardError::InvalidDhtMessageType); - } - if dht_header.destination.is_unknown() { - debug!( - target: LOG_TARGET, - "Discarding anonymous discovery message from peer '{}'", - source_peer.node_id.short_str() - ); - return Err(StoreAndForwardError::InvalidDhtMessageType); - } + debug!( + target: LOG_TARGET, + "Discarding {} message from peer '{}'", + message_type, + source_peer.node_id.short_str() + ); + return Err(StoreAndForwardError::PeerSentDhtMessageViaSaf); + } + + if message_type.is_saf_message() { + debug!( + target: LOG_TARGET, + "Discarding {} message from peer '{}'", + message_type, + source_peer.node_id.short_str() + ); + return Err(StoreAndForwardError::PeerSentSafMessageViaSaf); } // Check that the destination is either undisclosed, for us or for our network region - Self::check_destination(node_identity, &dht_header).await?; + Self::check_destination_for(node_identity.public_key(), &dht_header).await?; // Attempt to decrypt the message (if applicable), and deserialize it let (authenticated_pk, decrypted_body) = @@ -521,13 +506,13 @@ where S: Service } } - async fn check_destination( - node_identity: &NodeIdentity, + async fn check_destination_for( + public_key: &CommsPublicKey, dht_header: &DhtMessageHeader, ) -> Result<(), StoreAndForwardError> { let is_valid_destination = match &dht_header.destination { NodeDestination::Unknown => true, - NodeDestination::PublicKey(pk) => node_identity.public_key() == &**pk, + NodeDestination::PublicKey(pk) => *public_key == **pk, }; if is_valid_destination { @@ -568,11 +553,12 @@ where S: 
Service let envelope_body = EnvelopeBody::decode(decrypted_bytes.freeze()).map_err(|_| StoreAndForwardError::DecryptionFailed)?; if envelope_body.is_empty() { - return Err(StoreAndForwardError::InvalidEnvelopeBody); + return Err(StoreAndForwardError::EnvelopeBodyMissingMessagePart); } // Unmask the sender public key - let mask = crypt::generate_key_mask(&shared_ephemeral_secret)?; + let mask = crypt::generate_key_mask(&shared_ephemeral_secret) + .map_err(|e| StoreAndForwardError::InvariantError(e.to_string()))?; let mask_inverse = mask.invert().ok_or(StoreAndForwardError::DecryptionFailed)?; Ok((Some(mask_inverse * masked_sender_public_key), envelope_body)) } else { @@ -581,7 +567,7 @@ where S: Service } else { Some(Self::authenticate_message(&header.message_signature, header, body)?) }; - let envelope_body = EnvelopeBody::decode(body).map_err(|_| StoreAndForwardError::MalformedMessage)?; + let envelope_body = EnvelopeBody::decode(body).map_err(StoreAndForwardError::MalformedEnvelopeBody)?; Ok((authenticated_pk, envelope_body)) } } @@ -615,6 +601,163 @@ where S: Service None => Err(StoreAndForwardError::ReceivedUnrequestedSafMessages), } } + + #[allow(clippy::too_many_lines)] + pub async fn process_saf_message_validation_result( + &mut self, + source_peer: &CommsPublicKey, + result: Result, + ) -> Option> { + match result { + Ok(t) => Some(Ok(t)), + // Failed decryption is acceptable, the message wasn't for this node so we + // simply discard the message. + Err(err @ StoreAndForwardError::DhtEncryptError(_)) | Err(err @ StoreAndForwardError::DecryptionFailed) => { + debug!( + target: LOG_TARGET, + "Unable to decrypt stored message sent by {}: {}", + source_peer, + err + ); + None + }, + // The peer that originally sent this message is not known to us. + Err(StoreAndForwardError::PeerManagerError(PeerManagerError::PeerNotFoundError)) => { + debug!(target: LOG_TARGET, "Origin peer not found. 
Discarding stored message."); + None + }, + Err(StoreAndForwardError::PeerManagerError(PeerManagerError::BannedPeer)) => { + debug!(target: LOG_TARGET, "Origin peer was banned. Discarding stored message."); + None + }, + + // These aren't be possible in this function if the code is correct. + Err(err @ StoreAndForwardError::InvariantError(_)) | + Err(err @ StoreAndForwardError::SafMessagesReceivedAfterDeadline { .. }) | + Err(err @ StoreAndForwardError::ReceivedUnrequestedSafMessages) => { + error!(target: LOG_TARGET, "BUG: unreachable error reached! {}", err); + None + }, + + // Internal errors + Err(err @ StoreAndForwardError::RequestCancelled) | + Err(err @ StoreAndForwardError::RequesterChannelClosed) | + Err(err @ StoreAndForwardError::DhtOutboundError(_)) | + Err(err @ StoreAndForwardError::StorageError(_)) | + Err(err @ StoreAndForwardError::PeerManagerError(_)) => { + error!(target: LOG_TARGET, "Internal error: {}", err); + None + }, + + // Failed to send request to Dht Actor, something has gone very wrong + Err(StoreAndForwardError::DhtActorError(err)) => { + error!( + target: LOG_TARGET, + "DhtActor returned an error. {}. This could indicate a system malfunction.", err + ); + None + }, + // Duplicate message detected, no problem it happens. + Err(StoreAndForwardError::DuplicateMessage) => { + debug!( + target: LOG_TARGET, + "Store and forward received a duplicate message. Message discarded." + ); + None + }, + + // The decrypted message did not contain a required message part. The sender has no way to know this + // so we can just ignore the message + Err(StoreAndForwardError::EnvelopeBodyMissingMessagePart) => { + debug!( + target: LOG_TARGET, + "Received stored message from peer `{}` that is missing a required message part. Message \ + discarded.", + source_peer + ); + None + }, + + // Peer sent an invalid SAF reply + Err(err @ StoreAndForwardError::StoredAtWasInFuture) | + Err(err @ StoreAndForwardError::InvalidSafResponseMessage { .. 
}) => { + warn!( + target: LOG_TARGET, + "SECURITY: invalid store and forward message was discarded from NodeId={}. Reason: {}. \ + This is a sign of a badly behaving node.", + source_peer, + err + ); + self.dht_requester + .ban_peer(source_peer.clone(), OffenceSeverity::High, &err) + .await; + Some(Err(err)) + }, + + // Ban - peer sent us a message containing an invalid DhtHeader or encoded signature. They should + // have discarded this message. + Err(err @ StoreAndForwardError::DecodeError(_)) | + Err(err @ StoreAndForwardError::MessageError(_)) | + Err(err @ StoreAndForwardError::MalformedEnvelopeBody(_)) | + Err(err @ StoreAndForwardError::DhtMessageError(_)) => { + warn!( + target: LOG_TARGET, + "SECURITY: invalid store and forward message was discarded from NodeId={}. Reason: {}. \ + These messages should never have been forwarded. This is a sign of a badly behaving node.", + source_peer, + err + ); + self.dht_requester + .ban_peer(source_peer.clone(), OffenceSeverity::Medium, &err) + .await; + Some(Err(err)) + }, + + Err(err @ StoreAndForwardError::BadDhtHeaderSemanticallyInvalid) | + Err(err @ StoreAndForwardError::InvalidMessageSignature(_)) => { + warn!( + target: LOG_TARGET, + "SECURITY: invalid store and forward message was discarded from NodeId={}. Reason: {}. \ + These messages should never have been forwarded. This is a sign of a badly behaving node.", + source_peer, + err + ); + self.dht_requester + .ban_peer(source_peer.clone(), OffenceSeverity::High, &err) + .await; + Some(Err(err)) + }, + + // The destination for this message is not this node, so the sender should not have sent it + Err(err @ StoreAndForwardError::InvalidDestination) => { + warn!( + target: LOG_TARGET, + "SECURITY: invalid store and forward message was discarded from NodeId={}. Reason: {}. \ + These messages should never have been forwarded. 
This is a sign of a badly behaving node.", + source_peer, + err + ); + self.dht_requester + .ban_peer(source_peer.clone(), OffenceSeverity::High, &err) + .await; + Some(Err(err)) + }, + Err(err @ StoreAndForwardError::PeerSentDhtMessageViaSaf) | + Err(err @ StoreAndForwardError::PeerSentSafMessageViaSaf) => { + warn!( + target: LOG_TARGET, + "SECURITY: invalid store and forward message was discarded from NodeId={}. Reason: {}. \ + These messages should never have been forwarded. This is a sign of a badly behaving node.", + source_peer, + err + ); + self.dht_requester + .ban_peer(source_peer.clone(), OffenceSeverity::High, &err) + .await; + Some(Err(err)) + }, + } + } } #[cfg(test)] @@ -832,7 +975,7 @@ mod test { let msg_a = wrap_in_envelope_body!(&b"A".to_vec()); let inbound_msg_a = - make_dht_inbound_message(&node_identity, &msg_a, DhtMessageFlags::ENCRYPTED, true, false).unwrap(); + make_dht_inbound_message(&node_identity, &msg_a, DhtMessageFlags::ENCRYPTED, true, true).unwrap(); // Need to know the peer to process a stored message peer_manager .add_peer(Clone::clone(&*inbound_msg_a.source_peer)) @@ -841,7 +984,7 @@ mod test { let msg_b = wrap_in_envelope_body!(b"B".to_vec()); let inbound_msg_b = - make_dht_inbound_message(&node_identity, &msg_b, DhtMessageFlags::ENCRYPTED, true, false).unwrap(); + make_dht_inbound_message(&node_identity, &msg_b, DhtMessageFlags::ENCRYPTED, true, true).unwrap(); // Need to know the peer to process a stored message peer_manager .add_peer(Clone::clone(&*inbound_msg_b.source_peer)) @@ -936,6 +1079,98 @@ mod test { assert_eq!(last_saf_received.second(), msg2_time.second()); } + #[tokio::test] + #[allow(clippy::similar_names, clippy::too_many_lines)] + async fn rejected_with_bad_message_semantics() { + let spy = service_spy(); + let (saf_requester, saf_mock_state) = create_store_and_forward_mock(); + + let peer_manager = build_peer_manager(); + let (oms_tx, _) = mpsc::channel(1); + + let node_identity = make_node_identity(); + + let 
msg_a = wrap_in_envelope_body!(&b"A".to_vec()); + + let inbound_msg_a = + make_dht_inbound_message(&node_identity, &msg_a, DhtMessageFlags::ENCRYPTED, true, false).unwrap(); + // Need to know the peer to process a stored message + peer_manager + .add_peer(Clone::clone(&*inbound_msg_a.source_peer)) + .await + .unwrap(); + + let msg_b = wrap_in_envelope_body!(b"B".to_vec()); + let inbound_msg_b = + make_dht_inbound_message(&node_identity, &msg_b, DhtMessageFlags::ENCRYPTED, false, true).unwrap(); + // Need to know the peer to process a stored message + peer_manager + .add_peer(Clone::clone(&*inbound_msg_b.source_peer)) + .await + .unwrap(); + + let msg1_time = Utc::now() + .checked_sub_signed(chrono::Duration::from_std(Duration::from_secs(60)).unwrap()) + .unwrap(); + let msg1 = ProtoStoredMessage::new(0, inbound_msg_a.dht_header.clone(), inbound_msg_a.body, msg1_time); + let msg2_time = Utc::now() + .checked_sub_signed(chrono::Duration::from_std(Duration::from_secs(30)).unwrap()) + .unwrap(); + let msg2 = ProtoStoredMessage::new(0, inbound_msg_b.dht_header, inbound_msg_b.body, msg2_time); + + let mut message = DecryptedDhtMessage::succeeded( + wrap_in_envelope_body!(StoredMessagesResponse { + messages: vec![msg1.clone(), msg2], + request_id: 123, + response_type: 0 + }), + None, + make_dht_inbound_message( + &node_identity, + &b"Stored message".to_vec(), + DhtMessageFlags::ENCRYPTED, + false, + false, + ) + .unwrap(), + ); + message.dht_header.message_type = DhtMessageType::SafStoredMessages; + + let (mut dht_requester, mock) = create_dht_actor_mock(1); + task::spawn(mock.run()); + let (saf_response_signal_sender, _) = mpsc::channel(20); + + assert!(dht_requester + .get_metadata::>(DhtMetadataKey::LastSafMessageReceived) + .await + .unwrap() + .is_none()); + + // Allow request inflight check to pass + saf_mock_state.set_request_inflight(Some(Duration::from_secs(10))).await; + + let task = MessageHandlerTask::new( + Default::default(), + spy.to_service::(), + 
saf_requester, + dht_requester.clone(), + OutboundMessageRequester::new(oms_tx), + node_identity, + message, + saf_response_signal_sender, + ); + + let err = task.run().await.unwrap_err(); + matches!( + err.downcast_ref::().unwrap(), + StoreAndForwardError::BadDhtHeaderSemanticallyInvalid + ); + + assert_eq!(spy.call_count(), 0); + let requests = spy.take_requests(); + assert_eq!(requests.len(), 0); + } + #[tokio::test] async fn stored_at_in_future() { let spy = service_spy(); @@ -1021,7 +1256,7 @@ mod test { let msg_a = wrap_in_envelope_body!(&b"A".to_vec()); let inbound_msg_a = - make_dht_inbound_message(&node_identity, &msg_a, DhtMessageFlags::ENCRYPTED, true, false).unwrap(); + make_dht_inbound_message(&node_identity, &msg_a, DhtMessageFlags::ENCRYPTED, true, true).unwrap(); peer_manager .add_peer(Clone::clone(&*inbound_msg_a.source_peer)) .await diff --git a/comms/dht/src/store_forward/service.rs b/comms/dht/src/store_forward/service.rs index 3285795c87..d39132ff76 100644 --- a/comms/dht/src/store_forward/service.rs +++ b/comms/dht/src/store_forward/service.rs @@ -20,7 +20,11 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{convert::TryFrom, sync::Arc, time::Duration}; +use std::{ + convert::{TryFrom, TryInto}, + sync::Arc, + time::Duration, +}; use chrono::{DateTime, NaiveDateTime, Utc}; use log::*; @@ -67,6 +71,7 @@ pub struct FetchStoredMessageQuery { node_id: Box, since: Option>, response_type: SafResponseType, + limit: Option, } impl FetchStoredMessageQuery { @@ -77,9 +82,16 @@ impl FetchStoredMessageQuery { node_id, since: None, response_type: SafResponseType::Anonymous, + limit: None, } } + /// Limit the number of messages returned + pub fn with_limit(&mut self, limit: u32) -> &mut Self { + self.limit = Some(limit); + self + } + /// Modify query to only include messages since the given date. 
pub fn with_messages_since(&mut self, since: DateTime) -> &mut Self { self.since = Some(since); @@ -401,8 +413,7 @@ impl StoreAndForwardService { .finish(), request, ) - .await - .map_err(StoreAndForwardError::RequestMessagesFailed)?; + .await?; Ok(()) } @@ -428,20 +439,21 @@ impl StoreAndForwardService { .finish(), request, ) - .await - .map_err(StoreAndForwardError::RequestMessagesFailed)?; + .await?; Ok(()) } async fn get_saf_request(&mut self) -> SafResult { - let request = self + let mut request = self .dht_requester .get_metadata(DhtMetadataKey::LastSafMessageReceived) .await? .map(StoredMessagesRequest::since) .unwrap_or_else(StoredMessagesRequest::new); + request.limit = self.config.max_returned_messages.try_into().unwrap_or(u32::MAX); + Ok(request) } @@ -473,9 +485,10 @@ impl StoreAndForwardService { fn handle_fetch_message_query(&self, query: &FetchStoredMessageQuery) -> SafResult> { use SafResponseType::{Anonymous, Discovery, ForMe, Join}; - let limit = i64::try_from(self.config.max_returned_messages) - .ok() - .unwrap_or(std::i64::MAX); + let limit = query + .limit + .and_then(|v| i64::try_from(v).ok()) + .unwrap_or(self.config.max_returned_messages as i64); let db = &self.database; let messages = match query.response_type { ForMe => db.find_messages_for_peer(&query.public_key, &query.node_id, query.since, limit)?, diff --git a/comms/dht/src/store_forward/store.rs b/comms/dht/src/store_forward/store.rs index 6c177cecfb..16283bede2 100644 --- a/comms/dht/src/store_forward/store.rs +++ b/comms/dht/src/store_forward/store.rs @@ -34,13 +34,7 @@ use tower::{layer::Layer, Service, ServiceExt}; use super::StoreAndForwardRequester; use crate::{ inbound::DecryptedDhtMessage, - store_forward::{ - database::NewStoredMessage, - error::StoreAndForwardError, - message::StoredMessagePriority, - SafConfig, - SafResult, - }, + store_forward::{database::NewStoredMessage, message::StoredMessagePriority, SafConfig, SafResult}, }; const LOG_TARGET: &str = 
"comms::dht::storeforward::store"; @@ -205,10 +199,12 @@ where S: Service + Se } message.set_saf_stored(false); - if let Some(priority) = self.get_storage_priority(&message).await? { - message.set_saf_stored(true); - let existing = self.store(priority, message.clone()).await?; - message.set_already_forwarded(existing); + if self.is_valid_for_storage(&message) { + if let Some(priority) = self.get_storage_priority(&message).await? { + message.set_saf_stored(true); + let existing = self.store(priority, message.clone()).await?; + message.set_already_forwarded(existing); + } } trace!( @@ -222,6 +218,35 @@ where S: Service + Se service.oneshot(message).await } + fn is_valid_for_storage(&self, message: &DecryptedDhtMessage) -> bool { + if message.body_len() == 0 { + debug!( + target: LOG_TARGET, + "Message {} from peer '{}' not eligible for SAF storage because it has no body (Trace: {})", + message.tag, + message.source_peer.node_id.short_str(), + message.dht_header.message_tag + ); + return false; + } + + if let Some(expires) = message.dht_header.expires { + let now = EpochTime::now(); + if expires < now { + debug!( + target: LOG_TARGET, + "Message {} from peer '{}' not eligible for SAF storage because it has expired (Trace: {})", + message.tag, + message.source_peer.node_id.short_str(), + message.dht_header.message_tag + ); + return false; + } + } + + true + } + async fn get_storage_priority(&self, message: &DecryptedDhtMessage) -> SafResult> { let log_not_eligible = |reason: &str| { debug!( @@ -248,13 +273,8 @@ where S: Service + Se return Ok(None); } - if message.dht_header.message_type.is_dht_join() { - log_not_eligible("it is a join message"); - return Ok(None); - } - - if message.dht_header.message_type.is_dht_discovery() { - log_not_eligible("it is a discovery message"); + if message.dht_header.message_type.is_dht_message() { + log_not_eligible(&format!("it is a DHT {} message", message.dht_header.message_type)); return Ok(None); } @@ -389,13 +409,6 @@ where S: 
Service + Se message.dht_header.message_tag, ); - if let Some(expires) = message.dht_header.expires { - let now = EpochTime::now(); - if expires < now { - return Err(StoreAndForwardError::NotStoringExpiredMessage { expired: expires, now }); - } - } - let stored_message = NewStoredMessage::new(message, priority); self.saf_requester.insert_message(stored_message).await } diff --git a/comms/dht/tests/attacks.rs b/comms/dht/tests/attacks.rs index c7c460fc87..319e17168c 100644 --- a/comms/dht/tests/attacks.rs +++ b/comms/dht/tests/attacks.rs @@ -99,7 +99,7 @@ async fn large_join_messages_with_many_addresses() { .await .unwrap(), expect = true, - max_attempts = 10, + max_attempts = 20, interval = Duration::from_secs(1) ); // Node B did not propagate