From d426174b675f0113a56934806e898916ca1b8aef Mon Sep 17 00:00:00 2001 From: Hansie Odendaal Date: Mon, 18 May 2020 16:35:00 +0200 Subject: [PATCH] This change enables tracking messages across nodes during testnet, e.g. `msg tag` <> `TxID`. --- .../p2p/src/services/liveness/service.rs | 7 ++- base_layer/p2p/src/test_utils.rs | 8 ++- .../src/output_manager_service/service.rs | 4 +- .../wallet/src/transaction_service/service.rs | 60 +++++++++++-------- .../tests/support/comms_and_services.rs | 2 + comms/dht/src/dht.rs | 34 +++++++++-- comms/dht/src/envelope.rs | 7 ++- comms/dht/src/inbound/deserialize.rs | 20 +++++-- comms/dht/src/outbound/broadcast.rs | 1 + comms/dht/src/outbound/message.rs | 5 +- comms/dht/src/outbound/serialize.rs | 2 + comms/dht/src/proto/envelope.proto | 3 + comms/dht/src/proto/tari.dht.envelope.rs | 4 ++ .../dht/src/store_forward/saf_handler/task.rs | 13 +++- comms/dht/src/test_utils/makers.rs | 14 +++-- comms/src/message/tag.rs | 4 ++ comms/src/protocol/messaging/inbound.rs | 10 ++-- 17 files changed, 140 insertions(+), 58 deletions(-) diff --git a/base_layer/p2p/src/services/liveness/service.rs b/base_layer/p2p/src/services/liveness/service.rs index 8eac99e20f..b70e6c4527 100644 --- a/base_layer/p2p/src/services/liveness/service.rs +++ b/base_layer/p2p/src/services/liveness/service.rs @@ -367,7 +367,7 @@ where let excluded = self .neighbours .node_ids() - .into_iter() + .iter() .chain(vec![node_id]) .cloned() .collect(); @@ -441,7 +441,7 @@ where } async fn refresh_random_peer_pool(&mut self) -> Result<(), LivenessError> { - let excluded = self.neighbours.node_ids().into_iter().cloned().collect(); + let excluded = self.neighbours.node_ids().to_vec(); // Select a pool of random peers the same length as neighbouring peers let random_peers = self @@ -562,7 +562,7 @@ mod test { services::liveness::{handle::LivenessHandle, state::Metadata}, }; use futures::{channel::mpsc, stream, FutureExt}; - use rand::rngs::OsRng; + use rand::{rngs::OsRng, RngCore}; use std::time::Duration; use tari_comms::{ multiaddr::Multiaddr, @@ -694,6 +694,7 @@ mod test { message_type: DhtMessageType::None, network: Network::LocalTest, flags: Default::default(), + message_trace: OsRng.next_u64(), }, authenticated_origin: None, source_peer, diff --git a/base_layer/p2p/src/test_utils.rs b/base_layer/p2p/src/test_utils.rs index 97fbebe7e3..2038f4b772 100644 --- a/base_layer/p2p/src/test_utils.rs +++ b/base_layer/p2p/src/test_utils.rs @@ -67,7 +67,7 @@ pub fn make_node_identity() -> Arc { ) } -pub fn make_dht_header() -> DhtMessageHeader { +pub fn make_dht_header(trace: u64) -> DhtMessageHeader { DhtMessageHeader { version: 0, destination: NodeDestination::Unknown, @@ -76,13 +76,15 @@ pub fn make_dht_header() -> DhtMessageHeader { message_type: DhtMessageType::None, network: Network::LocalTest, flags: DhtMessageFlags::NONE, + message_trace: trace, } } pub fn make_dht_inbound_message(node_identity: &NodeIdentity, message: Vec) -> DhtInboundMessage { + let msg_tag = MessageTag::new(); DhtInboundMessage::new( - MessageTag::new(), - make_dht_header(), + msg_tag, + make_dht_header(msg_tag.value()), Arc::new(Peer::new( node_identity.public_key().clone(), node_identity.node_id().clone(), diff --git a/base_layer/wallet/src/output_manager_service/service.rs b/base_layer/wallet/src/output_manager_service/service.rs index 9d9736c7db..05a9fa6c42 100644 --- a/base_layer/wallet/src/output_manager_service/service.rs +++ b/base_layer/wallet/src/output_manager_service/service.rs @@ -189,8 +189,8 @@ where }, // Incoming messages from the Comms layer msg = base_node_response_stream.select_next_some() => { - trace!(target: LOG_TARGET, "Handling Base Node Response"); - let (origin_public_key, inner_msg) = msg.into_origin_and_inner(); + let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner(); + trace!(target: LOG_TARGET, "Handling Base Node Response, Trace: Tag#{}", msg.dht_header.message_trace); let result = self.handle_base_node_response(inner_msg).await.or_else(|resp| { error!(target: LOG_TARGET, "Error handling base node service response from {}: {:?}", origin_public_key, resp); Err(resp) diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index ef4c7fedfb..cccdcd000c 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -275,16 +275,18 @@ where }, // Incoming messages from the Comms layer msg = transaction_stream.select_next_some() => { - trace!(target: LOG_TARGET, "Handling Transaction Message"); - let (origin_public_key, inner_msg) = msg.into_origin_and_inner(); - let result = self.accept_transaction(origin_public_key, inner_msg).await; + let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner(); + trace!(target: LOG_TARGET, "Handling Transaction Message, Trace: Tag#{}", msg.dht_header.message_trace); + + let result = self.accept_transaction(origin_public_key, inner_msg, msg.dht_header.message_trace).await; + match result { Err(TransactionServiceError::RepeatedMessageError) => { - trace!(target: LOG_TARGET, "A repeated Transaction message was received"); + trace!(target: LOG_TARGET, "A repeated Transaction message was received, Trace: Tag#{}", msg.dht_header.message_trace); } Err(e) => { - error!(target: LOG_TARGET, "Failed to handle incoming Transaction message: {:?} for NodeID: {}", e, self.node_identity.node_id().short_str()); + error!(target: LOG_TARGET, "Failed to handle incoming Transaction message: {:?} for NodeID: {}, Trace: Tag#{}", e, self.node_identity.node_id().short_str(), msg.dht_header.message_trace); let _ = self.event_publisher.send(Arc::new(TransactionEvent::Error(format!("Error handling Transaction Sender message: {:?}", e).to_string()))); } _ => (), @@ -292,16 +294,16 @@ where }, // Incoming messages from the Comms layer msg = transaction_reply_stream.select_next_some() => { - trace!(target: LOG_TARGET, "Handling Transaction Reply Message"); - let (origin_public_key, inner_msg) = msg.into_origin_and_inner(); + let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner(); + trace!(target: LOG_TARGET, "Handling Transaction Reply Message, Trace: Tag#{}", msg.dht_header.message_trace); let result = self.accept_recipient_reply(origin_public_key, inner_msg).await; match result { Err(TransactionServiceError::TransactionDoesNotExistError) => { - debug!(target: LOG_TARGET, "Unable to handle incoming Transaction Reply message from NodeId: {} due to Transaction not Existing. This usually means the message was a repeated message from Store and Forward", self.node_identity.node_id().short_str()); + debug!(target: LOG_TARGET, "Unable to handle incoming Transaction Reply message from NodeId: {} due to Transaction not Existing. This usually means the message was a repeated message from Store and Forward, Trace: Tag#{}", self.node_identity.node_id().short_str(), msg.dht_header.message_trace); }, Err(e) => { - error!(target: LOG_TARGET, "Failed to handle incoming Transaction Reply message: {:?} for NodeId: {}", e, self.node_identity.node_id().short_str()); + error!(target: LOG_TARGET, "Failed to handle incoming Transaction Reply message: {:?} for NodeId: {}, Trace: Tag#{}", e, self.node_identity.node_id().short_str(), msg.dht_header.message_trace); let _ = self.event_publisher.send(Arc::new(TransactionEvent::Error("Error handling Transaction Recipient Reply message".to_string()))); }, Ok(_) => (), @@ -309,10 +311,10 @@ where }, // Incoming messages from the Comms layer msg = transaction_finalized_stream.select_next_some() => { - trace!(target: LOG_TARGET, "Handling Transaction Finalized Message"); - let (origin_public_key, inner_msg) = msg.into_origin_and_inner(); + let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner(); + trace!(target: LOG_TARGET, "Handling Transaction Finalized Message, Trace: Tag#{}", msg.dht_header.message_trace); let result = self.accept_finalized_transaction(origin_public_key, inner_msg, &mut transaction_broadcast_protocol_handles).await.or_else(|err| { - error!(target: LOG_TARGET, "Failed to handle incoming Transaction Finalized message: {:?} for NodeID: {}", err , self.node_identity.node_id().short_str()); + error!(target: LOG_TARGET, "Failed to handle incoming Transaction Finalized message: {:?} for NodeID: {}, Trace: Tag#{}", err , self.node_identity.node_id().short_str(), msg.dht_header.message_trace); Err(err) }); @@ -322,19 +324,19 @@ where }, // Incoming messages from the Comms layer msg = mempool_response_stream.select_next_some() => { - trace!(target: LOG_TARGET, "Handling Mempool Response"); - let (origin_public_key, inner_msg) = msg.into_origin_and_inner(); + let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner(); + trace!(target: LOG_TARGET, "Handling Mempool Response, Trace: Tag#{}", msg.dht_header.message_trace); let _ = self.handle_mempool_response(inner_msg).await.or_else(|resp| { - error!(target: LOG_TARGET, "Error handling mempool service response: {:?}", resp); + error!(target: LOG_TARGET, "Error handling mempool service response: {:?}, Trace: Tag#{}", resp, msg.dht_header.message_trace); Err(resp) }); } // Incoming messages from the Comms layer msg = base_node_response_stream.select_next_some() => { - trace!(target: LOG_TARGET, "Handling Base Node Response"); - let (origin_public_key, inner_msg) = msg.into_origin_and_inner(); + let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner(); + trace!(target: LOG_TARGET, "Handling Base Node Response, Trace: Tag#{}", msg.dht_header.message_trace); let _ = self.handle_base_node_response(inner_msg).await.or_else(|resp| { - error!(target: LOG_TARGET, "Error handling base node service response from {}: {:?} for NodeID: {}", origin_public_key, resp, self.node_identity.node_id().short_str()); + error!(target: LOG_TARGET, "Error handling base node service response from {}: {:?} for NodeID: {}, Trace: Tag#{}", origin_public_key, resp, self.node_identity.node_id().short_str(), msg.dht_header.message_trace); Err(resp) }); } @@ -700,6 +702,7 @@ where &mut self, source_pubkey: CommsPublicKey, sender_message: proto::TransactionSenderMessage, + message_trace: u64, ) -> Result<(), TransactionServiceError> { let sender_message: TransactionSenderMessage = sender_message @@ -710,17 +713,19 @@ where if let TransactionSenderMessage::Single(data) = sender_message.clone() { trace!( target: LOG_TARGET, - "Transaction (TxId: {}) received from {}", + "Transaction (TxId: {}) received from {}, Trace: Tag#{}", data.tx_id, - source_pubkey + source_pubkey, + message_trace ); // Check this is not a repeat message i.e. tx_id doesn't already exist in our pending or completed // transactions if self.db.transaction_exists(data.tx_id).await? { trace!( target: LOG_TARGET, - "Transaction (TxId: {}) already present in database.", - data.tx_id + "Transaction (TxId: {}) already present in database, Trace: Tag#{}.", + data.tx_id, + message_trace ); return Err(TransactionServiceError::RepeatedMessageError); } @@ -777,11 +782,18 @@ where info!( target: LOG_TARGET, - "Transaction with TX_ID = {} received from {}. Reply Sent", tx_id, source_pubkey, + "Transaction with TX_ID = {} received from {}, Trace: Tag#{}. Reply Sent", + tx_id, + source_pubkey, + message_trace ); info!( target: LOG_TARGET, - "Transaction (TX_ID: {}) - Amount: {} - Message: {}", tx_id, amount, data.message + "Transaction (TX_ID: {}) - Amount: {} - Message: {}, Trace: Tag#{}", + tx_id, + amount, + data.message, + message_trace ); let _ = self diff --git a/base_layer/wallet/tests/support/comms_and_services.rs b/base_layer/wallet/tests/support/comms_and_services.rs index 13a6afd2da..77e6240925 100644 --- a/base_layer/wallet/tests/support/comms_and_services.rs +++ b/base_layer/wallet/tests/support/comms_and_services.rs @@ -21,6 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use futures::Sink; +use rand::{rngs::OsRng, RngCore}; use std::{error::Error, sync::Arc, time::Duration}; use tari_comms::{ multiaddr::Multiaddr, @@ -84,6 +85,7 @@ pub fn create_dummy_message(inner: T, public_key: &CommsPublicKey) -> DomainM flags: Default::default(), network: Network::LocalTest, destination: Default::default(), + message_trace: OsRng.next_u64(), }, authenticated_origin: None, source_peer: peer_source, diff --git a/comms/dht/src/dht.rs b/comms/dht/src/dht.rs index 656c58a0dd..131ee84fa7 100644 --- a/comms/dht/src/dht.rs +++ b/comms/dht/src/dht.rs @@ -342,6 +342,7 @@ mod test { DhtBuilder, }; use futures::{channel::mpsc, StreamExt}; + use rand::{rngs::OsRng, RngCore}; use std::{sync::Arc, time::Duration}; use tari_comms::{ message::MessageExt, @@ -380,7 +381,13 @@ mod test { let mut service = dht.inbound_middleware_layer().layer(SinkService::new(out_tx)); let msg = wrap_in_envelope_body!(b"secret".to_vec()); - let dht_envelope = make_dht_envelope(&node_identity, msg.to_encoded_bytes(), DhtMessageFlags::empty(), false); + let dht_envelope = make_dht_envelope( + &node_identity, + msg.to_encoded_bytes(), + DhtMessageFlags::empty(), + false, + OsRng.next_u64(), + ); let inbound_message = make_comms_inbound_message(&node_identity, dht_envelope.to_encoded_bytes().into()); let msg = { @@ -422,7 +429,13 @@ mod test { let msg = wrap_in_envelope_body!(b"secret".to_vec()); // Encrypt for self - let dht_envelope = make_dht_envelope(&node_identity, msg.to_encoded_bytes(), DhtMessageFlags::ENCRYPTED, true); + let dht_envelope = make_dht_envelope( + &node_identity, + msg.to_encoded_bytes(), + DhtMessageFlags::ENCRYPTED, + true, + OsRng.next_u64(), + ); let inbound_message = make_comms_inbound_message(&node_identity, dht_envelope.to_encoded_bytes().into()); let msg = { @@ -470,7 +483,13 @@ mod test { let node_identity2 = make_node_identity(); let ecdh_key = crypt::generate_ecdh_secret(node_identity2.secret_key(), node_identity2.public_key()); let encrypted_bytes = crypt::encrypt(&ecdh_key, &msg.to_encoded_bytes()).unwrap(); - let dht_envelope = make_dht_envelope(&node_identity, encrypted_bytes, DhtMessageFlags::ENCRYPTED, true); + let dht_envelope = make_dht_envelope( + &node_identity, + encrypted_bytes, + DhtMessageFlags::ENCRYPTED, + true, + OsRng.next_u64(), + ); let origin_mac = dht_envelope.header.as_ref().unwrap().origin_mac.clone(); assert_eq!(origin_mac.is_empty(), false); @@ -514,8 +533,13 @@ mod test { let mut service = dht.inbound_middleware_layer().layer(SinkService::new(next_service_tx)); let msg = wrap_in_envelope_body!(b"secret".to_vec()); - let mut dht_envelope = - make_dht_envelope(&node_identity, msg.to_encoded_bytes(), DhtMessageFlags::empty(), false); + let mut dht_envelope = make_dht_envelope( + &node_identity, + msg.to_encoded_bytes(), + DhtMessageFlags::empty(), + false, + OsRng.next_u64(), + ); dht_envelope.header.as_mut().and_then(|header| { header.message_type = DhtMessageType::SafStoredMessages as i32; Some(header) diff --git a/comms/dht/src/envelope.rs b/comms/dht/src/envelope.rs index 56aa246d1e..94919f4321 100644 --- a/comms/dht/src/envelope.rs +++ b/comms/dht/src/envelope.rs @@ -118,6 +118,7 @@ pub struct DhtMessageHeader { pub message_type: DhtMessageType, pub network: Network, pub flags: DhtMessageFlags, + pub message_trace: u64, } impl DhtMessageHeader { @@ -134,8 +135,8 @@ impl Display for DhtMessageHeader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { write!( f, - "DhtMessageHeader (Dest:{}, Type:{:?}, Network:{:?}, Flags:{:?})", - self.destination, self.message_type, self.network, self.flags + "DhtMessageHeader (Dest:{}, Type:{:?}, Network:{:?}, Flags:{:?}, Trace:Tag#{:?})", + self.destination, self.message_type, self.network, self.flags, self.message_trace ) } } @@ -169,6 +170,7 @@ impl TryFrom for DhtMessageHeader { .ok_or_else(|| DhtMessageError::InvalidMessageType)?, network: Network::from_i32(header.network).ok_or_else(|| DhtMessageError::InvalidNetwork)?, flags: DhtMessageFlags::from_bits(header.flags).ok_or_else(|| DhtMessageError::InvalidMessageFlags)?, + message_trace: header.message_trace, }) } } @@ -198,6 +200,7 @@ impl From for DhtHeader { message_type: header.message_type as i32, network: header.network as i32, flags: header.flags.bits(), + message_trace: header.message_trace, } } } diff --git a/comms/dht/src/inbound/deserialize.rs b/comms/dht/src/inbound/deserialize.rs index 67390a9c01..409d22c4ef 100644 --- a/comms/dht/src/inbound/deserialize.rs +++ b/comms/dht/src/inbound/deserialize.rs @@ -78,17 +78,18 @@ where S: Service + Clon match DhtEnvelope::decode(&mut body) { Ok(dht_envelope) => { - debug!( - target: LOG_TARGET, - "Deserialization succeeded. Passing message {} onto next service", tag - ); - let inbound_msg = DhtInboundMessage::new( tag, dht_envelope.header.try_into().map_err(PipelineError::from_debug)?, source_peer, dht_envelope.body, ); + debug!( + target: LOG_TARGET, + "Deserialization succeeded. Passing message {} onto next service (Trace: Tag#{})", + tag, + inbound_msg.dht_header.message_trace + ); next_service.oneshot(inbound_msg).await }, @@ -126,6 +127,7 @@ mod test { test_utils::{make_comms_inbound_message, make_dht_envelope, make_node_identity, service_spy}, }; use futures::executor::block_on; + use rand::{rngs::OsRng, RngCore}; use tari_comms::message::MessageExt; use tari_test_utils::panic_context; @@ -138,7 +140,13 @@ mod test { assert!(deserialize.poll_ready(&mut cx).is_ready()); let node_identity = make_node_identity(); - let dht_envelope = make_dht_envelope(&node_identity, b"A".to_vec(), DhtMessageFlags::empty(), false); + let dht_envelope = make_dht_envelope( + &node_identity, + b"A".to_vec(), + DhtMessageFlags::empty(), + false, + OsRng.next_u64(), + ); block_on(deserialize.call(make_comms_inbound_message( &node_identity, dht_envelope.to_encoded_bytes().into(), diff --git a/comms/dht/src/outbound/broadcast.rs b/comms/dht/src/outbound/broadcast.rs index f028781737..1cc0cb8aac 100644 --- a/comms/dht/src/outbound/broadcast.rs +++ b/comms/dht/src/outbound/broadcast.rs @@ -438,6 +438,7 @@ where S: Service ephemeral_public_key: ephemeral_public_key.clone(), origin_mac: origin_mac.clone(), is_broadcast, + message_trace: tag.value(), }, send_state, ) diff --git a/comms/dht/src/outbound/message.rs b/comms/dht/src/outbound/message.rs index a41f25ee17..56b5348d5f 100644 --- a/comms/dht/src/outbound/message.rs +++ b/comms/dht/src/outbound/message.rs @@ -179,6 +179,7 @@ pub struct DhtOutboundMessage { pub network: Network, pub dht_flags: DhtMessageFlags, pub is_broadcast: bool, + pub message_trace: u64, } impl fmt::Display for DhtOutboundMessage { @@ -189,8 +190,8 @@ impl fmt::Display for DhtOutboundMessage { .map(|h| format!("{} (Propagated)", h)) .unwrap_or_else(|| { format!( - "Network: {:?}, Flags: {:?}, Destination: {}", - self.network, self.dht_flags, self.destination + "Network: {:?}, Flags: {:?}, Destination: {}, Trace: Tag#{}", + self.network, self.dht_flags, self.destination, self.message_trace ) }); write!( diff --git a/comms/dht/src/outbound/serialize.rs b/comms/dht/src/outbound/serialize.rs index eaf5490bbd..cfe755aabb 100644 --- a/comms/dht/src/outbound/serialize.rs +++ b/comms/dht/src/outbound/serialize.rs @@ -89,6 +89,7 @@ where S: Service + Clone network: network as i32, flags: dht_flags.bits(), destination: Some(destination.into()), + message_trace: tag.value(), }); let envelope = DhtEnvelope::new(dht_header, body); @@ -106,6 +107,7 @@ where S: Service + Clone } } +#[derive(Default)] pub struct SerializeLayer; impl SerializeLayer { diff --git a/comms/dht/src/proto/envelope.proto b/comms/dht/src/proto/envelope.proto index d2a16d77dc..67b37bd91a 100644 --- a/comms/dht/src/proto/envelope.proto +++ b/comms/dht/src/proto/envelope.proto @@ -42,6 +42,9 @@ message DhtHeader { // The network for which this message is intended (e.g. TestNet, MainNet etc.) Network network = 8; uint32 flags = 9; + // Message trace ID + // TODO: Remove for mainnet or when testing message traces is not required + uint64 message_trace = 10; } enum Network { diff --git a/comms/dht/src/proto/tari.dht.envelope.rs b/comms/dht/src/proto/tari.dht.envelope.rs index b9dda194e5..e5c794e22a 100644 --- a/comms/dht/src/proto/tari.dht.envelope.rs +++ b/comms/dht/src/proto/tari.dht.envelope.rs @@ -19,6 +19,10 @@ pub struct DhtHeader { pub network: i32, #[prost(uint32, tag = "9")] pub flags: u32, + /// Message trace ID + /// TODO: Remove for mainnet or when testing message traces is not required + #[prost(uint64, tag = "10")] + pub message_trace: u64, #[prost(oneof = "dht_header::Destination", tags = "2, 3, 4")] pub destination: ::std::option::Option, } diff --git a/comms/dht/src/store_forward/saf_handler/task.rs b/comms/dht/src/store_forward/saf_handler/task.rs index 487939e62c..c26cba5528 100644 --- a/comms/dht/src/store_forward/saf_handler/task.rs +++ b/comms/dht/src/store_forward/saf_handler/task.rs @@ -275,7 +275,7 @@ where S: Service SafResponseType::from_i32(response.response_type) .as_ref() .map(|t| format!("{:?}", t)) - .unwrap_or("".to_string()), + .unwrap_or_else(|| "".to_string()), ); let tasks = response @@ -539,6 +539,7 @@ mod test { use chrono::Utc; use futures::channel::mpsc; use prost::Message; + use rand::{rngs::OsRng, RngCore}; use tari_comms::{message::MessageExt, wrap_in_envelope_body}; use tari_utilities::hex::Hex; use tokio::runtime::Handle; @@ -574,7 +575,15 @@ mod test { // Recent message let (e_sk, e_pk) = make_keypair(); - let dht_header = make_dht_header(&node_identity, &e_pk, &e_sk, &[], DhtMessageFlags::empty(), false); + let dht_header = make_dht_header( + &node_identity, + &e_pk, + &e_sk, + &[], + DhtMessageFlags::empty(), + false, + OsRng.next_u64(), + ); mock_state .add_message(make_stored_message(&node_identity, dht_header)) .await; diff --git a/comms/dht/src/test_utils/makers.rs b/comms/dht/src/test_utils/makers.rs index c49e154725..951e409606 100644 --- a/comms/dht/src/test_utils/makers.rs +++ b/comms/dht/src/test_utils/makers.rs @@ -85,6 +85,7 @@ pub fn make_dht_header( message: &[u8], flags: DhtMessageFlags, include_origin: bool, + trace: u64, ) -> DhtMessageHeader { DhtMessageHeader { @@ -99,6 +100,7 @@ pub fn make_dht_header( message_type: DhtMessageType::None, network: Network::LocalTest, flags, + message_trace: trace, } } @@ -132,9 +134,10 @@ pub fn make_dht_inbound_message( include_origin: bool, ) -> DhtInboundMessage { - let envelope = make_dht_envelope(node_identity, body, flags, include_origin); + let msg_tag = MessageTag::new(); + let envelope = make_dht_envelope(node_identity, body, flags, include_origin, msg_tag.value()); DhtInboundMessage::new( - MessageTag::new(), + msg_tag, envelope.header.unwrap().try_into().unwrap(), Arc::new(Peer::new( node_identity.public_key().clone(), @@ -157,6 +160,7 @@ pub fn make_dht_envelope( mut message: Vec, flags: DhtMessageFlags, include_origin: bool, + trace: u64, ) -> DhtEnvelope { let (e_sk, e_pk) = make_keypair(); @@ -164,7 +168,7 @@ pub fn make_dht_envelope( let shared_secret = crypt::generate_ecdh_secret(&e_sk, node_identity.public_key()); message = crypt::encrypt(&shared_secret, &message).unwrap(); } - let header = make_dht_header(node_identity, &e_pk, &e_sk, &message, flags, include_origin).into(); + let header = make_dht_header(node_identity, &e_pk, &e_sk, &message, flags, include_origin, trace).into(); DhtEnvelope::new(header, message.into()) } @@ -187,8 +191,9 @@ pub fn make_peer_manager() -> Arc { } pub fn create_outbound_message(body: &[u8]) -> DhtOutboundMessage { + let msg_tag = MessageTag::new(); DhtOutboundMessage { - tag: MessageTag::new(), + tag: msg_tag, destination_peer: Arc::new(Peer::new( CommsPublicKey::default(), NodeId::default(), @@ -207,5 +212,6 @@ pub fn create_outbound_message(body: &[u8]) -> DhtOutboundMessage { reply_tx: WrappedReplyTx::none(), origin_mac: None, is_broadcast: false, + message_trace: msg_tag.value(), } } diff --git a/comms/src/message/tag.rs b/comms/src/message/tag.rs index 42c64e57b1..dd0cec4bb2 100644 --- a/comms/src/message/tag.rs +++ b/comms/src/message/tag.rs @@ -31,6 +31,10 @@ impl MessageTag { pub fn new() -> Self { Self(OsRng.next_u64()) } + + pub fn value(self) -> u64 { + self.0 + } } impl fmt::Display for MessageTag { diff --git a/comms/src/protocol/messaging/inbound.rs b/comms/src/protocol/messaging/inbound.rs index adac9e894b..69d4a72b70 100644 --- a/comms/src/protocol/messaging/inbound.rs +++ b/comms/src/protocol/messaging/inbound.rs @@ -59,21 +59,21 @@ impl InboundMessaging { while let Some(result) = framed_socket.next().await { match result { Ok(raw_msg) => { + let inbound_msg = InboundMessage::new(Arc::clone(&peer), raw_msg.clone().freeze()); trace!( target: LOG_TARGET, - "Received message from peer '{}' ({} bytes)", + "Received message {} from peer '{}' ({} bytes)", + inbound_msg.clone().tag, peer.node_id.short_str(), raw_msg.len() ); - let inbound_msg = InboundMessage::new(Arc::clone(&peer), raw_msg.freeze()); - let event = MessagingEvent::MessageReceived( Box::new(inbound_msg.source_peer.node_id.clone()), - inbound_msg.tag, + inbound_msg.clone().tag, ); - if let Err(err) = self.inbound_message_tx.send(inbound_msg).await { + if let Err(err) = self.inbound_message_tx.send(inbound_msg.clone()).await { warn!( target: LOG_TARGET, "Failed to send InboundMessage for peer '{}' because '{}'",