Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
hansieodendaal committed Sep 10, 2024
1 parent 3788ecb commit 3f38620
Show file tree
Hide file tree
Showing 11 changed files with 153 additions and 46 deletions.
5 changes: 1 addition & 4 deletions base_layer/contacts/tests/contacts_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pub fn setup_contacts_service<T: ContactsBackend + 'static>(
auto_request: true,
..Default::default()
},
excluded_dial_addresses: vec![],
..Default::default()
},
allow_test_addresses: true,
Expand All @@ -96,10 +97,6 @@ pub fn setup_contacts_service<T: ContactsBackend + 'static>(
rpc_max_simultaneous_sessions: 0,
rpc_max_sessions_per_peer: 0,
listener_self_liveness_check_interval: None,
excluded_dial_addresses: vec![
"/ip4/172.2.3.4/tcp/18188".parse::<Multiaddr>().expect("will not fail"),
"/ip4/172.2.3.4/tcp/18189".parse::<Multiaddr>().expect("will not fail"),
],
};
let peer_message_subscription_factory = Arc::new(subscription_factory);
let shutdown = Shutdown::new();
Expand Down
6 changes: 0 additions & 6 deletions base_layer/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,6 @@ pub struct P2pConfig {
/// The maximum allowed RPC sessions per peer.
/// Default: 10
pub rpc_max_sessions_per_peer: usize,
/// Addresses that should never be dialed
pub excluded_dial_addresses: Vec<Multiaddr>,
}

