Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add configurable exclude dialer addresses for universe #6543

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion base_layer/contacts/tests/contacts_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use rand::rngs::OsRng;
use tari_common::configuration::{MultiaddrList, Network, StringList};
use tari_common_sqlite::connection::{DbConnection, DbConnectionUrl};
use tari_common_types::{tari_address::TariAddress, types::PublicKey};
use tari_comms::{peer_manager::PeerFeatures, NodeIdentity};
use tari_comms::{multiaddr::Multiaddr, peer_manager::PeerFeatures, NodeIdentity};
use tari_comms_dht::{store_forward::SafConfig, DhtConfig};
use tari_contacts::contacts_service::{
error::{ContactsServiceError, ContactsServiceStorageError},
Expand Down Expand Up @@ -88,6 +88,7 @@ pub fn setup_contacts_service<T: ContactsBackend + 'static>(
auto_request: true,
..Default::default()
},
excluded_dial_addresses: vec![],
..Default::default()
},
allow_test_addresses: true,
Expand Down
3 changes: 2 additions & 1 deletion base_layer/p2p/src/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ async fn configure_comms_and_dht(
.with_listener_liveness_max_sessions(config.listener_liveness_max_sessions)
.with_listener_liveness_allowlist_cidrs(listener_liveness_allowlist_cidrs)
.with_dial_backoff(ConstantBackoff::new(Duration::from_millis(500)))
.with_peer_storage(peer_database, Some(file_lock));
.with_peer_storage(peer_database, Some(file_lock))
.with_excluded_dial_addresses(config.dht.excluded_dial_addresses.clone());

let mut comms = match config.auxiliary_tcp_listener_address {
Some(ref addr) => builder.with_auxiliary_tcp_listener_address(addr.clone()).build()?,
Expand Down
1 change: 1 addition & 0 deletions base_layer/wallet_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5327,6 +5327,7 @@ pub unsafe extern "C" fn comms_config_create(
minimum_desired_tcpv4_node_ratio: 0.0,
..Default::default()
},
excluded_dial_addresses: vec![],
..Default::default()
},
allow_test_addresses: true,
Expand Down
2 changes: 2 additions & 0 deletions common/config/presets/c_base_node_c.toml
Original file line number Diff line number Diff line change
Expand Up @@ -316,3 +316,5 @@ database_url = "data/base_node/dht.db"
# In a situation where a node is not well-connected and many nodes are locally marked as offline, we can retry
# peers that were previously tried. Default: 2 hours
#offline_peer_cooldown = 7_200 # 2 * 60 * 60
# Addresses that should never be dialed (default value = [])
#excluded_dial_addresses = ["/ip4/x.x.x.x/tcp/xxxx", "/ip4/x.y.x.y/tcp/xyxy"]
4 changes: 3 additions & 1 deletion common/config/presets/d_console_wallet.toml
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ event_channel_size = 3500
# peers can find you.
# _NOTE_: If using the `tor` transport type, public_address will be ignored and an onion address will be
# automatically configured
#public_addresses = ["/ip4/172.2.3.4/tcp/18189",]
#public_addresses = ["/ip4/172.2.3.4/tcp/18188",]

# Optionally bind an additional TCP socket for inbound Tari P2P protocol commms.
# Use cases include:
Expand Down Expand Up @@ -360,3 +360,5 @@ network_discovery.initial_peer_sync_delay = 25
# In a situation where a node is not well-connected and many nodes are locally marked as offline, we can retry
# peers that were previously tried. Default: 2 hours
#offline_peer_cooldown = 7_200 # 2 * 60 * 60
# Addresses that should never be dialed (default value = [])
#excluded_dial_addresses = ["/ip4/x.x.x.x/tcp/xxxx", "/ip4/x.y.x.y/tcp/xyxy"]
5 changes: 5 additions & 0 deletions comms/core/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ impl CommsBuilder {
self
}

pub fn with_excluded_dial_addresses(mut self, excluded_addresses: Vec<Multiaddr>) -> Self {
self.connection_manager_config.excluded_dial_addresses = excluded_addresses;
self
}

/// Restrict liveness sessions to certain address ranges (CIDR format).
pub fn with_listener_liveness_allowlist_cidrs(mut self, cidrs: Vec<cidr::AnyIpCidr>) -> Self {
self.connection_manager_config.liveness_cidr_allowlist = cidrs;
Expand Down
26 changes: 17 additions & 9 deletions comms/core/src/connection_manager/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ where
fn handle_request(&mut self, pending_dials: &mut DialFuturesUnordered, request: DialerRequest) {
use DialerRequest::{CancelPendingDial, Dial, NotifyNewInboundConnection};
debug!(target: LOG_TARGET, "Connection dialer got request: {:?}", request);

match request {
Dial(peer, reply_tx) => {
self.handle_dial_peer_request(pending_dials, peer, reply_tx);
Expand Down Expand Up @@ -515,7 +516,7 @@ where
tokio::select! {
_ = delay => {
debug!(target: LOG_TARGET, "[Attempt {}] Connecting to peer '{}'", current_state.num_attempts(), current_state.peer().node_id.short_str());
match Self::dial_peer(current_state, &noise_config, &current_transport, config.network_info.network_wire_byte).await {
match Self::dial_peer(current_state, &noise_config, &current_transport, config.network_info.network_wire_byte, config.excluded_dial_addresses.clone()).await {
(state, Ok((socket, addr))) => {
debug!(target: LOG_TARGET, "Dial succeeded for peer '{}' after {} attempt(s)", state.peer().node_id.short_str(), state.num_attempts());
break (state, Ok((socket, addr)));
Expand All @@ -524,6 +525,8 @@ where
(state, Err(ConnectionManagerError::NoiseHandshakeError(e))) => break (state, Err(ConnectionManagerError::NoiseHandshakeError(e))),
// Inflight dial was cancelled
(state, Err(ConnectionManagerError::DialCancelled)) => break (state, Err(ConnectionManagerError::DialCancelled)),
// All public addresses for this peer are excluded
(state, Err(ConnectionManagerError::AllPeerAddressesAreExcluded(e))) => break (state, Err(ConnectionManagerError::AllPeerAddressesAreExcluded(e))),
(state, Err(err)) => {
debug!(target: LOG_TARGET, "Failed to dial peer {} | Attempt {} | Error: {}", state.peer().node_id.short_str(), state.num_attempts(), err);
if state.num_attempts() >= config.max_dial_attempts {
Expand Down Expand Up @@ -554,6 +557,7 @@ where
noise_config: &NoiseConfig,
transport: &TTransport,
network_byte: u8,
excluded_dial_addresses: Vec<Multiaddr>,
) -> (
DialState,
Result<(NoiseSocket<TTransport::Output>, Multiaddr), ConnectionManagerError>,
Expand All @@ -564,10 +568,7 @@ where
.clone()
.into_vec()
.iter()
.filter(|&a| {
a == &"/memory/0".parse::<Multiaddr>().expect("will not fail") || // Used for tests, allowed
a != &ConnectionManagerConfig::default().listener_address // Not allowed to dial the default
})
.filter(|&a| !excluded_dial_addresses.iter().any(|excluded| a == excluded))
.cloned()
.collect::<Vec<_>>();
if addresses.is_empty() {
Expand All @@ -577,10 +578,17 @@ where
"Dial - No more contactable addresses for peer '{}'",
node_id_hex
);
return (
dial_state,
Err(ConnectionManagerError::NoContactableAddressesForPeer(node_id_hex)),
);
return if dial_state.peer().addresses.is_empty() {
(
dial_state,
Err(ConnectionManagerError::NoContactableAddressesForPeer(node_id_hex)),
)
} else {
(
dial_state,
Err(ConnectionManagerError::AllPeerAddressesAreExcluded(node_id_hex)),
)
};
}
let cancel_signal = dial_state.get_cancel_signal();
for address in addresses {
Expand Down
2 changes: 2 additions & 0 deletions comms/core/src/connection_manager/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ pub enum ConnectionManagerError {
PeerValidationError(#[from] PeerValidatorError),
#[error("No contactable addresses for peer {0} left")]
NoContactableAddressesForPeer(String),
#[error("All peer addresses are excluded for peer {0}")]
AllPeerAddressesAreExcluded(String),
#[error("Yamux error: {0}")]
YamuxControlError(#[from] YamuxControlError),
}
Expand Down
3 changes: 3 additions & 0 deletions comms/core/src/connection_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ pub struct ConnectionManagerConfig {
pub auxiliary_tcp_listener_address: Option<Multiaddr>,
/// Peer validation configuration. See [PeerValidatorConfig]
pub peer_validation_config: PeerValidatorConfig,
/// Addresses that should never be dialed
pub excluded_dial_addresses: Vec<Multiaddr>,
}

impl Default for ConnectionManagerConfig {
Expand All @@ -154,6 +156,7 @@ impl Default for ConnectionManagerConfig {
auxiliary_tcp_listener_address: None,
peer_validation_config: PeerValidatorConfig::default(),
noise_handshake_recv_timeout: Duration::from_secs(6),
excluded_dial_addresses: vec![],
}
}
}
Expand Down
135 changes: 135 additions & 0 deletions comms/core/src/connection_manager/tests/listener_dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,138 @@ async fn banned() {

timeout(Duration::from_secs(5), dialer_fut).await.unwrap().unwrap();
}

#[tokio::test]
async fn excluded_yes() {
let (event_tx, _event_rx) = mpsc::channel(10);
let mut shutdown = Shutdown::new();

let node_identity1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let noise_config1 = NoiseConfig::new(node_identity1.clone());
let expected_proto = ProtocolId::from_static(b"/tari/test-proto");
let supported_protocols = vec![expected_proto.clone()];
let peer_manager1 = build_peer_manager();
let mut listener = PeerListener::new(
Default::default(),
"/memory/0".parse().unwrap(),
MemoryTransport,
noise_config1.clone(),
event_tx.clone(),
peer_manager1.clone(),
node_identity1.clone(),
shutdown.to_signal(),
);
listener.set_supported_protocols(supported_protocols.clone());

// Get the listener address of the peer
let address = listener.listen().await.unwrap();

let node_identity2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let noise_config2 = NoiseConfig::new(node_identity2.clone());
let (request_tx, request_rx) = mpsc::channel(1);
let peer_manager2 = build_peer_manager();
let connection_manager_config = ConnectionManagerConfig {
excluded_dial_addresses: vec![address.clone()],
..Default::default()
};
let mut dialer = Dialer::new(
connection_manager_config,
node_identity2.clone(),
peer_manager2.clone(),
MemoryTransport,
noise_config2.clone(),
ConstantBackoff::new(Duration::from_millis(100)),
request_rx,
event_tx.clone(),
shutdown.to_signal(),
);
dialer.set_supported_protocols(supported_protocols.clone());

let dialer_fut = tokio::spawn(dialer.run());

let mut peer = node_identity1.to_peer();
peer.addresses = MultiaddressesWithStats::from_addresses_with_source(vec![address], &PeerAddressSource::Config);
peer.set_id_for_test(1);

let (reply_tx, reply_rx) = oneshot::channel();
request_tx
.send(DialerRequest::Dial(Box::new(peer), Some(reply_tx)))
.await
.unwrap();

// Check that the dial failed. We're checking that the dial attempt was never made.
let res = reply_rx.await.unwrap();
assert_eq!(
format!("{:?}", res),
format!("Err(AllPeerAddressesAreExcluded(\"{}\"))", node_identity1.node_id())
);

shutdown.trigger();
timeout(Duration::from_secs(5), dialer_fut).await.unwrap().unwrap();
}

#[tokio::test]
async fn excluded_no() {
let (event_tx, _event_rx) = mpsc::channel(10);
let mut shutdown = Shutdown::new();

let node_identity1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let noise_config1 = NoiseConfig::new(node_identity1.clone());
let expected_proto = ProtocolId::from_static(b"/tari/test-proto");
let supported_protocols = vec![expected_proto.clone()];
let peer_manager1 = build_peer_manager();
let mut listener = PeerListener::new(
Default::default(),
"/memory/0".parse().unwrap(),
MemoryTransport,
noise_config1.clone(),
event_tx.clone(),
peer_manager1.clone(),
node_identity1.clone(),
shutdown.to_signal(),
);
listener.set_supported_protocols(supported_protocols.clone());

// Get the listener address of the peer
let address = listener.listen().await.unwrap();

let node_identity2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let noise_config2 = NoiseConfig::new(node_identity2.clone());
let (request_tx, request_rx) = mpsc::channel(1);
let peer_manager2 = build_peer_manager();
let connection_manager_config = ConnectionManagerConfig {
excluded_dial_addresses: vec![],
..Default::default()
};
let mut dialer = Dialer::new(
connection_manager_config,
node_identity2.clone(),
peer_manager2.clone(),
MemoryTransport,
noise_config2.clone(),
ConstantBackoff::new(Duration::from_millis(100)),
request_rx,
event_tx.clone(),
shutdown.to_signal(),
);
dialer.set_supported_protocols(supported_protocols.clone());

let dialer_fut = tokio::spawn(dialer.run());

let mut peer = node_identity1.to_peer();
peer.addresses = MultiaddressesWithStats::from_addresses_with_source(vec![address], &PeerAddressSource::Config);
peer.set_id_for_test(1);

let (reply_tx, reply_rx) = oneshot::channel();
request_tx
.send(DialerRequest::Dial(Box::new(peer), Some(reply_tx)))
.await
.unwrap();

// Check that the dial failed. We're checking that the dial attempt was never made.
let res = reply_rx.await.unwrap();
assert!(res.is_ok());

shutdown.trigger();
timeout(Duration::from_secs(5), dialer_fut).await.unwrap().unwrap();
}
9 changes: 9 additions & 0 deletions comms/core/src/connectivity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,15 @@ impl ConnectivityManagerActor {
let (node_id, mut new_status, connection) = match event {
PeerDisconnected(_, node_id, minimized) => (node_id, ConnectionStatus::Disconnected(*minimized), None),
PeerConnected(conn) => (conn.peer_node_id(), ConnectionStatus::Connected, Some(conn.clone())),
PeerConnectFailed(node_id, ConnectionManagerError::AllPeerAddressesAreExcluded(msg)) => {
debug!(
target: LOG_TARGET,
"Peer '{}' contains only excluded addresses ({})",
node_id,
msg
);
(node_id, ConnectionStatus::Failed, None)
},
PeerConnectFailed(node_id, ConnectionManagerError::NoiseHandshakeError(msg)) => {
if let Some(conn) = self.pool.get_connection(node_id) {
warn!(
Expand Down
11 changes: 11 additions & 0 deletions comms/core/src/peer_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,17 @@ impl PeerManager {
Ok(peer.features)
}

pub async fn get_peer_multi_addresses(
&self,
node_id: &NodeId,
) -> Result<MultiaddressesWithStats, PeerManagerError> {
let peer = self
.find_by_node_id(node_id)
.await?
.ok_or(PeerManagerError::PeerNotFoundError)?;
Ok(peer.addresses)
}

/// This will store metadata inside of the metadata field in the peer provided by the nodeID.
/// It will return None if the value was empty and the old value if the value was updated
pub async fn set_peer_metadata(
Expand Down
Loading
Loading