diff --git a/base_layer/contacts/tests/contacts_service.rs b/base_layer/contacts/tests/contacts_service.rs index 73acc051ae..8c8d86ad49 100644 --- a/base_layer/contacts/tests/contacts_service.rs +++ b/base_layer/contacts/tests/contacts_service.rs @@ -26,7 +26,7 @@ use rand::rngs::OsRng; use tari_common::configuration::{MultiaddrList, Network, StringList}; use tari_common_sqlite::connection::{DbConnection, DbConnectionUrl}; use tari_common_types::{tari_address::TariAddress, types::PublicKey}; -use tari_comms::{peer_manager::PeerFeatures, NodeIdentity}; +use tari_comms::{multiaddr::Multiaddr, peer_manager::PeerFeatures, NodeIdentity}; use tari_comms_dht::{store_forward::SafConfig, DhtConfig}; use tari_contacts::contacts_service::{ error::{ContactsServiceError, ContactsServiceStorageError}, @@ -96,6 +96,10 @@ pub fn setup_contacts_service( rpc_max_simultaneous_sessions: 0, rpc_max_sessions_per_peer: 0, listener_self_liveness_check_interval: None, + excluded_dial_addresses: vec![ + "/ip4/172.2.3.4/tcp/18188".parse::().expect("will not fail"), + "/ip4/172.2.3.4/tcp/18189".parse::().expect("will not fail"), + ], }; let peer_message_subscription_factory = Arc::new(subscription_factory); let shutdown = Shutdown::new(); diff --git a/base_layer/p2p/src/config.rs b/base_layer/p2p/src/config.rs index 601a71ec5f..db6c3e8b91 100644 --- a/base_layer/p2p/src/config.rs +++ b/base_layer/p2p/src/config.rs @@ -125,6 +125,8 @@ pub struct P2pConfig { /// The maximum allowed RPC sessions per peer. /// Default: 10 pub rpc_max_sessions_per_peer: usize, + /// Addresses that should never be dialed + pub excluded_dial_addresses: Vec, } impl Default for P2pConfig { @@ -149,6 +151,10 @@ impl Default for P2pConfig { auxiliary_tcp_listener_address: None, rpc_max_simultaneous_sessions: 100, rpc_max_sessions_per_peer: 10, + excluded_dial_addresses: vec![ + "/ip4/172.2.3.4/tcp/18188".parse::().expect("will not fail"), + "/ip4/172.2.3.4/tcp/18189".parse::().expect("will not fail"), + ], } } } diff --git a/base_layer/p2p/src/initialization.rs b/base_layer/p2p/src/initialization.rs index 843f629d2d..85f0773329 100644 --- a/base_layer/p2p/src/initialization.rs +++ b/base_layer/p2p/src/initialization.rs @@ -331,7 +331,8 @@ async fn configure_comms_and_dht( .with_listener_liveness_max_sessions(config.listener_liveness_max_sessions) .with_listener_liveness_allowlist_cidrs(listener_liveness_allowlist_cidrs) .with_dial_backoff(ConstantBackoff::new(Duration::from_millis(500))) - .with_peer_storage(peer_database, Some(file_lock)); + .with_peer_storage(peer_database, Some(file_lock)) + .with_excluded_dial_addresses(config.excluded_dial_addresses.clone()); let mut comms = match config.auxiliary_tcp_listener_address { Some(ref addr) => builder.with_auxiliary_tcp_listener_address(addr.clone()).build()?, diff --git a/base_layer/wallet_ffi/src/lib.rs b/base_layer/wallet_ffi/src/lib.rs index 421d47486c..9962fae64d 100644 --- a/base_layer/wallet_ffi/src/lib.rs +++ b/base_layer/wallet_ffi/src/lib.rs @@ -5335,6 +5335,10 @@ pub unsafe extern "C" fn comms_config_create( rpc_max_simultaneous_sessions: 0, rpc_max_sessions_per_peer: 0, listener_self_liveness_check_interval: None, + excluded_dial_addresses: vec![ + "/ip4/172.2.3.4/tcp/18188".parse::().expect("will not fail"), + "/ip4/172.2.3.4/tcp/18189".parse::().expect("will not fail"), + ], }; Box::into_raw(Box::new(config)) diff --git a/common/config/presets/c_base_node_c.toml b/common/config/presets/c_base_node_c.toml index 536ab3a6b7..76ed7ed02e 100644 --- a/common/config/presets/c_base_node_c.toml +++ b/common/config/presets/c_base_node_c.toml @@ -160,6 +160,8 @@ listener_self_liveness_check_interval = 15 #rpc_max_simultaneous_sessions = 100 # The maximum comms RPC sessions allowed per peer (default value = 10). #rpc_max_sessions_per_peer = 10 +# Addresses that should never be dialed (default value = ["/ip4/172.2.3.4/tcp/18188", "/ip4/172.2.3.4/tcp/18189"]) +#excluded_dial_addresses = ["/ip4/172.2.3.4/tcp/18188", "/ip4/172.2.3.4/tcp/18189"] [base_node.p2p.transport] # -------------- Transport configuration -------------- diff --git a/common/config/presets/d_console_wallet.toml b/common/config/presets/d_console_wallet.toml index a8d3225ba0..bb4a0e63b0 100644 --- a/common/config/presets/d_console_wallet.toml +++ b/common/config/presets/d_console_wallet.toml @@ -168,7 +168,7 @@ event_channel_size = 3500 # peers can find you. # _NOTE_: If using the `tor` transport type, public_address will be ignored and an onion address will be # automatically configured -#public_addresses = ["/ip4/172.2.3.4/tcp/18189",] +#public_addresses = ["/ip4/172.2.3.4/tcp/18188",] # Optionally bind an additional TCP socket for inbound Tari P2P protocol commms. # Use cases include: @@ -208,6 +208,8 @@ event_channel_size = 3500 #rpc_max_simultaneous_sessions = 100 # The maximum comms RPC sessions allowed per peer (default value = 10). #rpc_max_sessions_per_peer = 10 +# Addresses that should never be dialed (default value = ["/ip4/172.2.3.4/tcp/18188", "/ip4/172.2.3.4/tcp/18189"]) +#excluded_dial_addresses = ["/ip4/172.2.3.4/tcp/18188", "/ip4/172.2.3.4/tcp/18189"] [wallet.p2p.transport] # -------------- Transport configuration -------------- diff --git a/comms/core/src/builder/mod.rs b/comms/core/src/builder/mod.rs index 5cae88e774..727d13cf6f 100644 --- a/comms/core/src/builder/mod.rs +++ b/comms/core/src/builder/mod.rs @@ -242,6 +242,11 @@ impl CommsBuilder { self } + pub fn with_excluded_dial_addresses(mut self, excluded_addresses: Vec) -> Self { + self.connection_manager_config.excluded_dial_addresses = excluded_addresses; + self + } + /// Restrict liveness sessions to certain address ranges (CIDR format). pub fn with_listener_liveness_allowlist_cidrs(mut self, cidrs: Vec) -> Self { self.connection_manager_config.liveness_cidr_allowlist = cidrs; diff --git a/comms/core/src/connection_manager/dialer.rs b/comms/core/src/connection_manager/dialer.rs index 32d399bb7c..245d3e4308 100644 --- a/comms/core/src/connection_manager/dialer.rs +++ b/comms/core/src/connection_manager/dialer.rs @@ -174,6 +174,7 @@ where fn handle_request(&mut self, pending_dials: &mut DialFuturesUnordered, request: DialerRequest) { use DialerRequest::{CancelPendingDial, Dial, NotifyNewInboundConnection}; debug!(target: LOG_TARGET, "Connection dialer got request: {:?}", request); + match request { Dial(peer, reply_tx) => { self.handle_dial_peer_request(pending_dials, peer, reply_tx); @@ -515,7 +516,7 @@ where tokio::select! { _ = delay => { debug!(target: LOG_TARGET, "[Attempt {}] Connecting to peer '{}'", current_state.num_attempts(), current_state.peer().node_id.short_str()); - match Self::dial_peer(current_state, &noise_config, ¤t_transport, config.network_info.network_wire_byte).await { + match Self::dial_peer(current_state, &noise_config, ¤t_transport, config.network_info.network_wire_byte, config.excluded_dial_addresses.clone()).await { (state, Ok((socket, addr))) => { debug!(target: LOG_TARGET, "Dial succeeded for peer '{}' after {} attempt(s)", state.peer().node_id.short_str(), state.num_attempts()); break (state, Ok((socket, addr))); @@ -524,6 +525,8 @@ where (state, Err(ConnectionManagerError::NoiseHandshakeError(e))) => break (state, Err(ConnectionManagerError::NoiseHandshakeError(e))), // Inflight dial was cancelled (state, Err(ConnectionManagerError::DialCancelled)) => break (state, Err(ConnectionManagerError::DialCancelled)), + // All public addresses for this peer are excluded + (state, Err(ConnectionManagerError::AllPeerAddressesAreExcluded(e))) => break (state, Err(ConnectionManagerError::AllPeerAddressesAreExcluded(e))), (state, Err(err)) => { debug!(target: LOG_TARGET, "Failed to dial peer {} | Attempt {} | Error: {}", state.peer().node_id.short_str(), state.num_attempts(), err); if state.num_attempts() >= config.max_dial_attempts { @@ -554,6 +557,7 @@ where noise_config: &NoiseConfig, transport: &TTransport, network_byte: u8, + excluded_dial_addresses: Vec, ) -> ( DialState, Result<(NoiseSocket, Multiaddr), ConnectionManagerError>, @@ -564,10 +568,7 @@ where .clone() .into_vec() .iter() - .filter(|&a| { - a == &"/memory/0".parse::().expect("will not fail") || // Used for tests, allowed - a != &ConnectionManagerConfig::default().listener_address // Not allowed to dial the default - }) + .filter(|&a| !excluded_dial_addresses.iter().any(|excluded| a == excluded)) .cloned() .collect::>(); if addresses.is_empty() { @@ -577,10 +578,17 @@ where "Dial - No more contactable addresses for peer '{}'", node_id_hex ); - return ( - dial_state, - Err(ConnectionManagerError::NoContactableAddressesForPeer(node_id_hex)), - ); + return if dial_state.peer().addresses.is_empty() { + ( + dial_state, + Err(ConnectionManagerError::NoContactableAddressesForPeer(node_id_hex)), + ) + } else { + ( + dial_state, + Err(ConnectionManagerError::AllPeerAddressesAreExcluded(node_id_hex)), + ) + }; } let cancel_signal = dial_state.get_cancel_signal(); for address in addresses { diff --git a/comms/core/src/connection_manager/error.rs b/comms/core/src/connection_manager/error.rs index 7d5b174329..d229aefadd 100644 --- a/comms/core/src/connection_manager/error.rs +++ b/comms/core/src/connection_manager/error.rs @@ -90,6 +90,8 @@ pub enum ConnectionManagerError { PeerValidationError(#[from] PeerValidatorError), #[error("No contactable addresses for peer {0} left")] NoContactableAddressesForPeer(String), + #[error("All peer addresses are excluded for peer {0}")] + AllPeerAddressesAreExcluded(String), #[error("Yamux error: {0}")] YamuxControlError(#[from] YamuxControlError), } diff --git a/comms/core/src/connection_manager/manager.rs b/comms/core/src/connection_manager/manager.rs index ff88f17501..67c28679cd 100644 --- a/comms/core/src/connection_manager/manager.rs +++ b/comms/core/src/connection_manager/manager.rs @@ -133,6 +133,8 @@ pub struct ConnectionManagerConfig { pub auxiliary_tcp_listener_address: Option, /// Peer validation configuration. See [PeerValidatorConfig] pub peer_validation_config: PeerValidatorConfig, + /// Addresses that should never be dialed + pub excluded_dial_addresses: Vec, } impl Default for ConnectionManagerConfig { @@ -154,6 +156,7 @@ impl Default for ConnectionManagerConfig { auxiliary_tcp_listener_address: None, peer_validation_config: PeerValidatorConfig::default(), noise_handshake_recv_timeout: Duration::from_secs(6), + excluded_dial_addresses: vec![], } } } diff --git a/comms/core/src/connection_manager/tests/listener_dialer.rs b/comms/core/src/connection_manager/tests/listener_dialer.rs index 6db482ac33..6afcc885be 100644 --- a/comms/core/src/connection_manager/tests/listener_dialer.rs +++ b/comms/core/src/connection_manager/tests/listener_dialer.rs @@ -256,3 +256,135 @@ async fn banned() { timeout(Duration::from_secs(5), dialer_fut).await.unwrap().unwrap(); } + +#[tokio::test] +async fn excluded_yes() { + let (event_tx, _event_rx) = mpsc::channel(10); + let mut shutdown = Shutdown::new(); + + let node_identity1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); + let noise_config1 = NoiseConfig::new(node_identity1.clone()); + let expected_proto = ProtocolId::from_static(b"/tari/test-proto"); + let supported_protocols = vec![expected_proto.clone()]; + let peer_manager1 = build_peer_manager(); + let mut listener = PeerListener::new( + Default::default(), + "/memory/0".parse().unwrap(), + MemoryTransport, + noise_config1.clone(), + event_tx.clone(), + peer_manager1.clone(), + node_identity1.clone(), + shutdown.to_signal(), + ); + listener.set_supported_protocols(supported_protocols.clone()); + + // Get the listener address of the peer + let address = listener.listen().await.unwrap(); + + let node_identity2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); + let noise_config2 = NoiseConfig::new(node_identity2.clone()); + let (request_tx, request_rx) = mpsc::channel(1); + let peer_manager2 = build_peer_manager(); + let connection_manager_config = ConnectionManagerConfig { + excluded_dial_addresses: vec![address.clone()], + ..Default::default() + }; + let mut dialer = Dialer::new( + connection_manager_config, + node_identity2.clone(), + peer_manager2.clone(), + MemoryTransport, + noise_config2.clone(), + ConstantBackoff::new(Duration::from_millis(100)), + request_rx, + event_tx.clone(), + shutdown.to_signal(), + ); + dialer.set_supported_protocols(supported_protocols.clone()); + + let dialer_fut = tokio::spawn(dialer.run()); + + let mut peer = node_identity1.to_peer(); + peer.addresses = MultiaddressesWithStats::from_addresses_with_source(vec![address], &PeerAddressSource::Config); + peer.set_id_for_test(1); + + let (reply_tx, reply_rx) = oneshot::channel(); + request_tx + .send(DialerRequest::Dial(Box::new(peer), Some(reply_tx))) + .await + .unwrap(); + + // Check that the dial failed. We're checking that the dial attempt was never made. + let res = reply_rx.await.unwrap(); + assert_eq!(format!("{:?}", res), "Err(ConnectFailedMaximumAttemptsReached)"); + + shutdown.trigger(); + timeout(Duration::from_secs(5), dialer_fut).await.unwrap().unwrap(); +} + +#[tokio::test] +async fn excluded_no() { + let (event_tx, _event_rx) = mpsc::channel(10); + let mut shutdown = Shutdown::new(); + + let node_identity1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); + let noise_config1 = NoiseConfig::new(node_identity1.clone()); + let expected_proto = ProtocolId::from_static(b"/tari/test-proto"); + let supported_protocols = vec![expected_proto.clone()]; + let peer_manager1 = build_peer_manager(); + let mut listener = PeerListener::new( + Default::default(), + "/memory/0".parse().unwrap(), + MemoryTransport, + noise_config1.clone(), + event_tx.clone(), + peer_manager1.clone(), + node_identity1.clone(), + shutdown.to_signal(), + ); + listener.set_supported_protocols(supported_protocols.clone()); + + // Get the listener address of the peer + let address = listener.listen().await.unwrap(); + + let node_identity2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); + let noise_config2 = NoiseConfig::new(node_identity2.clone()); + let (request_tx, request_rx) = mpsc::channel(1); + let peer_manager2 = build_peer_manager(); + let connection_manager_config = ConnectionManagerConfig { + excluded_dial_addresses: vec![], + ..Default::default() + }; + let mut dialer = Dialer::new( + connection_manager_config, + node_identity2.clone(), + peer_manager2.clone(), + MemoryTransport, + noise_config2.clone(), + ConstantBackoff::new(Duration::from_millis(100)), + request_rx, + event_tx.clone(), + shutdown.to_signal(), + ); + dialer.set_supported_protocols(supported_protocols.clone()); + + let dialer_fut = tokio::spawn(dialer.run()); + + let mut peer = node_identity1.to_peer(); + peer.addresses = MultiaddressesWithStats::from_addresses_with_source(vec![address], &PeerAddressSource::Config); + peer.set_id_for_test(1); + + let (reply_tx, reply_rx) = oneshot::channel(); + request_tx + .send(DialerRequest::Dial(Box::new(peer), Some(reply_tx))) + .await + .unwrap(); + + // Check that the dial failed. We're checking that the dial attempt was never made. + let res = reply_rx.await.unwrap(); + assert!(res.is_ok()); + + shutdown.trigger(); + timeout(Duration::from_secs(5), dialer_fut).await.unwrap().unwrap(); +} diff --git a/comms/core/src/connectivity/manager.rs b/comms/core/src/connectivity/manager.rs index 8a59fb5ae8..972a6b5895 100644 --- a/comms/core/src/connectivity/manager.rs +++ b/comms/core/src/connectivity/manager.rs @@ -659,6 +659,21 @@ impl ConnectivityManagerActor { let (node_id, mut new_status, connection) = match event { PeerDisconnected(_, node_id, minimized) => (node_id, ConnectionStatus::Disconnected(*minimized), None), PeerConnected(conn) => (conn.peer_node_id(), ConnectionStatus::Connected, Some(conn.clone())), + PeerConnectFailed(node_id, ConnectionManagerError::AllPeerAddressesAreExcluded(msg)) => { + warn!( + target: LOG_TARGET, + "Peer '{}' contains only excluded addresses ({})", + node_id, + msg + ); + self.ban_peer( + node_id, + Duration::from_secs(3 * 24 * 60 * 60), // 3 days + "All peer addresses are excluded (User intervention)".to_string(), + ) + .await?; + (node_id, ConnectionStatus::Failed, None) + }, PeerConnectFailed(node_id, ConnectionManagerError::NoiseHandshakeError(msg)) => { if let Some(conn) = self.pool.get_connection(node_id) { warn!(