impl Default for P2pConfig {
Expand All @@ -151,10 +149,6 @@ impl Default for P2pConfig {
auxiliary_tcp_listener_address: None,
rpc_max_simultaneous_sessions: 100,
rpc_max_sessions_per_peer: 10,
excluded_dial_addresses: vec![
"/ip4/172.2.3.4/tcp/18188".parse::<Multiaddr>().expect("will not fail"),
"/ip4/172.2.3.4/tcp/18189".parse::<Multiaddr>().expect("will not fail"),
],
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion base_layer/p2p/src/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ async fn configure_comms_and_dht(
.with_listener_liveness_allowlist_cidrs(listener_liveness_allowlist_cidrs)
.with_dial_backoff(ConstantBackoff::new(Duration::from_millis(500)))
.with_peer_storage(peer_database, Some(file_lock))
.with_excluded_dial_addresses(config.excluded_dial_addresses.clone());
.with_excluded_dial_addresses(config.dht.excluded_dial_addresses.clone());

let mut comms = match config.auxiliary_tcp_listener_address {
Some(ref addr) => builder.with_auxiliary_tcp_listener_address(addr.clone()).build()?,
Expand Down
5 changes: 1 addition & 4 deletions base_layer/wallet_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5327,6 +5327,7 @@ pub unsafe extern "C" fn comms_config_create(
minimum_desired_tcpv4_node_ratio: 0.0,
..Default::default()
},
excluded_dial_addresses: vec![],
..Default::default()
},
allow_test_addresses: true,
Expand All @@ -5335,10 +5336,6 @@ pub unsafe extern "C" fn comms_config_create(
rpc_max_simultaneous_sessions: 0,
rpc_max_sessions_per_peer: 0,
listener_self_liveness_check_interval: None,
excluded_dial_addresses: vec![
"/ip4/172.2.3.4/tcp/18188".parse::<Multiaddr>().expect("will not fail"),
"/ip4/172.2.3.4/tcp/18189".parse::<Multiaddr>().expect("will not fail"),
],
};

Box::into_raw(Box::new(config))
Expand Down
4 changes: 2 additions & 2 deletions common/config/presets/c_base_node_c.toml
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,6 @@ listener_self_liveness_check_interval = 15
#rpc_max_simultaneous_sessions = 100
# The maximum comms RPC sessions allowed per peer (default value = 10).
#rpc_max_sessions_per_peer = 10
# Addresses that should never be dialed (default value = ["/ip4/172.2.3.4/tcp/18188", "/ip4/172.2.3.4/tcp/18189"])
#excluded_dial_addresses = ["/ip4/172.2.3.4/tcp/18188", "/ip4/172.2.3.4/tcp/18189"]

[base_node.p2p.transport]
# -------------- Transport configuration --------------
Expand Down Expand Up @@ -318,3 +316,5 @@ database_url = "data/base_node/dht.db"
# In a situation where a node is not well-connected and many nodes are locally marked as offline, we can retry
# peers that were previously tried. Default: 2 hours
#offline_peer_cooldown = 7_200 # 2 * 60 * 60
# Addresses that should never be dialed (default value = [])
#excluded_dial_addresses = ["/ip4/x.x.x.x/tcp/xxxx", "/ip4/x.y.x.y/tcp/xyxy"]
4 changes: 2 additions & 2 deletions common/config/presets/d_console_wallet.toml
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,6 @@ event_channel_size = 3500
#rpc_max_simultaneous_sessions = 100
# The maximum comms RPC sessions allowed per peer (default value = 10).
#rpc_max_sessions_per_peer = 10
# Addresses that should never be dialed (default value = ["/ip4/172.2.3.4/tcp/18188", "/ip4/172.2.3.4/tcp/18189"])
#excluded_dial_addresses = ["/ip4/172.2.3.4/tcp/18188", "/ip4/172.2.3.4/tcp/18189"]

[wallet.p2p.transport]
# -------------- Transport configuration --------------
Expand Down Expand Up @@ -362,3 +360,5 @@ network_discovery.initial_peer_sync_delay = 25
# In a situation where a node is not well-connected and many nodes are locally marked as offline, we can retry
# peers that were previously tried. Default: 2 hours
#offline_peer_cooldown = 7_200 # 2 * 60 * 60
# Addresses that should never be dialed (default value = [])
#excluded_dial_addresses = ["/ip4/x.x.x.x/tcp/xxxx", "/ip4/x.y.x.y/tcp/xyxy"]
8 changes: 1 addition & 7 deletions comms/core/src/connectivity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,18 +660,12 @@ impl ConnectivityManagerActor {
PeerDisconnected(_, node_id, minimized) => (node_id, ConnectionStatus::Disconnected(*minimized), None),
PeerConnected(conn) => (conn.peer_node_id(), ConnectionStatus::Connected, Some(conn.clone())),
PeerConnectFailed(node_id, ConnectionManagerError::AllPeerAddressesAreExcluded(msg)) => {
warn!(
debug!(
target: LOG_TARGET,
"Peer '{}' contains only excluded addresses ({})",
node_id,
msg
);
self.ban_peer(
node_id,
Duration::from_secs(3 * 24 * 60 * 60), // 3 days
"All peer addresses are excluded (User intervention)".to_string(),
)
.await?;
(node_id, ConnectionStatus::Failed, None)
},
PeerConnectFailed(node_id, ConnectionManagerError::NoiseHandshakeError(msg)) => {
Expand Down
11 changes: 11 additions & 0 deletions comms/core/src/peer_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,17 @@ impl PeerManager {
Ok(peer.features)
}

pub async fn get_peer_multi_addresses(
&self,
node_id: &NodeId,
) -> Result<MultiaddressesWithStats, PeerManagerError> {
let peer = self
.find_by_node_id(node_id)
.await?
.ok_or(PeerManagerError::PeerNotFoundError)?;
Ok(peer.addresses)
}

/// This will store metadata inside of the metadata field in the peer provided by the nodeID.
/// It will return None if the value was empty and the old value if the value was updated
pub async fn set_peer_metadata(
Expand Down
103 changes: 90 additions & 13 deletions comms/dht/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use log::*;
use tari_comms::{
connection_manager::ConnectionManagerError,
connectivity::{ConnectivityError, ConnectivityRequester, ConnectivitySelection},
multiaddr::Multiaddr,
peer_manager::{NodeId, NodeIdentity, PeerFeatures, PeerManager, PeerManagerError, PeerQuery, PeerQuerySortBy},
types::CommsPublicKey,
PeerConnection,
Expand Down Expand Up @@ -88,6 +89,8 @@ pub enum DhtActorError {
ConnectivityError(#[from] ConnectivityError),
#[error("Connectivity event stream closed")]
ConnectivityEventStreamClosed,
#[error("All peer addresses are excluded")]
AllPeerAddressesAreExcluded,
}

impl<T> From<mpsc::error::SendError<T>> for DhtActorError {
Expand Down Expand Up @@ -381,15 +384,44 @@ impl DhtActor {
}
}

// Helper function to check if all peer addresses are excluded
async fn check_if_addresses_excluded(
excluded_dial_addresses: Vec<Multiaddr>,
peer_manager: &PeerManager,
node_id: NodeId,
) -> Result<(), DhtActorError> {
if !excluded_dial_addresses.is_empty() {
let addresses = peer_manager.get_peer_multi_addresses(&node_id).await?;
if addresses
.iter()
.all(|addr| excluded_dial_addresses.contains(addr.address()))
{
warn!(
target: LOG_TARGET,
"All peer addresses are excluded. Not broadcasting join message."
);
return Err(DhtActorError::AllPeerAddressesAreExcluded);
}
}
Ok(())
}

#[allow(clippy::too_many_lines)]
fn request_handler(&mut self, request: DhtRequest) -> BoxFuture<'static, Result<(), DhtActorError>> {
#[allow(clippy::enum_glob_use)]
use DhtRequest::*;
match request {
SendJoin => {
let node_identity = Arc::clone(&self.node_identity);
let peer_manager = Arc::clone(&self.peer_manager);
let outbound_requester = self.outbound_requester.clone();
Box::pin(Self::broadcast_join(node_identity, outbound_requester))
let excluded_dial_addresses = self.config.excluded_dial_addresses.clone();
Box::pin(Self::broadcast_join(
node_identity,
peer_manager,
excluded_dial_addresses,
outbound_requester,
))
},
MsgHashCacheInsert {
message_hash,
Expand Down Expand Up @@ -465,7 +497,16 @@ impl DhtActor {
let connectivity = self.connectivity.clone();
let discovery = self.discovery.clone();
let peer_manager = self.peer_manager.clone();
let node_identity = self.node_identity.clone();
let excluded_dial_addresses = self.config.excluded_dial_addresses.clone();

Box::pin(async move {
DhtActor::check_if_addresses_excluded(
excluded_dial_addresses,
&peer_manager,
node_identity.node_id().clone(),
)
.await?;
let mut task = DiscoveryDialTask::new(connectivity, peer_manager, discovery);
let result = task.run(public_key).await;
let _result = reply.send(result);
Expand All @@ -491,8 +532,16 @@ impl DhtActor {

async fn broadcast_join(
node_identity: Arc<NodeIdentity>,
peer_manager: Arc<PeerManager>,
excluded_dial_addresses: Vec<Multiaddr>,
mut outbound_requester: OutboundMessageRequester,
) -> Result<(), DhtActorError> {
DhtActor::check_if_addresses_excluded(
excluded_dial_addresses,
peer_manager.as_ref(),
node_identity.node_id().clone(),
)
.await?;
let message = JoinMessage::from(&node_identity);

debug!(target: LOG_TARGET, "Sending Join message to closest peers");
Expand Down Expand Up @@ -524,31 +573,31 @@ impl DhtActor {
) -> Result<Vec<NodeId>, DhtActorError> {
#[allow(clippy::enum_glob_use)]
use BroadcastStrategy::*;
match broadcast_strategy {
let peers = match broadcast_strategy {
DirectNodeId(node_id) => {
// Send to a particular peer matching the given node ID
peer_manager
.direct_identity_node_id(&node_id)
.await
.map(|peer| peer.map(|p| vec![p.node_id]).unwrap_or_default())
.map_err(Into::into)
.map_err(Into::<DhtActorError>::into)?
},
DirectPublicKey(public_key) => {
// Send to a particular peer matching the given node ID
peer_manager
.direct_identity_public_key(&public_key)
.await
.map(|peer| peer.map(|p| vec![p.node_id]).unwrap_or_default())
.map_err(Into::into)
.map_err(Into::<DhtActorError>::into)?
},
Flood(exclude) => {
let peers = connectivity
.select_connections(ConnectivitySelection::all_nodes(exclude))
.await?;
Ok(peers.into_iter().map(|p| p.peer_node_id().clone()).collect())
peers.into_iter().map(|p| p.peer_node_id().clone()).collect()
},
ClosestNodes(closest_request) => {
Self::select_closest_node_connected(closest_request, config, connectivity, peer_manager).await
Self::select_closest_node_connected(closest_request, config, connectivity, peer_manager.clone()).await?
},
DirectOrClosestNodes(closest_request) => {
// First check if a direct connection exists
Expand All @@ -557,20 +606,22 @@ impl DhtActor {
.await?
.is_some()
{
return Ok(vec![closest_request.node_id.clone()]);
vec![closest_request.node_id.clone()]
} else {
Self::select_closest_node_connected(closest_request, config, connectivity, peer_manager.clone())
.await?
}
Self::select_closest_node_connected(closest_request, config, connectivity, peer_manager).await
},
Random(n, excluded) => {
// Send to a random set of peers of size n that are Communication Nodes
Ok(peer_manager
peer_manager
.random_peers(n, &excluded)
.await?
.into_iter()
.map(|p| p.node_id)
.collect())
.collect()
},
SelectedPeers(peers) => Ok(peers),
SelectedPeers(peers) => peers,
Broadcast(exclude) => {
let connections = connectivity
.select_connections(ConnectivitySelection::random_nodes(
Expand All @@ -597,7 +648,7 @@ impl DhtActor {
candidates.len()
);

Ok(candidates)
candidates
},
Propagate(destination, exclude) => {
let dest_node_id = destination.to_derived_node_id();
Expand Down Expand Up @@ -687,8 +738,34 @@ impl DhtActor {
candidates.iter().map(|n| n.short_str()).collect::<Vec<_>>().join(", ")
);

Ok(candidates)
candidates
},
};
if config.excluded_dial_addresses.is_empty() {
return Ok(peers);
};

let mut filtered_peers = Vec::with_capacity(peers.len());
for id in &peers {
let addresses = peer_manager.get_peer_multi_addresses(id).await?;
if addresses
.iter()
.all(|addr| config.excluded_dial_addresses.contains(addr.address()))
{
trace!(target: LOG_TARGET, "Peer '{}' has only excluded addresses. Skipping.", id);
} else {
filtered_peers.push(id.clone());
}
}

if filtered_peers.is_empty() {
warn!(
target: LOG_TARGET,
"All selected peers have only excluded addresses. No peers will be selected."
);
Err(DhtActorError::AllPeerAddressesAreExcluded)
} else {
Ok(filtered_peers)
}
}

Expand Down
5 changes: 4 additions & 1 deletion comms/dht/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{path::Path, time::Duration};

use serde::{Deserialize, Serialize};
use tari_common::configuration::serializers;
use tari_comms::peer_validator::PeerValidatorConfig;
use tari_comms::{multiaddr::Multiaddr, peer_validator::PeerValidatorConfig};

use crate::{
actor::OffenceSeverity,
Expand Down Expand Up @@ -115,6 +115,8 @@ pub struct DhtConfig {
/// Configuration for peer validation
/// See [PeerValidatorConfig]
pub peer_validator_config: PeerValidatorConfig,
/// Addresses that should never be dialed
pub excluded_dial_addresses: Vec<Multiaddr>,
}

impl DhtConfig {
Expand Down Expand Up @@ -193,6 +195,7 @@ impl Default for DhtConfig {
max_permitted_peer_claims: 5,
offline_peer_cooldown: Duration::from_secs(24 * 60 * 60),
peer_validator_config: Default::default(),
excluded_dial_addresses: vec![],
}
}
}
Expand Down
Loading

0 comments on commit 3f38620

Please sign in to comment.