Skip to content

Commit

Permalink
Add exclude dialler addresses
Browse files Browse the repository at this point in the history
Addded user configurable communication node addresses that should never be dialled.
  • Loading branch information
hansieodendaal committed Sep 10, 2024
1 parent ad144f7 commit 583d331
Show file tree
Hide file tree
Showing 12 changed files with 196 additions and 12 deletions.
6 changes: 5 additions & 1 deletion base_layer/contacts/tests/contacts_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use rand::rngs::OsRng;
use tari_common::configuration::{MultiaddrList, Network, StringList};
use tari_common_sqlite::connection::{DbConnection, DbConnectionUrl};
use tari_common_types::{tari_address::TariAddress, types::PublicKey};
use tari_comms::{peer_manager::PeerFeatures, NodeIdentity};
use tari_comms::{multiaddr::Multiaddr, peer_manager::PeerFeatures, NodeIdentity};
use tari_comms_dht::{store_forward::SafConfig, DhtConfig};
use tari_contacts::contacts_service::{
error::{ContactsServiceError, ContactsServiceStorageError},
Expand Down Expand Up @@ -96,6 +96,10 @@ pub fn setup_contacts_service<T: ContactsBackend + 'static>(
rpc_max_simultaneous_sessions: 0,
rpc_max_sessions_per_peer: 0,
listener_self_liveness_check_interval: None,
excluded_dial_addresses: vec![
"/ip4/172.2.3.4/tcp/18188".parse::<Multiaddr>().expect("will not fail"),
"/ip4/172.2.3.4/tcp/18189".parse::<Multiaddr>().expect("will not fail"),
],
};
let peer_message_subscription_factory = Arc::new(subscription_factory);
let shutdown = Shutdown::new();
Expand Down
6 changes: 6 additions & 0 deletions base_layer/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ pub struct P2pConfig {
/// The maximum allowed RPC sessions per peer.
/// Default: 10
pub rpc_max_sessions_per_peer: usize,
/// Addresses that should never be dialed
pub excluded_dial_addresses: Vec<Multiaddr>,
}

