diff --git a/comms/dht/src/dht.rs b/comms/dht/src/dht.rs index 15e444a3da..2e5d6c2182 100644 --- a/comms/dht/src/dht.rs +++ b/comms/dht/src/dht.rs @@ -315,7 +315,7 @@ mod test { use futures::{channel::mpsc, StreamExt}; use std::{sync::Arc, time::Duration}; use tari_comms::{ - message::{MessageExt, MessageFlags}, + message::MessageExt, pipeline::SinkService, test_utils::mocks::create_connection_manager_mock, wrap_in_envelope_body, @@ -354,11 +354,8 @@ mod test { msg.to_encoded_bytes().unwrap(), DhtMessageFlags::empty(), ); - let inbound_message = make_comms_inbound_message( - &node_identity, - dht_envelope.to_encoded_bytes().unwrap().into(), - MessageFlags::empty(), - ); + let inbound_message = + make_comms_inbound_message(&node_identity, dht_envelope.to_encoded_bytes().unwrap().into()); let msg = { service.call(inbound_message).await.unwrap(); @@ -400,11 +397,8 @@ mod test { let ecdh_key = crypt::generate_ecdh_secret(node_identity.secret_key(), node_identity.public_key()); let encrypted_bytes = crypt::encrypt(&ecdh_key, &msg.to_encoded_bytes().unwrap()).unwrap(); let dht_envelope = make_dht_envelope(&node_identity, encrypted_bytes, DhtMessageFlags::ENCRYPTED); - let inbound_message = make_comms_inbound_message( - &node_identity, - dht_envelope.to_encoded_bytes().unwrap().into(), - MessageFlags::empty(), - ); + let inbound_message = + make_comms_inbound_message(&node_identity, dht_envelope.to_encoded_bytes().unwrap().into()); let msg = { service.call(inbound_message).await.unwrap(); @@ -460,11 +454,8 @@ mod test { .unwrap() .signature .clone(); - let inbound_message = make_comms_inbound_message( - &node_identity, - dht_envelope.to_encoded_bytes().unwrap().into(), - MessageFlags::empty(), - ); + let inbound_message = + make_comms_inbound_message(&node_identity, dht_envelope.to_encoded_bytes().unwrap().into()); service.call(inbound_message).await.unwrap(); @@ -513,11 +504,8 @@ mod test { header.message_type = DhtMessageType::SafStoredMessages as i32; Some(header) }); - let inbound_message = make_comms_inbound_message( - &node_identity, - dht_envelope.to_encoded_bytes().unwrap().into(), - MessageFlags::empty(), - ); + let inbound_message = + make_comms_inbound_message(&node_identity, dht_envelope.to_encoded_bytes().unwrap().into()); service.call(inbound_message).await.unwrap_err(); // This seems like the best way to tell that an open channel is empty without the test blocking indefinitely diff --git a/comms/dht/src/inbound/deserialize.rs b/comms/dht/src/inbound/deserialize.rs index d8fef9ec8b..cf94c2d471 100644 --- a/comms/dht/src/inbound/deserialize.rs +++ b/comms/dht/src/inbound/deserialize.rs @@ -132,7 +132,7 @@ mod test { test_utils::{make_comms_inbound_message, make_dht_envelope, make_node_identity, service_spy}, }; use futures::executor::block_on; - use tari_comms::message::{MessageExt, MessageFlags}; + use tari_comms::message::MessageExt; use tari_test_utils::panic_context; #[test] @@ -148,7 +148,6 @@ mod test { block_on(deserialize.call(make_comms_inbound_message( &node_identity, dht_envelope.to_encoded_bytes().unwrap().into(), - MessageFlags::empty(), ))) .unwrap(); diff --git a/comms/dht/src/outbound/broadcast.rs b/comms/dht/src/outbound/broadcast.rs index cf13ad23ff..33bc492f0b 100644 --- a/comms/dht/src/outbound/broadcast.rs +++ b/comms/dht/src/outbound/broadcast.rs @@ -43,7 +43,6 @@ use futures::{ use log::*; use std::{sync::Arc, task::Poll}; use tari_comms::{ - message::MessageFlags, peer_manager::{NodeId, NodeIdentity, Peer}, pipeline::PipelineError, types::CommsPublicKey, @@ -433,15 +432,7 @@ where S: Service // Construct a MessageEnvelope for each recipient let messages = selected_peers .into_iter() - .map(|peer| { - DhtOutboundMessage::new( - peer, - dht_header.clone(), - encryption.clone(), - MessageFlags::NONE, - body.clone(), - ) - }) + .map(|peer| DhtOutboundMessage::new(peer, dht_header.clone(), encryption.clone(), body.clone())) .collect::>(); Ok(messages) diff --git a/comms/dht/src/outbound/encryption.rs b/comms/dht/src/outbound/encryption.rs index cb3af89d76..659b6ca067 100644 --- a/comms/dht/src/outbound/encryption.rs +++ b/comms/dht/src/outbound/encryption.rs @@ -118,7 +118,6 @@ mod test { }; use futures::executor::block_on; use tari_comms::{ - message::MessageFlags, net_address::MultiaddressesWithStats, peer_manager::{NodeId, Peer, PeerFeatures, PeerFlags}, types::CommsPublicKey, @@ -146,7 +145,6 @@ mod test { ), make_dht_header(&node_identity, &body, DhtMessageFlags::empty()), OutboundEncryption::None, - MessageFlags::empty(), body.clone(), ); block_on(encryption.call(msg)).unwrap(); @@ -177,7 +175,6 @@ mod test { ), make_dht_header(&node_identity, &body, DhtMessageFlags::ENCRYPTED), OutboundEncryption::EncryptFor(Box::new(CommsPublicKey::default())), - MessageFlags::empty(), body.clone(), ); block_on(encryption.call(msg)).unwrap(); diff --git a/comms/dht/src/outbound/message.rs b/comms/dht/src/outbound/message.rs index 9bcb501929..a3f4229793 100644 --- a/comms/dht/src/outbound/message.rs +++ b/comms/dht/src/outbound/message.rs @@ -26,11 +26,7 @@ use crate::{ }; use futures::channel::oneshot; use std::{fmt, fmt::Display}; -use tari_comms::{ - message::{MessageFlags, MessageTag}, - peer_manager::Peer, - types::CommsPublicKey, -}; +use tari_comms::{message::MessageTag, peer_manager::Peer, types::CommsPublicKey}; use tari_crypto::tari_utilities::hex::Hex; /// Determines if an outbound message should be Encrypted and, if so, for which public key @@ -142,7 +138,6 @@ pub struct DhtOutboundMessage { pub tag: MessageTag, pub destination_peer: Peer, pub dht_header: DhtMessageHeader, - pub comms_flags: MessageFlags, pub encryption: OutboundEncryption, pub body: Vec, } @@ -153,7 +148,6 @@ impl DhtOutboundMessage { destination_peer: Peer, dht_header: DhtMessageHeader, encryption: OutboundEncryption, - comms_flags: MessageFlags, body: Vec, ) -> Self { @@ -162,7 +156,6 @@ impl DhtOutboundMessage { destination_peer, dht_header, encryption, - comms_flags, body, } } diff --git a/comms/dht/src/outbound/serialize.rs b/comms/dht/src/outbound/serialize.rs index 5dc07daf54..1815756046 100644 --- a/comms/dht/src/outbound/serialize.rs +++ b/comms/dht/src/outbound/serialize.rs @@ -84,7 +84,6 @@ where S: Service mut dht_header, body, destination_peer, - comms_flags, .. } = message; @@ -115,12 +114,7 @@ where S: Service let body = Bytes::from(envelope.to_encoded_bytes().map_err(PipelineError::from_debug)?); next_service - .oneshot(OutboundMessage::with_tag( - message.tag, - destination_peer.node_id, - comms_flags, - body, - )) + .oneshot(OutboundMessage::with_tag(message.tag, destination_peer.node_id, body)) .await } } @@ -154,7 +148,6 @@ mod test { use futures::executor::block_on; use prost::Message; use tari_comms::{ - message::MessageFlags, net_address::MultiaddressesWithStats, peer_manager::{NodeId, Peer, PeerFeatures, PeerFlags}, types::CommsPublicKey, @@ -182,7 +175,6 @@ mod test { ), make_dht_header(&node_identity, &body, DhtMessageFlags::empty()), OutboundEncryption::None, - MessageFlags::empty(), body, ); block_on(serialize.call(msg)).unwrap(); diff --git a/comms/dht/src/test_utils/makers.rs b/comms/dht/src/test_utils/makers.rs index 650de26e4c..862ecb474e 100644 --- a/comms/dht/src/test_utils/makers.rs +++ b/comms/dht/src/test_utils/makers.rs @@ -27,7 +27,7 @@ use crate::{ use rand::rngs::OsRng; use std::sync::Arc; use tari_comms::{ - message::{InboundMessage, MessageEnvelopeHeader, MessageFlags}, + message::InboundMessage, multiaddr::Multiaddr, peer_manager::{NodeIdentity, Peer, PeerFeatures, PeerFlags, PeerManager}, types::CommsDatabase, @@ -72,7 +72,7 @@ pub fn make_client_identity() -> Arc { ) } -pub fn make_comms_inbound_message(node_identity: &NodeIdentity, message: Bytes, flags: MessageFlags) -> InboundMessage { +pub fn make_comms_inbound_message(node_identity: &NodeIdentity, message: Bytes) -> InboundMessage { InboundMessage::new( Arc::new(Peer::new( node_identity.public_key().clone(), @@ -82,11 +82,6 @@ pub fn make_comms_inbound_message(node_identity: &NodeIdentity, message: Bytes, PeerFeatures::COMMUNICATION_NODE, &[], )), - MessageEnvelopeHeader { - public_key: node_identity.public_key().clone(), - signature: Bytes::new(), - flags, - }, message, ) } diff --git a/comms/examples/tor.rs b/comms/examples/tor.rs index b7a9a8c5f6..3845b49ed5 100644 --- a/comms/examples/tor.rs +++ b/comms/examples/tor.rs @@ -112,7 +112,6 @@ async fn run() -> Result<(), Error> { outbound_tx1 .send(OutboundMessage::new( comms_node2.node_identity().node_id().clone(), - Default::default(), Bytes::from_static(b"START"), )) .await?; @@ -272,5 +271,5 @@ async fn start_ping_ponger( fn make_msg(node_id: &NodeId, msg: String) -> OutboundMessage { let msg = Bytes::copy_from_slice(msg.as_bytes()); - OutboundMessage::new(node_id.clone(), Default::default(), msg) + OutboundMessage::new(node_id.clone(), msg) } diff --git a/comms/src/builder/tests.rs b/comms/src/builder/tests.rs index 4778a6d677..65576e4a40 100644 --- a/comms/src/builder/tests.rs +++ b/comms/src/builder/tests.rs @@ -206,7 +206,6 @@ async fn peer_to_peer_messaging() { for i in 0..NUM_MSGS { let outbound_msg = OutboundMessage::new( node_identity2.node_id().clone(), - Default::default(), format!("#{:0>3} - comms messaging is so hot right now!", i).into(), ); outbound_tx1.send(outbound_msg).await.unwrap(); @@ -227,7 +226,6 @@ async fn peer_to_peer_messaging() { for i in 0..NUM_MSGS { let outbound_msg = OutboundMessage::new( node_identity1.node_id().clone(), - Default::default(), format!("#{:0>3} - comms messaging is so hot right now!", i).into(), ); outbound_tx2.send(outbound_msg).await.unwrap(); @@ -295,7 +293,6 @@ async fn peer_to_peer_messaging_simultaneous() { for i in 0..NUM_MSGS { let outbound_msg = OutboundMessage::new( node_identity2.node_id().clone(), - Default::default(), format!("#{:0>3} - comms messaging is so hot right now!", i).into(), ); outbound_tx1.send(outbound_msg).await.unwrap(); @@ -306,7 +303,6 @@ async fn peer_to_peer_messaging_simultaneous() { for i in 0..NUM_MSGS { let outbound_msg = OutboundMessage::new( node_identity1.node_id().clone(), - Default::default(), format!("#{:0>3} - comms messaging is so hot right now!", i).into(), ); outbound_tx2.send(outbound_msg).await.unwrap(); diff --git a/comms/src/consts.rs b/comms/src/consts.rs index 0af8fdc2cc..3773cc15c0 100644 --- a/comms/src/consts.rs +++ b/comms/src/consts.rs @@ -28,7 +28,3 @@ pub const PEER_MANAGER_MAX_FLOOD_PEERS: usize = 1000; /// The amount of time to consider a peer to be offline (i.e. dial to peer will fail without trying) after a failed /// connection attempt pub const PEER_OFFLINE_COOLDOWN_PERIOD: Duration = Duration::from_secs(60); - -/// The envelope version. This should be increased any time a change is made to the -/// envelope proto files. -pub const ENVELOPE_VERSION: u32 = 0; diff --git a/comms/src/message/envelope.rs b/comms/src/message/envelope.rs index 79b574d50c..ec1076eb3f 100644 --- a/comms/src/message/envelope.rs +++ b/comms/src/message/envelope.rs @@ -20,101 +20,11 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use super::{MessageError, MessageFlags}; -use crate::{ - consts::ENVELOPE_VERSION, - types::{CommsPublicKey, CommsSecretKey}, - utils::signature, -}; -use bytes::Bytes; -use rand::rngs::OsRng; -use std::convert::TryInto; -use tari_crypto::tari_utilities::{message_format::MessageFormat, ByteArray}; +use super::MessageError; // Re-export protos pub use crate::proto::envelope::*; -/// Represents data that every message contains. -/// As described in [RFC-0172](https://rfc.tari.com/RFC-0172_PeerToPeerMessagingProtocol.html#messaging-structure) -#[derive(Clone, Debug, PartialEq)] -pub struct MessageEnvelopeHeader { - pub public_key: CommsPublicKey, - pub signature: Bytes, - pub flags: MessageFlags, -} - -impl Envelope { - /// Sign a message, construct an Envelope with a Header - pub fn construct_signed( - secret_key: &CommsSecretKey, - public_key: &CommsPublicKey, - body: Bytes, - flags: MessageFlags, - ) -> Result - { - // Sign this body - let header_signature = { - let sig = - signature::sign(&mut OsRng, secret_key.clone(), &body).map_err(MessageError::SchnorrSignatureError)?; - sig.to_binary().map_err(MessageError::MessageFormatError) - }?; - - Ok(Envelope { - version: ENVELOPE_VERSION, - header: Some(EnvelopeHeader { - public_key: public_key.to_vec(), - signature: header_signature, - flags: flags.bits(), - }), - body: body.to_vec(), - }) - } - - /// Verify that the signature provided is valid for the given body - pub fn verify_signature(&self) -> Result { - match self - .header - .as_ref() - .map(|header| (header, header.get_comms_public_key())) - { - Some((header, Some(public_key))) => signature::verify(&public_key, &header.signature, &self.body), - _ => Ok(false), - } - } - - /// Returns true if the message contains a valid public key in the header, otherwise - /// false - pub fn is_valid(&self) -> bool { - self.get_public_key().is_some() - } - - /// Returns a valid public key from the header of this envelope, or None if the - /// public key is invalid - pub fn get_public_key(&self) -> Option { - self.header.as_ref().and_then(|header| header.get_comms_public_key()) - } -} - -impl EnvelopeHeader { - pub fn get_comms_public_key(&self) -> Option { - CommsPublicKey::from_bytes(&self.public_key).ok() - } -} - -impl TryInto for EnvelopeHeader { - type Error = MessageError; - - fn try_into(self) -> Result { - Ok(MessageEnvelopeHeader { - public_key: self - .get_comms_public_key() - .ok_or_else(|| MessageError::InvalidHeaderPublicKey)?, - signature: self.signature.into(), - flags: MessageFlags::from_bits_truncate(self.flags), - }) - } -} - /// Wraps a number of `prost::Message`s in a EnvelopeBody #[macro_export] macro_rules! wrap_in_envelope_body { @@ -185,42 +95,3 @@ impl EnvelopeBody { } } } - -#[cfg(test)] -mod test { - use super::*; - use crate::message::MessageFlags; - use rand::rngs::OsRng; - use tari_crypto::keys::PublicKey; - - #[test] - fn construct_signed() { - let (sk, pk) = CommsPublicKey::random_keypair(&mut OsRng); - let envelope = Envelope::construct_signed(&sk, &pk, Bytes::new(), MessageFlags::all()).unwrap(); - assert_eq!(envelope.get_public_key().unwrap(), pk); - assert!(envelope.verify_signature().unwrap()); - } - - #[test] - fn header_try_into() { - let header = EnvelopeHeader { - public_key: CommsPublicKey::default().to_vec(), - flags: MessageFlags::all().bits(), - signature: vec![1, 2, 3], - }; - - let msg_header: MessageEnvelopeHeader = header.try_into().unwrap(); - assert_eq!(msg_header.public_key, CommsPublicKey::default()); - assert_eq!(msg_header.flags, MessageFlags::all()); - assert_eq!(msg_header.signature, vec![1, 2, 3]); - } - - #[test] - fn is_valid() { - let (sk, pk) = CommsPublicKey::random_keypair(&mut OsRng); - let mut envelope = Envelope::construct_signed(&sk, &pk, Bytes::new(), MessageFlags::all()).unwrap(); - assert_eq!(envelope.is_valid(), true); - envelope.header = None; - assert_eq!(envelope.is_valid(), false); - } -} diff --git a/comms/src/message/inbound.rs b/comms/src/message/inbound.rs index 59216fe62c..e4c76f0041 100644 --- a/comms/src/message/inbound.rs +++ b/comms/src/message/inbound.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use super::{MessageEnvelopeHeader, MessageTag}; +use super::MessageTag; use crate::peer_manager::Peer; use bytes::Bytes; use std::sync::Arc; @@ -29,8 +29,6 @@ use std::sync::Arc; #[derive(Clone, Debug)] pub struct InboundMessage { pub tag: MessageTag, - /// The deserialized message envelope header - pub envelope_header: MessageEnvelopeHeader, /// The connected peer which sent this message pub source_peer: Arc, /// The raw message envelope @@ -39,11 +37,10 @@ pub struct InboundMessage { impl InboundMessage { /// Construct a new InboundMessage - pub fn new(source_peer: Arc, envelope_header: MessageEnvelopeHeader, body: Bytes) -> Self { + pub fn new(source_peer: Arc, body: Bytes) -> Self { Self { tag: MessageTag::new(), source_peer, - envelope_header, body, } } diff --git a/comms/src/message/mod.rs b/comms/src/message/mod.rs index d56383b369..edeff8a439 100644 --- a/comms/src/message/mod.rs +++ b/comms/src/message/mod.rs @@ -59,12 +59,10 @@ //! [MessageHeader]: ./message/struct.MessageHeader.html //! [MessageData]: ./message/struct.MessageData.html //! [DomainConnector]: ../domain_connector/struct.DomainConnector.html -use bitflags::*; -use serde::{Deserialize, Serialize}; #[macro_use] mod envelope; -pub use envelope::{Envelope, EnvelopeBody, EnvelopeHeader, MessageEnvelopeHeader}; +pub use envelope::EnvelopeBody; mod error; pub use error::MessageError; @@ -88,13 +86,3 @@ pub trait MessageExt: prost::Message { } } impl MessageExt for T {} - -bitflags! { - /// Used to indicate characteristics of the incoming or outgoing message, such - /// as whether the message is encrypted. - #[derive(Default, Deserialize, Serialize)] - pub struct MessageFlags: u32 { - const NONE = 0b0000_0000; - const ENCRYPTED = 0b0000_0001; - } -} diff --git a/comms/src/message/outbound.rs b/comms/src/message/outbound.rs index 0b74e6f864..feff75facd 100644 --- a/comms/src/message/outbound.rs +++ b/comms/src/message/outbound.rs @@ -20,10 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use crate::{ - message::{MessageFlags, MessageTag}, - peer_manager::NodeId, -}; +use crate::{message::MessageTag, peer_manager::NodeId}; use bytes::Bytes; use std::{ fmt, @@ -36,22 +33,20 @@ use std::{ pub struct OutboundMessage { pub tag: MessageTag, pub peer_node_id: NodeId, - pub flags: MessageFlags, pub body: Bytes, } impl OutboundMessage { /// Create a new OutboundMessage - pub fn new(peer_node_id: NodeId, flags: MessageFlags, body: Bytes) -> OutboundMessage { - Self::with_tag(MessageTag::new(), peer_node_id, flags, body) + pub fn new(peer_node_id: NodeId, body: Bytes) -> OutboundMessage { + Self::with_tag(MessageTag::new(), peer_node_id, body) } /// Create a new OutboundMessage with the specified MessageTag - pub fn with_tag(tag: MessageTag, peer_node_id: NodeId, flags: MessageFlags, body: Bytes) -> OutboundMessage { + pub fn with_tag(tag: MessageTag, peer_node_id: NodeId, body: Bytes) -> OutboundMessage { OutboundMessage { tag, peer_node_id, - flags, body, } } @@ -78,7 +73,7 @@ mod test { static TEST_MSG: Bytes = Bytes::from_static(b"The ghost brigades"); let node_id = NodeId::new(); let tag = MessageTag::new(); - let subject = OutboundMessage::with_tag(tag, node_id.clone(), MessageFlags::empty(), TEST_MSG.clone()); + let subject = OutboundMessage::with_tag(tag, node_id.clone(), TEST_MSG.clone()); assert_eq!(tag, subject.tag); assert_eq!(subject.body, TEST_MSG); assert_eq!(subject.peer_node_id, node_id); diff --git a/comms/src/pipeline/outbound.rs b/comms/src/pipeline/outbound.rs index 8dc316771f..14c2785be9 100644 --- a/comms/src/pipeline/outbound.rs +++ b/comms/src/pipeline/outbound.rs @@ -128,13 +128,8 @@ mod test { #[tokio_macros::test_basic] async fn run() { const NUM_ITEMS: usize = 10; - let items = (0..NUM_ITEMS).map(|i| { - OutboundMessage::new( - Default::default(), - Default::default(), - Bytes::copy_from_slice(&i.to_be_bytes()), - ) - }); + let items = + (0..NUM_ITEMS).map(|i| OutboundMessage::new(Default::default(), Bytes::copy_from_slice(&i.to_be_bytes()))); let stream = stream::iter(items).fuse(); let (out_tx, out_rx) = mpsc::channel(NUM_ITEMS); let (msg_tx, msg_rx) = mpsc::channel(NUM_ITEMS); diff --git a/comms/src/proto/envelope.proto b/comms/src/proto/envelope.proto index 56d37f31b7..ca8f91867e 100644 --- a/comms/src/proto/envelope.proto +++ b/comms/src/proto/envelope.proto @@ -2,20 +2,6 @@ syntax = "proto3"; package tari.comms.envelope; -/// Represents a message which is about to go on or has just come off the wire. -/// As described in [RFC-0172](https://rfc.tari.com/RFC-0172_PeerToPeerMessagingProtocol.html#messaging-structure) -message Envelope { - uint32 version = 1; - EnvelopeHeader header = 3; - bytes body = 4; -} - -message EnvelopeHeader { - bytes public_key = 1; - bytes signature = 2; - uint32 flags = 3; -} - // Parts contained within an Envelope. This is used to tell if an encrypted // message was successfully decrypted, by decrypting the envelope body and checking // if deserialization succeeds. diff --git a/comms/src/proto/tari.comms.envelope.rs b/comms/src/proto/tari.comms.envelope.rs index f445fabea7..e72ec33b17 100644 --- a/comms/src/proto/tari.comms.envelope.rs +++ b/comms/src/proto/tari.comms.envelope.rs @@ -1,23 +1,3 @@ -//// Represents a message which is about to go on or has just come off the wire. -//// As described in [RFC-0172](https://rfc.tari.com/RFC-0172_PeerToPeerMessagingProtocol.html#messaging-structure) -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Envelope { - #[prost(uint32, tag = "1")] - pub version: u32, - #[prost(message, optional, tag = "3")] - pub header: ::std::option::Option, - #[prost(bytes, tag = "4")] - pub body: std::vec::Vec, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct EnvelopeHeader { - #[prost(bytes, tag = "1")] - pub public_key: std::vec::Vec, - #[prost(bytes, tag = "2")] - pub signature: std::vec::Vec, - #[prost(uint32, tag = "3")] - pub flags: u32, -} /// Parts contained within an Envelope. This is used to tell if an encrypted /// message was successfully decrypted, by decrypting the envelope body and checking /// if deserialization succeeds. diff --git a/comms/src/protocol/messaging/error.rs b/comms/src/protocol/messaging/error.rs index f8b6d14829..42f5255655 100644 --- a/comms/src/protocol/messaging/error.rs +++ b/comms/src/protocol/messaging/error.rs @@ -31,12 +31,6 @@ use derive_error::Error; #[derive(Debug, Error)] pub enum InboundMessagingError { PeerManagerError(PeerManagerError), - /// Inbound message signatures are invalid - InvalidMessageSignature, - /// The received envelope is invalid - InvalidEnvelope, - /// The connected peer sent a public key which did not match the public key of the connected peer - PeerPublicKeyMismatch, /// Failed to decode message MessageDecodeError(prost::DecodeError), MessageError(MessageError), diff --git a/comms/src/protocol/messaging/inbound.rs b/comms/src/protocol/messaging/inbound.rs deleted file mode 100644 index b66d294ab6..0000000000 --- a/comms/src/protocol/messaging/inbound.rs +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright 2020, The Tari Project -// -// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the -// following conditions are met: -// -// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following -// disclaimer. -// -// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the -// following disclaimer in the documentation and/or other materials provided with the distribution. -// -// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote -// products derived from this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE -// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -use crate::{ - message::{Envelope, InboundMessage}, - peer_manager::Peer, - protocol::messaging::error::InboundMessagingError, -}; -use bytes::Bytes; -use log::*; -use prost::Message; -use std::{convert::TryInto, sync::Arc}; - -const LOG_TARGET: &str = "comms::protocol::messaging::inbound"; - -pub struct InboundMessaging; - -impl InboundMessaging { - /// Process a single received message from its raw serialized form i.e. a FrameSet - pub async fn process_message( - &self, - source_peer: Arc, - msg: &mut Bytes, - ) -> Result - { - let envelope = Envelope::decode(msg)?; - - let public_key = envelope - .get_public_key() - .ok_or_else(|| InboundMessagingError::InvalidEnvelope)?; - - trace!( - target: LOG_TARGET, - "Received message envelope version {} from peer '{}'", - envelope.version, - source_peer.node_id.short_str() - ); - - if source_peer.public_key != public_key { - return Err(InboundMessagingError::PeerPublicKeyMismatch); - } - - if !envelope.verify_signature()? { - return Err(InboundMessagingError::InvalidMessageSignature); - } - - // -- Message is authenticated -- - let Envelope { header, body, .. } = envelope; - let header = header.expect("already checked").try_into().expect("already checked"); - - let inbound_message = InboundMessage::new(source_peer, header, body.into()); - - Ok(inbound_message) - } -} diff --git a/comms/src/protocol/messaging/mod.rs b/comms/src/protocol/messaging/mod.rs index 0cb78ccfc0..af1f6d20a8 100644 --- a/comms/src/protocol/messaging/mod.rs +++ b/comms/src/protocol/messaging/mod.rs @@ -21,10 +21,9 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. mod error; -mod inbound; mod outbound; -mod protocol; +mod protocol; pub use protocol::{ MessagingEvent, MessagingEventReceiver, diff --git a/comms/src/protocol/messaging/outbound.rs b/comms/src/protocol/messaging/outbound.rs index c41d4134da..54f6a9ea18 100644 --- a/comms/src/protocol/messaging/outbound.rs +++ b/comms/src/protocol/messaging/outbound.rs @@ -23,11 +23,10 @@ use super::{error::MessagingProtocolError, MessagingEvent, MessagingProtocol, SendFailReason, MESSAGING_PROTOCOL}; use crate::{ connection_manager::{ConnectionManagerError, ConnectionManagerRequester, NegotiatedSubstream, PeerConnection}, - message::{Envelope, MessageExt, OutboundMessage}, + message::OutboundMessage, peer_manager::{NodeId, NodeIdentity}, types::CommsSubstream, }; -use bytes::Bytes; use futures::{channel::mpsc, SinkExt, StreamExt}; use log::*; use std::sync::Arc; @@ -137,57 +136,37 @@ impl OutboundMessaging { async fn start_forwarding_messages(mut self, substream: CommsSubstream) -> Result<(), MessagingProtocolError> { let mut framed = MessagingProtocol::framed(substream); while let Some(out_msg) = self.request_rx.next().await { - match self.to_envelope_bytes(&out_msg).await { - Ok(body) => { - trace!( - target: LOG_TARGET, - "Sending message ({} bytes) ({:?}) on outbound messaging substream", - body.len(), - out_msg.tag, - ); - if let Err(err) = framed.send(body).await { - debug!( - target: LOG_TARGET, - "[ThisNode={}] OutboundMessaging failed to send message to peer '{}' because '{}'", - self.node_identity.node_id().short_str(), - self.peer_node_id.short_str(), - err - ); - let _ = self - .messaging_events_tx - .send(MessagingEvent::SendMessageFailed( - out_msg, - SendFailReason::SubstreamSendFailed, - )) - .await; - // FATAL: Failed to send on the substream - self.flush_all_messages_to_failed_event(SendFailReason::SubstreamSendFailed) - .await; - return Err(MessagingProtocolError::OutboundSubstreamFailure); - } - - let _ = self - .messaging_events_tx - .send(MessagingEvent::MessageSent(out_msg.tag)) - .await; - }, - Err(err) => { - debug!( - target: LOG_TARGET, - "Failed to send message to peer '{}' because '{:?}'", - out_msg.peer_node_id.short_str(), - err - ); - - let _ = self - .messaging_events_tx - .send(MessagingEvent::SendMessageFailed( - out_msg, - SendFailReason::EnvelopeFailedToSerialize, - )) - .await; - }, + trace!( + target: LOG_TARGET, + "Sending message ({} bytes) ({:?}) on outbound messaging substream", + out_msg.body.len(), + out_msg.tag, + ); + if let Err(err) = framed.send(out_msg.body.clone()).await { + debug!( + target: LOG_TARGET, + "[ThisNode={}] OutboundMessaging failed to send message to peer '{}' because '{}'", + self.node_identity.node_id().short_str(), + self.peer_node_id.short_str(), + err + ); + let _ = self + .messaging_events_tx + .send(MessagingEvent::SendMessageFailed( + out_msg, + SendFailReason::SubstreamSendFailed, + )) + .await; + // FATAL: Failed to send on the substream + self.flush_all_messages_to_failed_event(SendFailReason::SubstreamSendFailed) + .await; + return Err(MessagingProtocolError::OutboundSubstreamFailure); } + + let _ = self + .messaging_events_tx + .send(MessagingEvent::MessageSent(out_msg.tag)) + .await; } Ok(()) @@ -204,31 +183,4 @@ impl OutboundMessaging { .await; } } - - async fn to_envelope_bytes(&self, out_msg: &OutboundMessage) -> Result { - let OutboundMessage { - flags, - body, - peer_node_id, - .. - } = out_msg; - - let envelope = Envelope::construct_signed( - self.node_identity.secret_key(), - self.node_identity.public_key(), - body.clone(), - *flags, - )?; - let body = envelope.to_encoded_bytes()?; - - trace!( - target: LOG_TARGET, - "[Node={}] Sending message ({} bytes) to peer '{}'", - self.node_identity.node_id().short_str(), - body.len(), - peer_node_id.short_str(), - ); - - Ok(body.into()) - } } diff --git a/comms/src/protocol/messaging/protocol.rs b/comms/src/protocol/messaging/protocol.rs index 0c3855869c..43fd21ec8f 100644 --- a/comms/src/protocol/messaging/protocol.rs +++ b/comms/src/protocol/messaging/protocol.rs @@ -26,11 +26,7 @@ use crate::{ connection_manager::{ConnectionManagerEvent, ConnectionManagerRequester}, message::{InboundMessage, MessageTag, OutboundMessage}, peer_manager::{NodeId, NodeIdentity, Peer, PeerManagerError}, - protocol::{ - messaging::{inbound::InboundMessaging, outbound::OutboundMessaging}, - ProtocolEvent, - ProtocolNotification, - }, + protocol::{messaging::outbound::OutboundMessaging, ProtocolEvent, ProtocolNotification}, runtime::current_executor, types::CommsSubstream, PeerManager, @@ -363,7 +359,6 @@ impl MessagingProtocol { let messaging_events_tx = self.messaging_events_tx.clone(); let mut inbound_message_tx = self.inbound_message_tx.clone(); let mut framed_substream = Self::framed(substream); - let inbound = InboundMessaging; self.executor.spawn(async move { while let Some(result) = framed_substream.next().await { @@ -376,42 +371,23 @@ impl MessagingProtocol { raw_msg.len() ); - let mut raw_msg = raw_msg.freeze(); - let (event, in_msg) = match inbound.process_message(Arc::clone(&peer), &mut raw_msg).await { - Ok(inbound_msg) => ( - MessagingEvent::MessageReceived( - Box::new(inbound_msg.source_peer.node_id.clone()), - inbound_msg.tag, - ), - Some(inbound_msg), - ), - Err(err) => { - // TODO: #banheuristic - warn!( - target: LOG_TARGET, - "Received invalid message from peer '{}' ({})", - peer.node_id.short_str(), - err - ); - ( - MessagingEvent::InvalidMessageReceived(Box::new(peer.node_id.clone())), - None, - ) - }, - }; - - if let Some(in_msg) = in_msg { - if let Err(err) = inbound_message_tx.send(in_msg).await { - warn!( - target: LOG_TARGET, - "Failed to send InboundMessage for peer '{}' because '{}'", - peer.node_id.short_str(), - err - ); - - if err.is_disconnected() { - break; - } + let inbound_msg = InboundMessage::new(Arc::clone(&peer), raw_msg.freeze()); + + let event = MessagingEvent::MessageReceived( + Box::new(inbound_msg.source_peer.node_id.clone()), + inbound_msg.tag, + ); + + if let Err(err) = inbound_message_tx.send(inbound_msg).await { + warn!( + target: LOG_TARGET, + "Failed to send InboundMessage for peer '{}' because '{}'", + peer.node_id.short_str(), + err + ); + + if err.is_disconnected() { + break; } } diff --git a/comms/src/protocol/messaging/test.rs b/comms/src/protocol/messaging/test.rs index 6d579c5680..7a4d711fef 100644 --- a/comms/src/protocol/messaging/test.rs +++ b/comms/src/protocol/messaging/test.rs @@ -28,10 +28,9 @@ use super::protocol::{ MESSAGING_PROTOCOL, }; use crate::{ - message::{InboundMessage, MessageExt, MessageFlags, MessageTag, OutboundMessage}, + message::{InboundMessage, MessageTag, OutboundMessage}, net_address::MultiaddressesWithStats, peer_manager::{NodeId, NodeIdentity, Peer, PeerFeatures, PeerFlags, PeerManager}, - proto::envelope::Envelope, protocol::{messaging::SendFailReason, ProtocolEvent, ProtocolNotification}, test_utils::{ mocks::{create_connection_manager_mock, create_peer_connection_mock_pair, ConnectionManagerMockState}, @@ -43,7 +42,6 @@ use crate::{ }; use bytes::Bytes; use futures::{channel::mpsc, SinkExt, StreamExt}; -use prost::Message; use rand::rngs::OsRng; use std::{sync::Arc, time::Duration}; use tari_crypto::keys::PublicKey; @@ -110,7 +108,7 @@ async fn new_inbound_substream_handling() { spawn_messaging_protocol().await; let expected_node_id = node_id::random(); - let (sk, pk) = CommsPublicKey::random_keypair(&mut OsRng); + let (_, pk) = CommsPublicKey::random_keypair(&mut OsRng); peer_manager .add_peer(Peer::new( pk.clone(), @@ -139,12 +137,7 @@ async fn new_inbound_substream_handling() { let stream_theirs = muxer_theirs.incoming_mut().next().await.unwrap(); let mut framed_theirs = MessagingProtocol::framed(stream_theirs); - let envelope = Envelope::construct_signed(&sk, &pk, TEST_MSG1, MessageFlags::empty()).unwrap(); - - framed_theirs - .send(Bytes::copy_from_slice(&envelope.to_encoded_bytes().unwrap())) - .await - .unwrap(); + framed_theirs.send(TEST_MSG1).await.unwrap(); let in_msg = time::timeout(Duration::from_secs(5), inbound_msg_rx.next()) .await @@ -177,15 +170,14 @@ async fn send_message_request() { conn_man_mock.add_active_connection(peer_node_id.clone(), conn1).await; // Send a message to node - let out_msg = OutboundMessage::new(peer_node_id, MessageFlags::NONE, TEST_MSG1); + let out_msg = OutboundMessage::new(peer_node_id, TEST_MSG1); request_tx.send(MessagingRequest::SendMessage(out_msg)).await.unwrap(); // Check that node got the message let stream = peer_conn_mock2.next_incoming_substream().await.unwrap(); let mut framed = MessagingProtocol::framed(stream); let msg = framed.next().await.unwrap().unwrap(); - let msg = Envelope::decode(msg).unwrap(); - assert_eq!(msg.body, TEST_MSG1); + assert_eq!(msg, TEST_MSG1); // Got the call to create a substream assert_eq!(peer_conn_mock1.call_count(), 1); @@ -196,7 +188,7 @@ async fn send_message_dial_failed() { let (_, _, conn_manager_mock, _, mut request_tx, _, mut event_tx, _shutdown) = spawn_messaging_protocol().await; let node_id = node_id::random(); - let out_msg = OutboundMessage::new(node_id, MessageFlags::NONE, TEST_MSG1); + let out_msg = OutboundMessage::new(node_id, TEST_MSG1); let expected_out_msg_tag = out_msg.tag; // Send a message to node 2 request_tx.send(MessagingRequest::SendMessage(out_msg)).await.unwrap(); @@ -228,7 +220,7 @@ async fn send_message_substream_bulk_failure() { .await; async fn send_msg(request_tx: &mut mpsc::Sender, node_id: NodeId) -> MessageTag { - let out_msg = OutboundMessage::new(node_id, MessageFlags::NONE, TEST_MSG1); + let out_msg = OutboundMessage::new(node_id, TEST_MSG1); let msg_tag = out_msg.tag; // Send a message to node 2 request_tx.send(MessagingRequest::SendMessage(out_msg)).await.unwrap(); @@ -275,7 +267,7 @@ async fn many_concurrent_send_message_requests() { // Send many messages to node let mut msg_tags = Vec::with_capacity(NUM_MSGS); for _ in 0..NUM_MSGS { - let out_msg = OutboundMessage::new(node_id2.clone(), MessageFlags::NONE, TEST_MSG1); + let out_msg = OutboundMessage::new(node_id2.clone(), TEST_MSG1); msg_tags.push(out_msg.tag); request_tx.send(MessagingRequest::SendMessage(out_msg)).await.unwrap(); } @@ -311,7 +303,7 @@ async fn many_concurrent_send_message_requests_that_fail() { // Send many messages to node let mut msg_tags = Vec::with_capacity(NUM_MSGS); for _ in 0..NUM_MSGS { - let out_msg = OutboundMessage::new(node_id2.clone(), MessageFlags::NONE, TEST_MSG1); + let out_msg = OutboundMessage::new(node_id2.clone(), TEST_MSG1); msg_tags.push(out_msg.tag); request_tx.send(MessagingRequest::SendMessage(out_msg)).await.unwrap(); }