diff --git a/network/framework/src/application/interface.rs b/network/framework/src/application/interface.rs index 912e34c49e98b..0d56995d7a6ab 100644 --- a/network/framework/src/application/interface.rs +++ b/network/framework/src/application/interface.rs @@ -4,6 +4,7 @@ use crate::{ application::{error::Error, storage::PeersAndMetadata}, + peer::DisconnectReason, protocols::{ network::{Message, NetworkEvents, NetworkSender}, wire::handshake::v1::{ProtocolId, ProtocolIdSet}, @@ -39,7 +40,11 @@ pub trait NetworkClientInterface: Clone + Send + S /// Requests that the network connection for the specified peer /// is disconnected. // TODO: support disconnect reasons. - async fn disconnect_from_peer(&self, _peer: PeerNetworkId) -> Result<(), Error>; + async fn disconnect_from_peer( + &self, + _peer: PeerNetworkId, + _disconnect_reason: DisconnectReason, + ) -> Result<(), Error>; /// Returns a list of available peers (i.e., those that are /// currently connected and support the relevant protocols @@ -196,9 +201,15 @@ impl NetworkClientInterface for NetworkCl unimplemented!("Adding peers to discovery is not yet supported!"); } - async fn disconnect_from_peer(&self, peer: PeerNetworkId) -> Result<(), Error> { + async fn disconnect_from_peer( + &self, + peer: PeerNetworkId, + disconnect_reason: DisconnectReason, + ) -> Result<(), Error> { let network_sender = self.get_sender_for_network_id(&peer.network_id())?; - Ok(network_sender.disconnect_peer(peer.peer_id()).await?) + Ok(network_sender + .disconnect_peer(peer.peer_id(), disconnect_reason) + .await?) } fn get_available_peers(&self) -> Result, Error> { diff --git a/network/framework/src/connectivity_manager/mod.rs b/network/framework/src/connectivity_manager/mod.rs index eda70d0d3cc5e..8f25766914023 100644 --- a/network/framework/src/connectivity_manager/mod.rs +++ b/network/framework/src/connectivity_manager/mod.rs @@ -31,6 +31,7 @@ use crate::{ application::storage::PeersAndMetadata, counters, logging::NetworkSchema, + peer::DisconnectReason, peer_manager::{self, conn_notifs_channel, ConnectionRequestSender, PeerManagerError}, transport::ConnectionMetadata, }; @@ -511,8 +512,10 @@ where stale_peer.short_str() ); - if let Err(disconnect_error) = - self.connection_reqs_tx.disconnect_peer(stale_peer).await + if let Err(disconnect_error) = self + .connection_reqs_tx + .disconnect_peer(stale_peer, DisconnectReason::StaleConnection) + .await { info!( NetworkSchema::new(&self.network_context) diff --git a/network/framework/src/connectivity_manager/test.rs b/network/framework/src/connectivity_manager/test.rs index 54af3c699e3ed..60fac1d986079 100644 --- a/network/framework/src/connectivity_manager/test.rs +++ b/network/framework/src/connectivity_manager/test.rs @@ -206,7 +206,7 @@ impl TestHarness { info!("Waiting to receive disconnect request"); let success = result.is_ok(); match self.connection_reqs_rx.next().await.unwrap() { - ConnectionRequest::DisconnectPeer(p, result_tx) => { + ConnectionRequest::DisconnectPeer(p, _, result_tx) => { assert_eq!(peer_id, p); result_tx.send(result).unwrap(); }, diff --git a/network/framework/src/counters.rs b/network/framework/src/counters.rs index d381264061383..13b5398beaac2 100644 --- a/network/framework/src/counters.rs +++ b/network/framework/src/counters.rs @@ -28,6 +28,11 @@ pub const SUCCEEDED_LABEL: &str = "succeeded"; pub const FAILED_LABEL: &str = "failed"; pub const UNKNOWN_LABEL: &str = "unknown"; +// Connection operation labels +pub const DIAL_LABEL: &str = "dial"; +pub const DIAL_PEER_LABEL: &str = "dial_peer"; +pub const DISCONNECT_LABEL: &str = "disconnect"; + // Direction labels pub const INBOUND_LABEL: &str = "inbound"; pub const OUTBOUND_LABEL: &str = "outbound"; @@ -139,6 +144,27 @@ pub fn pending_connection_upgrades( ]) } +/// A simple counter for tracking network connection operations +pub static APTOS_NETWORK_CONNECTION_OPERATIONS: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "aptos_network_connection_operations", + "Counter for tracking connection operations", + &["network_id", "operation", "label"] + ) + .unwrap() +}); + +/// Updates the network connection operation metrics with the given operation and label +pub fn update_network_connection_operation_metrics( + network_context: &NetworkContext, + operation: String, + label: String, +) { + APTOS_NETWORK_CONNECTION_OPERATIONS + .with_label_values(&[network_context.network_id().as_str(), &operation, &label]) + .inc(); +} + pub static APTOS_NETWORK_CONNECTION_UPGRADE_TIME: Lazy = Lazy::new(|| { register_histogram_vec!( "aptos_network_connection_upgrade_time_seconds", diff --git a/network/framework/src/lib.rs b/network/framework/src/lib.rs index bd72aa5dd7c58..68b8b0a2ab045 100644 --- a/network/framework/src/lib.rs +++ b/network/framework/src/lib.rs @@ -28,6 +28,5 @@ pub mod fuzzing; #[cfg(any(test, feature = "testing", feature = "fuzzing"))] pub mod testutils; -pub type DisconnectReason = peer::DisconnectReason; pub type ConnectivityRequest = connectivity_manager::ConnectivityRequest; pub type ProtocolId = protocols::wire::handshake::v1::ProtocolId; diff --git a/network/framework/src/peer/mod.rs b/network/framework/src/peer/mod.rs index 651d5fed0eece..9d48b99eb2371 100644 --- a/network/framework/src/peer/mod.rs +++ b/network/framework/src/peer/mod.rs @@ -71,25 +71,33 @@ pub enum PeerRequest { SendDirectSend(Message), } -/// The reason for closing a connection. -/// -/// For example, if the remote peer closed the connection or the connection was -/// lost, the disconnect reason will be `ConnectionLost`. In contrast, if the -/// [`PeerManager`](crate::peer_manager::PeerManager) requested us to close this -/// connection, then the disconnect reason will be `Requested`. +/// The reason for closing a network connection #[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)] pub enum DisconnectReason { - Requested, - ConnectionLost, + ConnectionClosed, // The connection was gracefully closed (e.g., by the peer) + InputOutputError, // An I/O error occurred on the connection (e.g., when reading messages) + NetworkHealthCheckFailure, // The connection failed the network health check (e.g., pings) + RequestedByPeerManager, // The peer manager requested the connection to be closed + StaleConnection, // The connection is stale (e.g., when a validator leaves the validator set) +} + +impl DisconnectReason { + /// Returns a string label for the disconnect reason + pub fn get_label(&self) -> String { + let label = match self { + DisconnectReason::ConnectionClosed => "ConnectionClosed", + DisconnectReason::InputOutputError => "InputOutputError", + DisconnectReason::NetworkHealthCheckFailure => "NetworkHealthCheckFailure", + DisconnectReason::RequestedByPeerManager => "RequestedByPeerManager", + DisconnectReason::StaleConnection => "StaleConnection", + }; + label.to_string() + } } impl fmt::Display for DisconnectReason { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let s = match self { - DisconnectReason::Requested => "Requested", - DisconnectReason::ConnectionLost => "ConnectionLost", - }; - write!(f, "{}", s) + write!(f, "{}", self.get_label()) } } @@ -237,7 +245,7 @@ where Some(request) => self.handle_outbound_request(request, &mut write_reqs_tx), // The PeerManager is requesting this connection to close // by dropping the corresponding peer_reqs_tx handle. - None => self.shutdown(DisconnectReason::Requested), + None => self.shutdown(DisconnectReason::RequestedByPeerManager), } }, // Handle a new inbound MultiplexMessage that we've just read off @@ -258,7 +266,7 @@ where } }, // The socket was gracefully closed by the remote peer. - None => self.shutdown(DisconnectReason::ConnectionLost), + None => self.shutdown(DisconnectReason::ConnectionClosed), } }, // Drive the queue of pending inbound rpcs. When one is fulfilled @@ -580,7 +588,7 @@ where }, ReadError::IoError(_) => { // IoErrors are mostly unrecoverable so just close the connection. - self.shutdown(DisconnectReason::ConnectionLost); + self.shutdown(DisconnectReason::InputOutputError); return Err(err.into()); }, }, diff --git a/network/framework/src/peer/test.rs b/network/framework/src/peer/test.rs index cf70f40bbe618..0d89020bae6ff 100644 --- a/network/framework/src/peer/test.rs +++ b/network/framework/src/peer/test.rs @@ -368,13 +368,13 @@ fn peers_send_message_concurrent() { // Check that we received both shutdown events assert_disconnected_event( remote_peer_id_a, - DisconnectReason::Requested, + DisconnectReason::RequestedByPeerManager, &mut connection_notifs_rx_a, ) .await; assert_disconnected_event( remote_peer_id_b, - DisconnectReason::ConnectionLost, + DisconnectReason::ConnectionClosed, &mut connection_notifs_rx_b, ) .await; @@ -905,7 +905,7 @@ fn peer_disconnect_request() { drop(peer_handle); assert_disconnected_event( remote_peer_id, - DisconnectReason::Requested, + DisconnectReason::RequestedByPeerManager, &mut connection_notifs_rx, ) .await; @@ -932,7 +932,7 @@ fn peer_disconnect_connection_lost() { connection.close().await.unwrap(); assert_disconnected_event( remote_peer_id, - DisconnectReason::ConnectionLost, + DisconnectReason::ConnectionClosed, &mut connection_notifs_rx, ) .await; @@ -1019,13 +1019,13 @@ fn peers_send_multiplex() { // Check that we received both shutdown events assert_disconnected_event( remote_peer_id_a, - DisconnectReason::Requested, + DisconnectReason::RequestedByPeerManager, &mut connection_notifs_rx_a, ) .await; assert_disconnected_event( remote_peer_id_b, - DisconnectReason::ConnectionLost, + DisconnectReason::ConnectionClosed, &mut connection_notifs_rx_b, ) .await; diff --git a/network/framework/src/peer_manager/mod.rs b/network/framework/src/peer_manager/mod.rs index 4ee50778121fb..d1cb58956f2d2 100644 --- a/network/framework/src/peer_manager/mod.rs +++ b/network/framework/src/peer_manager/mod.rs @@ -454,11 +454,26 @@ where ); } } else { + // Update the connection dial metrics + counters::update_network_connection_operation_metrics( + &self.network_context, + counters::DIAL_LABEL.into(), + counters::DIAL_PEER_LABEL.into(), + ); + + // Send a transport request to dial the peer let request = TransportRequest::DialPeer(requested_peer_id, addr, response_tx); self.transport_reqs_tx.send(request).await.unwrap(); }; }, - ConnectionRequest::DisconnectPeer(peer_id, resp_tx) => { + ConnectionRequest::DisconnectPeer(peer_id, disconnect_reason, resp_tx) => { + // Update the connection disconnect metrics + counters::update_network_connection_operation_metrics( + &self.network_context, + counters::DISCONNECT_LABEL.into(), + disconnect_reason.get_label(), + ); + // Send a CloseConnection request to Peer and drop the send end of the // PeerRequest channel. if let Some((conn_metadata, sender)) = self.active_peers.remove(&peer_id) { diff --git a/network/framework/src/peer_manager/senders.rs b/network/framework/src/peer_manager/senders.rs index 29d5ecc6733c4..e2c8b3ee1ac5c 100644 --- a/network/framework/src/peer_manager/senders.rs +++ b/network/framework/src/peer_manager/senders.rs @@ -3,6 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ + peer::DisconnectReason, peer_manager::{types::PeerManagerRequest, ConnectionRequest, PeerManagerError}, protocols::{ direct_send::Message, @@ -125,10 +126,16 @@ impl ConnectionRequestSender { oneshot_rx.await? } - pub async fn disconnect_peer(&self, peer: PeerId) -> Result<(), PeerManagerError> { + pub async fn disconnect_peer( + &self, + peer: PeerId, + disconnect_reason: DisconnectReason, + ) -> Result<(), PeerManagerError> { let (oneshot_tx, oneshot_rx) = oneshot::channel(); - self.inner - .push(peer, ConnectionRequest::DisconnectPeer(peer, oneshot_tx))?; + self.inner.push( + peer, + ConnectionRequest::DisconnectPeer(peer, disconnect_reason, oneshot_tx), + )?; oneshot_rx.await? } } diff --git a/network/framework/src/peer_manager/tests.rs b/network/framework/src/peer_manager/tests.rs index 712ab9160aa9f..61f8ea5141191 100644 --- a/network/framework/src/peer_manager/tests.rs +++ b/network/framework/src/peer_manager/tests.rs @@ -191,7 +191,7 @@ async fn check_correct_connection_is_live( assert_peer_disconnected_event( expected_peer_id, dropped_connection_origin, - DisconnectReason::Requested, + DisconnectReason::RequestedByPeerManager, peer_manager, ) .await; @@ -218,7 +218,7 @@ async fn check_correct_connection_is_live( assert_peer_disconnected_event( expected_peer_id, live_connection_origin, - DisconnectReason::ConnectionLost, + DisconnectReason::ConnectionClosed, peer_manager, ) .await; @@ -580,7 +580,7 @@ fn peer_manager_simultaneous_dial_disconnect_event() { ProtocolIdSet::mock(), PeerRole::Unknown, ), - DisconnectReason::ConnectionLost, + DisconnectReason::ConnectionClosed, ); peer_manager.handle_connection_event(event); // The active connection should still remain. @@ -621,6 +621,7 @@ fn test_dial_disconnect() { peer_manager .handle_outbound_connection_request(ConnectionRequest::DisconnectPeer( ids[0], + DisconnectReason::ConnectionClosed, disconnect_resp_tx, )) .await; @@ -636,7 +637,7 @@ fn test_dial_disconnect() { ProtocolIdSet::mock(), PeerRole::Unknown, ), - DisconnectReason::Requested, + DisconnectReason::RequestedByPeerManager, ); peer_manager.handle_connection_event(event); diff --git a/network/framework/src/peer_manager/types.rs b/network/framework/src/peer_manager/types.rs index 006465c7e9073..070f7b21dd96d 100644 --- a/network/framework/src/peer_manager/types.rs +++ b/network/framework/src/peer_manager/types.rs @@ -31,6 +31,7 @@ pub enum ConnectionRequest { ), DisconnectPeer( PeerId, + DisconnectReason, #[serde(skip)] oneshot::Sender>, ), } diff --git a/network/framework/src/protocols/health_checker/interface.rs b/network/framework/src/protocols/health_checker/interface.rs index 0d0f007c630ae..c80de32f25139 100644 --- a/network/framework/src/protocols/health_checker/interface.rs +++ b/network/framework/src/protocols/health_checker/interface.rs @@ -7,6 +7,7 @@ use crate::{ error::Error, interface::NetworkClientInterface, metadata::ConnectionState, storage::PeersAndMetadata, }, + peer::DisconnectReason, protocols::{ health_checker::{HealthCheckerMsg, HealthCheckerNetworkEvents}, network::Event, @@ -62,12 +63,16 @@ impl> /// Disconnect a peer, and keep track of the associated state /// Note: This removes the peer outright for now until we add GCing, and historical state management - pub async fn disconnect_peer(&mut self, peer_network_id: PeerNetworkId) -> Result<(), Error> { + pub async fn disconnect_peer( + &mut self, + peer_network_id: PeerNetworkId, + disconnect_reason: DisconnectReason, + ) -> Result<(), Error> { // Possibly already disconnected, but try anyways let _ = self.update_connection_state(peer_network_id, ConnectionState::Disconnecting); let result = self .network_client - .disconnect_from_peer(peer_network_id) + .disconnect_from_peer(peer_network_id, disconnect_reason) .await; let peer_id = peer_network_id.peer_id(); if result.is_ok() { diff --git a/network/framework/src/protocols/health_checker/mod.rs b/network/framework/src/protocols/health_checker/mod.rs index c59bc8a4a3dde..0287bb32ebfb6 100644 --- a/network/framework/src/protocols/health_checker/mod.rs +++ b/network/framework/src/protocols/health_checker/mod.rs @@ -23,6 +23,7 @@ use crate::{ constants::NETWORK_CHANNEL_SIZE, counters, logging::NetworkSchema, + peer::DisconnectReason, peer_manager::ConnectionNotification, protocols::{ health_checker::interface::HealthCheckNetworkInterface, @@ -372,7 +373,10 @@ impl + Unpin> HealthChec PeerNetworkId::new(self.network_context.network_id(), peer_id); if let Err(err) = timeout( Duration::from_millis(50), - self.network_interface.disconnect_peer(peer_network_id), + self.network_interface.disconnect_peer( + peer_network_id, + DisconnectReason::NetworkHealthCheckFailure, + ), ) .await { diff --git a/network/framework/src/protocols/health_checker/test.rs b/network/framework/src/protocols/health_checker/test.rs index 0027655e57a9c..ed029c4dcc915 100644 --- a/network/framework/src/protocols/health_checker/test.rs +++ b/network/framework/src/protocols/health_checker/test.rs @@ -163,7 +163,7 @@ impl TestHarness { async fn expect_disconnect(&mut self, expected_peer_id: PeerId) { let req = self.connection_reqs_rx.next().await.unwrap(); let (peer_id, res_tx) = match req { - ConnectionRequest::DisconnectPeer(peer_id, res_tx) => (peer_id, res_tx), + ConnectionRequest::DisconnectPeer(peer_id, _, res_tx) => (peer_id, res_tx), _ => panic!("Unexpected ConnectionRequest: {:?}", req), }; assert_eq!(peer_id, expected_peer_id); diff --git a/network/framework/src/protocols/network/mod.rs b/network/framework/src/protocols/network/mod.rs index 786b9ff033800..8a593f19ab98c 100644 --- a/network/framework/src/protocols/network/mod.rs +++ b/network/framework/src/protocols/network/mod.rs @@ -7,6 +7,7 @@ pub use crate::protocols::rpc::error::RpcError; use crate::{ error::NetworkError, + peer::DisconnectReason, peer_manager::{ConnectionRequestSender, PeerManagerRequestSender}, protocols::wire::messaging::v1::{IncomingRequest, NetworkMessage}, ProtocolId, @@ -377,8 +378,14 @@ impl NetworkSender { /// Request that a given Peer be disconnected and synchronously wait for the request to be /// performed. - pub async fn disconnect_peer(&self, peer: PeerId) -> Result<(), NetworkError> { - self.connection_reqs_tx.disconnect_peer(peer).await?; + pub async fn disconnect_peer( + &self, + peer: PeerId, + disconnect_reason: DisconnectReason, + ) -> Result<(), NetworkError> { + self.connection_reqs_tx + .disconnect_peer(peer, disconnect_reason) + .await?; Ok(()) } }