impl Default for P2pConfig {
Expand All @@ -149,6 +151,10 @@ impl Default for P2pConfig {
auxiliary_tcp_listener_address: None,
rpc_max_simultaneous_sessions: 100,
rpc_max_sessions_per_peer: 10,
excluded_dial_addresses: vec![
"/ip4/172.2.3.4/tcp/18188".parse::<Multiaddr>().expect("will not fail"),
"/ip4/172.2.3.4/tcp/18189".parse::<Multiaddr>().expect("will not fail"),
],
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion base_layer/p2p/src/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ async fn configure_comms_and_dht(
.with_listener_liveness_max_sessions(config.listener_liveness_max_sessions)
.with_listener_liveness_allowlist_cidrs(listener_liveness_allowlist_cidrs)
.with_dial_backoff(ConstantBackoff::new(Duration::from_millis(500)))
.with_peer_storage(peer_database, Some(file_lock));
.with_peer_storage(peer_database, Some(file_lock))
.with_excluded_dial_addresses(config.excluded_dial_addresses.clone());

let mut comms = match config.auxiliary_tcp_listener_address {
Some(ref addr) => builder.with_auxiliary_tcp_listener_address(addr.clone()).build()?,
Expand Down
4 changes: 4 additions & 0 deletions base_layer/wallet_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5335,6 +5335,10 @@ pub unsafe extern "C" fn comms_config_create(
rpc_max_simultaneous_sessions: 0,
rpc_max_sessions_per_peer: 0,
listener_self_liveness_check_interval: None,
excluded_dial_addresses: vec![
"/ip4/172.2.3.4/tcp/18188".parse::<Multiaddr>().expect("will not fail"),
"/ip4/172.2.3.4/tcp/18189".parse::<Multiaddr>().expect("will not fail"),
],
};

Box::into_raw(Box::new(config))
Expand Down
2 changes: 2 additions & 0 deletions common/config/presets/c_base_node_c.toml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ listener_self_liveness_check_interval = 15
#rpc_max_simultaneous_sessions = 100
# The maximum comms RPC sessions allowed per peer (default value = 10).
#rpc_max_sessions_per_peer = 10
# Addresses that should never be dialed (default value = ["/ip4/172.2.3.4/tcp/18188", "/ip4/172.2.3.4/tcp/18189"])
#excluded_dial_addresses = ["/ip4/172.2.3.4/tcp/18188", "/ip4/172.2.3.4/tcp/18189"]

[base_node.p2p.transport]
# -------------- Transport configuration --------------
Expand Down
4 changes: 3 additions & 1 deletion common/config/presets/d_console_wallet.toml
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ event_channel_size = 3500
# peers can find you.
# _NOTE_: If using the `tor` transport type, public_address will be ignored and an onion address will be
# automatically configured
#public_addresses = ["/ip4/172.2.3.4/tcp/18189",]
#public_addresses = ["/ip4/172.2.3.4/tcp/18188",]

# Optionally bind an additional TCP socket for inbound Tari P2P protocol commms.
# Use cases include:
Expand Down Expand Up @@ -208,6 +208,8 @@ event_channel_size = 3500
#rpc_max_simultaneous_sessions = 100
# The maximum comms RPC sessions allowed per peer (default value = 10).
#rpc_max_sessions_per_peer = 10
# Addresses that should never be dialed (default value = ["/ip4/172.2.3.4/tcp/18188", "/ip4/172.2.3.4/tcp/18189"])
#excluded_dial_addresses = ["/ip4/172.2.3.4/tcp/18188", "/ip4/172.2.3.4/tcp/18189"]

[wallet.p2p.transport]
# -------------- Transport configuration --------------
Expand Down
5 changes: 5 additions & 0 deletions comms/core/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ impl CommsBuilder {
self
}

pub fn with_excluded_dial_addresses(mut self, excluded_addresses: Vec<Multiaddr>) -> Self {
self.connection_manager_config.excluded_dial_addresses = excluded_addresses;
self
}

/// Restrict liveness sessions to certain address ranges (CIDR format).
pub fn with_listener_liveness_allowlist_cidrs(mut self, cidrs: Vec<cidr::AnyIpCidr>) -> Self {
self.connection_manager_config.liveness_cidr_allowlist = cidrs;
Expand Down
26 changes: 17 additions & 9 deletions comms/core/src/connection_manager/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ where
fn handle_request(&mut self, pending_dials: &mut DialFuturesUnordered, request: DialerRequest) {
use DialerRequest::{CancelPendingDial, Dial, NotifyNewInboundConnection};
debug!(target: LOG_TARGET, "Connection dialer got request: {:?}", request);

match request {
Dial(peer, reply_tx) => {
self.handle_dial_peer_request(pending_dials, peer, reply_tx);
Expand Down Expand Up @@ -515,7 +516,7 @@ where
tokio::select! {
_ = delay => {
debug!(target: LOG_TARGET, "[Attempt {}] Connecting to peer '{}'", current_state.num_attempts(), current_state.peer().node_id.short_str());
match Self::dial_peer(current_state, &noise_config, &current_transport, config.network_info.network_wire_byte).await {
match Self::dial_peer(current_state, &noise_config, &current_transport, config.network_info.network_wire_byte, config.excluded_dial_addresses.clone()).await {
(state, Ok((socket, addr))) => {
debug!(target: LOG_TARGET, "Dial succeeded for peer '{}' after {} attempt(s)", state.peer().node_id.short_str(), state.num_attempts());
break (state, Ok((socket, addr)));
Expand All @@ -524,6 +525,8 @@ where
(state, Err(ConnectionManagerError::NoiseHandshakeError(e))) => break (state, Err(ConnectionManagerError::NoiseHandshakeError(e))),
// Inflight dial was cancelled
(state, Err(ConnectionManagerError::DialCancelled)) => break (state, Err(ConnectionManagerError::DialCancelled)),
// All public addresses for this peer are excluded
(state, Err(ConnectionManagerError::AllPeerAddressesAreExcluded(e))) => break (state, Err(ConnectionManagerError::AllPeerAddressesAreExcluded(e))),
(state, Err(err)) => {
debug!(target: LOG_TARGET, "Failed to dial peer {} | Attempt {} | Error: {}", state.peer().node_id.short_str(), state.num_attempts(), err);
if state.num_attempts() >= config.max_dial_attempts {
Expand Down Expand Up @@ -554,6 +557,7 @@ where
noise_config: &NoiseConfig,
transport: &TTransport,
network_byte: u8,
excluded_dial_addresses: Vec<Multiaddr>,
) -> (
DialState,
Result<(NoiseSocket<TTransport::Output>, Multiaddr), ConnectionManagerError>,
Expand All @@ -564,10 +568,7 @@ where
.clone()
.into_vec()
.iter()
.filter(|&a| {
a == &"/memory/0".parse::<Multiaddr>().expect("will not fail") || // Used for tests, allowed
a != &ConnectionManagerConfig::default().listener_address // Not allowed to dial the default
})
.filter(|&a| !excluded_dial_addresses.iter().any(|excluded| a == excluded))
.cloned()
.collect::<Vec<_>>();
if addresses.is_empty() {
Expand All @@ -577,10 +578,17 @@ where
"Dial - No more contactable addresses for peer '{}'",
node_id_hex
);
return (
dial_state,
Err(ConnectionManagerError::NoContactableAddressesForPeer(node_id_hex)),
);
return if dial_state.peer().addresses.is_empty() {
(
dial_state,
Err(ConnectionManagerError::NoContactableAddressesForPeer(node_id_hex)),
)
} else {
(
dial_state,
Err(ConnectionManagerError::AllPeerAddressesAreExcluded(node_id_hex)),
)
};
}
let cancel_signal = dial_state.get_cancel_signal();
for address in addresses {
Expand Down
2 changes: 2 additions & 0 deletions comms/core/src/connection_manager/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ pub enum ConnectionManagerError {
PeerValidationError(#[from] PeerValidatorError),
#[error("No contactable addresses for peer {0} left")]
NoContactableAddressesForPeer(String),
#[error("All peer addresses are excluded for peer {0}")]
AllPeerAddressesAreExcluded(String),
#[error("Yamux error: {0}")]
YamuxControlError(#[from] YamuxControlError),
}
Expand Down
3 changes: 3 additions & 0 deletions comms/core/src/connection_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ pub struct ConnectionManagerConfig {
pub auxiliary_tcp_listener_address: Option<Multiaddr>,
/// Peer validation configuration. See [PeerValidatorConfig]
pub peer_validation_config: PeerValidatorConfig,
/// Addresses that should never be dialed
pub excluded_dial_addresses: Vec<Multiaddr>,
}

impl Default for ConnectionManagerConfig {
Expand All @@ -154,6 +156,7 @@ impl Default for ConnectionManagerConfig {
auxiliary_tcp_listener_address: None,
peer_validation_config: PeerValidatorConfig::default(),
noise_handshake_recv_timeout: Duration::from_secs(6),
excluded_dial_addresses: vec![],
}
}
}
Expand Down
132 changes: 132 additions & 0 deletions comms/core/src/connection_manager/tests/listener_dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,135 @@ async fn banned() {

timeout(Duration::from_secs(5), dialer_fut).await.unwrap().unwrap();
}

#[tokio::test]
async fn excluded_yes() {
let (event_tx, _event_rx) = mpsc::channel(10);
let mut shutdown = Shutdown::new();

let node_identity1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let noise_config1 = NoiseConfig::new(node_identity1.clone());
let expected_proto = ProtocolId::from_static(b"/tari/test-proto");
let supported_protocols = vec![expected_proto.clone()];
let peer_manager1 = build_peer_manager();
let mut listener = PeerListener::new(
Default::default(),
"/memory/0".parse().unwrap(),
MemoryTransport,
noise_config1.clone(),
event_tx.clone(),
peer_manager1.clone(),
node_identity1.clone(),
shutdown.to_signal(),
);
listener.set_supported_protocols(supported_protocols.clone());

// Get the listener address of the peer
let address = listener.listen().await.unwrap();

let node_identity2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let noise_config2 = NoiseConfig::new(node_identity2.clone());
let (request_tx, request_rx) = mpsc::channel(1);
let peer_manager2 = build_peer_manager();
let connection_manager_config = ConnectionManagerConfig {
excluded_dial_addresses: vec![address.clone()],
..Default::default()
};
let mut dialer = Dialer::new(
connection_manager_config,
node_identity2.clone(),
peer_manager2.clone(),
MemoryTransport,
noise_config2.clone(),
ConstantBackoff::new(Duration::from_millis(100)),
request_rx,
event_tx.clone(),
shutdown.to_signal(),
);
dialer.set_supported_protocols(supported_protocols.clone());

let dialer_fut = tokio::spawn(dialer.run());

let mut peer = node_identity1.to_peer();
peer.addresses = MultiaddressesWithStats::from_addresses_with_source(vec![address], &PeerAddressSource::Config);
peer.set_id_for_test(1);

let (reply_tx, reply_rx) = oneshot::channel();
request_tx
.send(DialerRequest::Dial(Box::new(peer), Some(reply_tx)))
.await
.unwrap();

// Check that the dial failed. We're checking that the dial attempt was never made.
let res = reply_rx.await.unwrap();
assert_eq!(format!("{:?}", res), format!("Err(AllPeerAddressesAreExcluded(\"{}\"))", node_identity1.node_id()));

shutdown.trigger();
timeout(Duration::from_secs(5), dialer_fut).await.unwrap().unwrap();
}

#[tokio::test]
async fn excluded_no() {
let (event_tx, _event_rx) = mpsc::channel(10);
let mut shutdown = Shutdown::new();

let node_identity1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let noise_config1 = NoiseConfig::new(node_identity1.clone());
let expected_proto = ProtocolId::from_static(b"/tari/test-proto");
let supported_protocols = vec![expected_proto.clone()];
let peer_manager1 = build_peer_manager();
let mut listener = PeerListener::new(
Default::default(),
"/memory/0".parse().unwrap(),
MemoryTransport,
noise_config1.clone(),
event_tx.clone(),
peer_manager1.clone(),
node_identity1.clone(),
shutdown.to_signal(),
);
listener.set_supported_protocols(supported_protocols.clone());

// Get the listener address of the peer
let address = listener.listen().await.unwrap();

let node_identity2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE);
let noise_config2 = NoiseConfig::new(node_identity2.clone());
let (request_tx, request_rx) = mpsc::channel(1);
let peer_manager2 = build_peer_manager();
let connection_manager_config = ConnectionManagerConfig {
excluded_dial_addresses: vec![],
..Default::default()
};
let mut dialer = Dialer::new(
connection_manager_config,
node_identity2.clone(),
peer_manager2.clone(),
MemoryTransport,
noise_config2.clone(),
ConstantBackoff::new(Duration::from_millis(100)),
request_rx,
event_tx.clone(),
shutdown.to_signal(),
);
dialer.set_supported_protocols(supported_protocols.clone());

let dialer_fut = tokio::spawn(dialer.run());

let mut peer = node_identity1.to_peer();
peer.addresses = MultiaddressesWithStats::from_addresses_with_source(vec![address], &PeerAddressSource::Config);
peer.set_id_for_test(1);

let (reply_tx, reply_rx) = oneshot::channel();
request_tx
.send(DialerRequest::Dial(Box::new(peer), Some(reply_tx)))
.await
.unwrap();

// Check that the dial failed. We're checking that the dial attempt was never made.
let res = reply_rx.await.unwrap();
assert!(res.is_ok());

shutdown.trigger();
timeout(Duration::from_secs(5), dialer_fut).await.unwrap().unwrap();
}
15 changes: 15 additions & 0 deletions comms/core/src/connectivity/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,21 @@ impl ConnectivityManagerActor {
let (node_id, mut new_status, connection) = match event {
PeerDisconnected(_, node_id, minimized) => (node_id, ConnectionStatus::Disconnected(*minimized), None),
PeerConnected(conn) => (conn.peer_node_id(), ConnectionStatus::Connected, Some(conn.clone())),
PeerConnectFailed(node_id, ConnectionManagerError::AllPeerAddressesAreExcluded(msg)) => {
warn!(
target: LOG_TARGET,
"Peer '{}' contains only excluded addresses ({})",
node_id,
msg
);
self.ban_peer(
node_id,
Duration::from_secs(3 * 24 * 60 * 60), // 3 days
"All peer addresses are excluded (User intervention)".to_string(),
)
.await?;
(node_id, ConnectionStatus::Failed, None)
},
PeerConnectFailed(node_id, ConnectionManagerError::NoiseHandshakeError(msg)) => {
if let Some(conn) = self.pool.get_connection(node_id) {
warn!(
Expand Down

0 comments on commit 583d331

Please sign in to comment.