Skip to content

Commit

Permalink
This change enables tracking messages across nodes during testnet, e.…
Browse files Browse the repository at this point in the history
…g. `msg tag` <> `TxID`.
  • Loading branch information
hansieodendaal committed May 19, 2020
1 parent 824dfe0 commit d426174
Show file tree
Hide file tree
Showing 17 changed files with 140 additions and 58 deletions.
7 changes: 4 additions & 3 deletions base_layer/p2p/src/services/liveness/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ where
let excluded = self
.neighbours
.node_ids()
.into_iter()
.iter()
.chain(vec![node_id])
.cloned()
.collect();
Expand Down Expand Up @@ -441,7 +441,7 @@ where
}

async fn refresh_random_peer_pool(&mut self) -> Result<(), LivenessError> {
let excluded = self.neighbours.node_ids().into_iter().cloned().collect();
let excluded = self.neighbours.node_ids().to_vec();

// Select a pool of random peers the same length as neighbouring peers
let random_peers = self
Expand Down Expand Up @@ -562,7 +562,7 @@ mod test {
services::liveness::{handle::LivenessHandle, state::Metadata},
};
use futures::{channel::mpsc, stream, FutureExt};
use rand::rngs::OsRng;
use rand::{rngs::OsRng, RngCore};
use std::time::Duration;
use tari_comms::{
multiaddr::Multiaddr,
Expand Down Expand Up @@ -694,6 +694,7 @@ mod test {
message_type: DhtMessageType::None,
network: Network::LocalTest,
flags: Default::default(),
message_trace: OsRng.next_u64(),
},
authenticated_origin: None,
source_peer,
Expand Down
8 changes: 5 additions & 3 deletions base_layer/p2p/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub fn make_node_identity() -> Arc<NodeIdentity> {
)
}

pub fn make_dht_header() -> DhtMessageHeader {
pub fn make_dht_header(trace: u64) -> DhtMessageHeader {
DhtMessageHeader {
version: 0,
destination: NodeDestination::Unknown,
Expand All @@ -76,13 +76,15 @@ pub fn make_dht_header() -> DhtMessageHeader {
message_type: DhtMessageType::None,
network: Network::LocalTest,
flags: DhtMessageFlags::NONE,
message_trace: trace,
}
}

pub fn make_dht_inbound_message(node_identity: &NodeIdentity, message: Vec<u8>) -> DhtInboundMessage {
let msg_tag = MessageTag::new();
DhtInboundMessage::new(
MessageTag::new(),
make_dht_header(),
msg_tag,
make_dht_header(msg_tag.value()),
Arc::new(Peer::new(
node_identity.public_key().clone(),
node_identity.node_id().clone(),
Expand Down
4 changes: 2 additions & 2 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ where
},
// Incoming messages from the Comms layer
msg = base_node_response_stream.select_next_some() => {
trace!(target: LOG_TARGET, "Handling Base Node Response");
let (origin_public_key, inner_msg) = msg.into_origin_and_inner();
let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
trace!(target: LOG_TARGET, "Handling Base Node Response, Trace: Tag#{}", msg.dht_header.message_trace);
let result = self.handle_base_node_response(inner_msg).await.or_else(|resp| {
error!(target: LOG_TARGET, "Error handling base node service response from {}: {:?}", origin_public_key, resp);
Err(resp)
Expand Down
60 changes: 36 additions & 24 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,44 +275,46 @@ where
},
// Incoming messages from the Comms layer
msg = transaction_stream.select_next_some() => {
trace!(target: LOG_TARGET, "Handling Transaction Message");
let (origin_public_key, inner_msg) = msg.into_origin_and_inner();
let result = self.accept_transaction(origin_public_key, inner_msg).await;
let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
trace!(target: LOG_TARGET, "Handling Transaction Message, Trace: Tag#{}", msg.dht_header.message_trace);

let result = self.accept_transaction(origin_public_key, inner_msg, msg.dht_header.message_trace).await;


match result {
Err(TransactionServiceError::RepeatedMessageError) => {
trace!(target: LOG_TARGET, "A repeated Transaction message was received");
trace!(target: LOG_TARGET, "A repeated Transaction message was received, Trace: Tag#{}", msg.dht_header.message_trace);
}
Err(e) => {
error!(target: LOG_TARGET, "Failed to handle incoming Transaction message: {:?} for NodeID: {}", e, self.node_identity.node_id().short_str());
error!(target: LOG_TARGET, "Failed to handle incoming Transaction message: {:?} for NodeID: {}, Trace: Tag#{}", e, self.node_identity.node_id().short_str(), msg.dht_header.message_trace);
let _ = self.event_publisher.send(Arc::new(TransactionEvent::Error(format!("Error handling Transaction Sender message: {:?}", e).to_string())));
}
_ => (),
}
},
// Incoming messages from the Comms layer
msg = transaction_reply_stream.select_next_some() => {
trace!(target: LOG_TARGET, "Handling Transaction Reply Message");
let (origin_public_key, inner_msg) = msg.into_origin_and_inner();
let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
trace!(target: LOG_TARGET, "Handling Transaction Reply Message, Trace: Tag#{}", msg.dht_header.message_trace);
let result = self.accept_recipient_reply(origin_public_key, inner_msg).await;

match result {
Err(TransactionServiceError::TransactionDoesNotExistError) => {
debug!(target: LOG_TARGET, "Unable to handle incoming Transaction Reply message from NodeId: {} due to Transaction not Existing. This usually means the message was a repeated message from Store and Forward", self.node_identity.node_id().short_str());
debug!(target: LOG_TARGET, "Unable to handle incoming Transaction Reply message from NodeId: {} due to Transaction not Existing. This usually means the message was a repeated message from Store and Forward, Trace: Tag#{}", self.node_identity.node_id().short_str(), msg.dht_header.message_trace);
},
Err(e) => {
error!(target: LOG_TARGET, "Failed to handle incoming Transaction Reply message: {:?} for NodeId: {}", e, self.node_identity.node_id().short_str());
error!(target: LOG_TARGET, "Failed to handle incoming Transaction Reply message: {:?} for NodeId: {}, Trace: Tag#{}", e, self.node_identity.node_id().short_str(), msg.dht_header.message_trace);
let _ = self.event_publisher.send(Arc::new(TransactionEvent::Error("Error handling Transaction Recipient Reply message".to_string())));
},
Ok(_) => (),
}
},
// Incoming messages from the Comms layer
msg = transaction_finalized_stream.select_next_some() => {
trace!(target: LOG_TARGET, "Handling Transaction Finalized Message");
let (origin_public_key, inner_msg) = msg.into_origin_and_inner();
let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
trace!(target: LOG_TARGET, "Handling Transaction Finalized Message, Trace: Tag#{}", msg.dht_header.message_trace);
let result = self.accept_finalized_transaction(origin_public_key, inner_msg, &mut transaction_broadcast_protocol_handles).await.or_else(|err| {
error!(target: LOG_TARGET, "Failed to handle incoming Transaction Finalized message: {:?} for NodeID: {}", err , self.node_identity.node_id().short_str());
error!(target: LOG_TARGET, "Failed to handle incoming Transaction Finalized message: {:?} for NodeID: {}, Trace: Tag#{}", err , self.node_identity.node_id().short_str(), msg.dht_header.message_trace);
Err(err)
});

Expand All @@ -322,19 +324,19 @@ where
},
// Incoming messages from the Comms layer
msg = mempool_response_stream.select_next_some() => {
trace!(target: LOG_TARGET, "Handling Mempool Response");
let (origin_public_key, inner_msg) = msg.into_origin_and_inner();
let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
trace!(target: LOG_TARGET, "Handling Mempool Response, Trace: Tag#{}", msg.dht_header.message_trace);
let _ = self.handle_mempool_response(inner_msg).await.or_else(|resp| {
error!(target: LOG_TARGET, "Error handling mempool service response: {:?}", resp);
error!(target: LOG_TARGET, "Error handling mempool service response: {:?}, Trace: Tag#{}", resp, msg.dht_header.message_trace);
Err(resp)
});
}
// Incoming messages from the Comms layer
msg = base_node_response_stream.select_next_some() => {
trace!(target: LOG_TARGET, "Handling Base Node Response");
let (origin_public_key, inner_msg) = msg.into_origin_and_inner();
let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
trace!(target: LOG_TARGET, "Handling Base Node Response, Trace: Tag#{}", msg.dht_header.message_trace);
let _ = self.handle_base_node_response(inner_msg).await.or_else(|resp| {
error!(target: LOG_TARGET, "Error handling base node service response from {}: {:?} for NodeID: {}", origin_public_key, resp, self.node_identity.node_id().short_str());
error!(target: LOG_TARGET, "Error handling base node service response from {}: {:?} for NodeID: {}, Trace: Tag#{}", origin_public_key, resp, self.node_identity.node_id().short_str(), msg.dht_header.message_trace);
Err(resp)
});
}
Expand Down Expand Up @@ -700,6 +702,7 @@ where
&mut self,
source_pubkey: CommsPublicKey,
sender_message: proto::TransactionSenderMessage,
message_trace: u64,
) -> Result<(), TransactionServiceError>
{
let sender_message: TransactionSenderMessage = sender_message
Expand All @@ -710,17 +713,19 @@ where
if let TransactionSenderMessage::Single(data) = sender_message.clone() {
trace!(
target: LOG_TARGET,
"Transaction (TxId: {}) received from {}",
"Transaction (TxId: {}) received from {}, Trace: Tag#{}",
data.tx_id,
source_pubkey
source_pubkey,
message_trace
);
// Check this is not a repeat message i.e. tx_id doesn't already exist in our pending or completed
// transactions
if self.db.transaction_exists(data.tx_id).await? {
trace!(
target: LOG_TARGET,
"Transaction (TxId: {}) already present in database.",
data.tx_id
"Transaction (TxId: {}) already present in database, Trace: Tag#{}.",
data.tx_id,
message_trace
);
return Err(TransactionServiceError::RepeatedMessageError);
}
Expand Down Expand Up @@ -777,11 +782,18 @@ where

info!(
target: LOG_TARGET,
"Transaction with TX_ID = {} received from {}. Reply Sent", tx_id, source_pubkey,
"Transaction with TX_ID = {} received from {}, Trace: Tag#{}. Reply Sent",
tx_id,
source_pubkey,
message_trace
);
info!(
target: LOG_TARGET,
"Transaction (TX_ID: {}) - Amount: {} - Message: {}", tx_id, amount, data.message
"Transaction (TX_ID: {}) - Amount: {} - Message: {}, Trace: Tag#{}",
tx_id,
amount,
data.message,
message_trace
);

let _ = self
Expand Down
2 changes: 2 additions & 0 deletions base_layer/wallet/tests/support/comms_and_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use futures::Sink;
use rand::{rngs::OsRng, RngCore};
use std::{error::Error, sync::Arc, time::Duration};
use tari_comms::{
multiaddr::Multiaddr,
Expand Down Expand Up @@ -84,6 +85,7 @@ pub fn create_dummy_message<T>(inner: T, public_key: &CommsPublicKey) -> DomainM
flags: Default::default(),
network: Network::LocalTest,
destination: Default::default(),
message_trace: OsRng.next_u64(),
},
authenticated_origin: None,
source_peer: peer_source,
Expand Down
34 changes: 29 additions & 5 deletions comms/dht/src/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ mod test {
DhtBuilder,
};
use futures::{channel::mpsc, StreamExt};
use rand::{rngs::OsRng, RngCore};
use std::{sync::Arc, time::Duration};
use tari_comms::{
message::MessageExt,
Expand Down Expand Up @@ -380,7 +381,13 @@ mod test {
let mut service = dht.inbound_middleware_layer().layer(SinkService::new(out_tx));

let msg = wrap_in_envelope_body!(b"secret".to_vec());
let dht_envelope = make_dht_envelope(&node_identity, msg.to_encoded_bytes(), DhtMessageFlags::empty(), false);
let dht_envelope = make_dht_envelope(
&node_identity,
msg.to_encoded_bytes(),
DhtMessageFlags::empty(),
false,
OsRng.next_u64(),
);
let inbound_message = make_comms_inbound_message(&node_identity, dht_envelope.to_encoded_bytes().into());

let msg = {
Expand Down Expand Up @@ -422,7 +429,13 @@ mod test {

let msg = wrap_in_envelope_body!(b"secret".to_vec());
// Encrypt for self
let dht_envelope = make_dht_envelope(&node_identity, msg.to_encoded_bytes(), DhtMessageFlags::ENCRYPTED, true);
let dht_envelope = make_dht_envelope(
&node_identity,
msg.to_encoded_bytes(),
DhtMessageFlags::ENCRYPTED,
true,
OsRng.next_u64(),
);
let inbound_message = make_comms_inbound_message(&node_identity, dht_envelope.to_encoded_bytes().into());

let msg = {
Expand Down Expand Up @@ -470,7 +483,13 @@ mod test {
let node_identity2 = make_node_identity();
let ecdh_key = crypt::generate_ecdh_secret(node_identity2.secret_key(), node_identity2.public_key());
let encrypted_bytes = crypt::encrypt(&ecdh_key, &msg.to_encoded_bytes()).unwrap();
let dht_envelope = make_dht_envelope(&node_identity, encrypted_bytes, DhtMessageFlags::ENCRYPTED, true);
let dht_envelope = make_dht_envelope(
&node_identity,
encrypted_bytes,
DhtMessageFlags::ENCRYPTED,
true,
OsRng.next_u64(),
);

let origin_mac = dht_envelope.header.as_ref().unwrap().origin_mac.clone();
assert_eq!(origin_mac.is_empty(), false);
Expand Down Expand Up @@ -514,8 +533,13 @@ mod test {
let mut service = dht.inbound_middleware_layer().layer(SinkService::new(next_service_tx));

let msg = wrap_in_envelope_body!(b"secret".to_vec());
let mut dht_envelope =
make_dht_envelope(&node_identity, msg.to_encoded_bytes(), DhtMessageFlags::empty(), false);
let mut dht_envelope = make_dht_envelope(
&node_identity,
msg.to_encoded_bytes(),
DhtMessageFlags::empty(),
false,
OsRng.next_u64(),
);
dht_envelope.header.as_mut().and_then(|header| {
header.message_type = DhtMessageType::SafStoredMessages as i32;
Some(header)
Expand Down
7 changes: 5 additions & 2 deletions comms/dht/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ pub struct DhtMessageHeader {
pub message_type: DhtMessageType,
pub network: Network,
pub flags: DhtMessageFlags,
pub message_trace: u64,
}

impl DhtMessageHeader {
Expand All @@ -134,8 +135,8 @@ impl Display for DhtMessageHeader {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(
f,
"DhtMessageHeader (Dest:{}, Type:{:?}, Network:{:?}, Flags:{:?})",
self.destination, self.message_type, self.network, self.flags
"DhtMessageHeader (Dest:{}, Type:{:?}, Network:{:?}, Flags:{:?}, Trace:Tag#{:?})",
self.destination, self.message_type, self.network, self.flags, self.message_trace
)
}
}
Expand Down Expand Up @@ -169,6 +170,7 @@ impl TryFrom<DhtHeader> for DhtMessageHeader {
.ok_or_else(|| DhtMessageError::InvalidMessageType)?,
network: Network::from_i32(header.network).ok_or_else(|| DhtMessageError::InvalidNetwork)?,
flags: DhtMessageFlags::from_bits(header.flags).ok_or_else(|| DhtMessageError::InvalidMessageFlags)?,
message_trace: header.message_trace,
})
}
}
Expand Down Expand Up @@ -198,6 +200,7 @@ impl From<DhtMessageHeader> for DhtHeader {
message_type: header.message_type as i32,
network: header.network as i32,
flags: header.flags.bits(),
message_trace: header.message_trace,
}
}
}
Expand Down
20 changes: 14 additions & 6 deletions comms/dht/src/inbound/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,18 @@ where S: Service<DhtInboundMessage, Response = (), Error = PipelineError> + Clon

match DhtEnvelope::decode(&mut body) {
Ok(dht_envelope) => {
debug!(
target: LOG_TARGET,
"Deserialization succeeded. Passing message {} onto next service", tag
);

let inbound_msg = DhtInboundMessage::new(
tag,
dht_envelope.header.try_into().map_err(PipelineError::from_debug)?,
source_peer,
dht_envelope.body,
);
debug!(
target: LOG_TARGET,
"Deserialization succeeded. Passing message {} onto next service (Trace: Tag#{})",
tag,
inbound_msg.dht_header.message_trace
);

next_service.oneshot(inbound_msg).await
},
Expand Down Expand Up @@ -126,6 +127,7 @@ mod test {
test_utils::{make_comms_inbound_message, make_dht_envelope, make_node_identity, service_spy},
};
use futures::executor::block_on;
use rand::{rngs::OsRng, RngCore};
use tari_comms::message::MessageExt;
use tari_test_utils::panic_context;

Expand All @@ -138,7 +140,13 @@ mod test {

assert!(deserialize.poll_ready(&mut cx).is_ready());
let node_identity = make_node_identity();
let dht_envelope = make_dht_envelope(&node_identity, b"A".to_vec(), DhtMessageFlags::empty(), false);
let dht_envelope = make_dht_envelope(
&node_identity,
b"A".to_vec(),
DhtMessageFlags::empty(),
false,
OsRng.next_u64(),
);
block_on(deserialize.call(make_comms_inbound_message(
&node_identity,
dht_envelope.to_encoded_bytes().into(),
Expand Down
1 change: 1 addition & 0 deletions comms/dht/src/outbound/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ where S: Service<DhtOutboundMessage, Response = (), Error = PipelineError>
ephemeral_public_key: ephemeral_public_key.clone(),
origin_mac: origin_mac.clone(),
is_broadcast,
message_trace: tag.value(),
},
send_state,
)
Expand Down
Loading

0 comments on commit d426174

Please sign in to comment.