From e246d5a73fb18ff8dbf2b2418abdd3d185dfe8c9 Mon Sep 17 00:00:00 2001 From: brianp Date: Mon, 3 Apr 2023 10:32:16 +0200 Subject: [PATCH] Move the encrypted send direct into the requester --- .../contacts/src/contacts_service/service.rs | 23 +++++----------- .../core/src/base_node/service/service.rs | 2 +- base_layer/core/tests/mempool.rs | 4 +-- .../p2p/src/services/liveness/service.rs | 2 +- .../protocols/transaction_send_protocol.rs | 2 +- .../tasks/send_finalized_transaction.rs | 2 +- .../tasks/send_transaction_cancelled.rs | 2 +- .../tasks/send_transaction_reply.rs | 2 +- comms/dht/src/outbound/requester.rs | 27 +++++++++++++++++++ 9 files changed, 41 insertions(+), 25 deletions(-) diff --git a/base_layer/contacts/src/contacts_service/service.rs b/base_layer/contacts/src/contacts_service/service.rs index 821c7e340fc..6e1cb24d555 100644 --- a/base_layer/contacts/src/contacts_service/service.rs +++ b/base_layer/contacts/src/contacts_service/service.rs @@ -32,12 +32,7 @@ use futures::{pin_mut, StreamExt}; use log::*; use tari_common_types::tari_address::TariAddress; use tari_comms::connectivity::{ConnectivityEvent, ConnectivityRequester}; -use tari_comms_dht::{ - domain_message::OutboundDomainMessage, - envelope::NodeDestination, - outbound::{DhtOutboundError, OutboundEncryption, SendMessageParams}, - Dht, -}; +use tari_comms_dht::{domain_message::OutboundDomainMessage, outbound::OutboundEncryption, Dht}; use tari_p2p::{ comms_connector::SubscriptionFactory, domain_message::DomainMessage, @@ -290,19 +285,13 @@ where T: ContactsBackend + 'static let mut comms_outbound = self.dht.outbound_requester(); comms_outbound - .send_message( - SendMessageParams::new() - .with_debug_info(format!("Send direct to {}", &address.public_key())) - .direct_public_key(address.public_key().clone()) - .with_encryption(encryption) - .with_destination(NodeDestination::from(address.public_key().clone())) - .finish(), + .send_direct( + address.public_key().clone(), ob_message, + encryption, + "contact service messaging".to_string(), ) - .await? - .resolve() - .await - .map_err(Into::::into)?; + .await?; }, Err(e) => return Err(e), _ => { diff --git a/base_layer/core/src/base_node/service/service.rs b/base_layer/core/src/base_node/service/service.rs index 5622e6a5f4a..0ff14288eab 100644 --- a/base_layer/core/src/base_node/service/service.rs +++ b/base_layer/core/src/base_node/service/service.rs @@ -391,7 +391,7 @@ async fn handle_incoming_request( ); let send_message_response = outbound_message_service - .send_direct( + .send_direct_unencrypted( origin_public_key, OutboundDomainMessage::new(&TariMessageType::BaseNodeResponse, message), "Outbound response message from base node".to_string(), diff --git a/base_layer/core/tests/mempool.rs b/base_layer/core/tests/mempool.rs index a0cc0be8e96..b1dc9aa9c55 100644 --- a/base_layer/core/tests/mempool.rs +++ b/base_layer/core/tests/mempool.rs @@ -854,7 +854,7 @@ async fn receive_and_propagate_transaction() { alice_node .outbound_message_service - .send_direct( + .send_direct_unencrypted( bob_node.node_identity.public_key().clone(), OutboundDomainMessage::new( &TariMessageType::NewTransaction, @@ -866,7 +866,7 @@ async fn receive_and_propagate_transaction() { .unwrap(); alice_node .outbound_message_service - .send_direct( + .send_direct_unencrypted( carol_node.node_identity.public_key().clone(), OutboundDomainMessage::new( &TariMessageType::NewTransaction, diff --git a/base_layer/p2p/src/services/liveness/service.rs b/base_layer/p2p/src/services/liveness/service.rs index 08448722729..2d63c27eb6a 100644 --- a/base_layer/p2p/src/services/liveness/service.rs +++ b/base_layer/p2p/src/services/liveness/service.rs @@ -226,7 +226,7 @@ where async fn send_pong(&mut self, nonce: u64, dest: CommsPublicKey) -> Result<(), LivenessError> { let msg = PingPongMessage::pong_with_metadata(nonce, self.state.metadata().clone()); self.outbound_messaging - .send_direct( + .send_direct_unencrypted( dest, OutboundDomainMessage::new(&TariMessageType::PingPong, msg), "Sending pong".to_string(), diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs index 3c475f2df25..794dc8d86df 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs @@ -665,7 +665,7 @@ where match self .resources .outbound_message_service - .send_direct( + .send_direct_unencrypted( self.dest_address.public_key().clone(), OutboundDomainMessage::new(&TariMessageType::SenderPartialTransaction, proto_message.clone()), "transaction send".to_string(), diff --git a/base_layer/wallet/src/transaction_service/tasks/send_finalized_transaction.rs b/base_layer/wallet/src/transaction_service/tasks/send_finalized_transaction.rs index 66ca04aab27..2b438de1e5e 100644 --- a/base_layer/wallet/src/transaction_service/tasks/send_finalized_transaction.rs +++ b/base_layer/wallet/src/transaction_service/tasks/send_finalized_transaction.rs @@ -108,7 +108,7 @@ pub async fn send_finalized_transaction_message_direct( let mut store_and_forward_send_result = false; let mut direct_send_result = false; match outbound_message_service - .send_direct( + .send_direct_unencrypted( destination_public_key.clone(), OutboundDomainMessage::new( &TariMessageType::TransactionFinalized, diff --git a/base_layer/wallet/src/transaction_service/tasks/send_transaction_cancelled.rs b/base_layer/wallet/src/transaction_service/tasks/send_transaction_cancelled.rs index 5cac558ee4c..da51dd654ce 100644 --- a/base_layer/wallet/src/transaction_service/tasks/send_transaction_cancelled.rs +++ b/base_layer/wallet/src/transaction_service/tasks/send_transaction_cancelled.rs @@ -40,7 +40,7 @@ pub async fn send_transaction_cancelled_message( // Send both direct and SAF we are not going to monitor the progress on these messages for potential resend as // they are just courtesy messages let _send_message_response = outbound_message_service - .send_direct( + .send_direct_unencrypted( destination_public_key.clone(), OutboundDomainMessage::new(&TariMessageType::TransactionCancelled, proto_message.clone()), "transaction cancelled".to_string(), diff --git a/base_layer/wallet/src/transaction_service/tasks/send_transaction_reply.rs b/base_layer/wallet/src/transaction_service/tasks/send_transaction_reply.rs index cb822904a5c..462ba452e13 100644 --- a/base_layer/wallet/src/transaction_service/tasks/send_transaction_reply.rs +++ b/base_layer/wallet/src/transaction_service/tasks/send_transaction_reply.rs @@ -96,7 +96,7 @@ pub async fn send_transaction_reply_direct( .try_into() .map_err(TransactionServiceError::ServiceError)?; match outbound_message_service - .send_direct( + .send_direct_unencrypted( inbound_transaction.source_address.public_key().clone(), OutboundDomainMessage::new(&TariMessageType::ReceiverPartialTransactionReply, proto_message.clone()), "wallet transaction reply".to_string(), diff --git a/comms/dht/src/outbound/requester.rs b/comms/dht/src/outbound/requester.rs index 0ac3e2e619e..e9fdacfe19f 100644 --- a/comms/dht/src/outbound/requester.rs +++ b/comms/dht/src/outbound/requester.rs @@ -52,6 +52,33 @@ impl OutboundMessageRequester { /// Send directly to a peer. If the peer does not exist in the peer list, a discovery will be initiated. pub async fn send_direct( + &mut self, + dest_public_key: CommsPublicKey, + message: OutboundDomainMessage, + encryption: OutboundEncryption, + source_info: String, + ) -> Result + where + T: prost::Message, + { + self.send_message( + SendMessageParams::new() + .with_debug_info(format!("Send direct to {} from {}", &dest_public_key, source_info)) + .direct_public_key(dest_public_key.clone()) + .with_discovery(true) + .with_encryption(encryption) + .with_destination(dest_public_key.into()) + .finish(), + message, + ) + .await? + .resolve() + .await + .map_err(Into::into) + } + + /// Send directly to a peer unencrypted. If the peer does not exist in the peer list, a discovery will be initiated. + pub async fn send_direct_unencrypted( &mut self, dest_public_key: CommsPublicKey, message: OutboundDomainMessage,