From af79a8f3570f1d32e834f9545137855a2d6d7ea0 Mon Sep 17 00:00:00 2001
From: Zekun Li
Date: Mon, 24 Jun 2024 12:32:08 -0700
Subject: [PATCH] [network] make peer actor loop non-blocking

This commit makes the peer actor loop non-blocking when the underlying
connection is blocked:

1. Use an aptos channel for the main loop to avoid channel backpressure.
2. Add a timeout for writes and move close_rx into the writer task.
3. Drop the sender to shut down the multiplex task.

Illustrative sketches of these three patterns follow the diffstat below.

---
 network/framework/src/counters.rs          |   7 +-
 network/framework/src/peer/mod.rs          | 156 ++++++++++++---------
 network/framework/src/protocols/rpc/mod.rs |  13 +-
 3 files changed, 96 insertions(+), 80 deletions(-)
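Point 1, sketched: the outbound write queue moves from aptos_channels (a bounded channel whose send().await applies backpressure to the peer actor) to an aptos_channel built with QueueStyle::KLAST, whose push() never awaits and which keeps only the most recent entries when full. The snippet below is a minimal illustration of that enqueue discipline using plain std/tokio types; it is not the aptos_channel implementation, and every name in it is invented for the example.

use std::{
    collections::VecDeque,
    sync::{Arc, Mutex},
};
use tokio::sync::Notify;

/// Illustration only: a bounded queue whose push never awaits.
struct DropOldestQueue<T> {
    inner: Mutex<VecDeque<T>>,
    notify: Notify,
    capacity: usize,
}

impl<T> DropOldestQueue<T> {
    fn new(capacity: usize) -> Arc<Self> {
        Arc::new(Self {
            inner: Mutex::new(VecDeque::new()),
            notify: Notify::new(),
            capacity,
        })
    }

    /// Called from the producer (the peer actor loop): never blocks. When the
    /// queue is full the oldest message is evicted, which is roughly the
    /// "keep the latest" idea behind QueueStyle::KLAST; a dropped-message
    /// counter could be bumped here.
    fn push(&self, item: T) {
        let mut queue = self.inner.lock().unwrap();
        if queue.len() == self.capacity {
            queue.pop_front();
        }
        queue.push_back(item);
        drop(queue);
        self.notify.notify_one();
    }

    /// Awaited only by the single consumer (the writer/multiplex side).
    async fn pop(&self) -> T {
        loop {
            if let Some(item) = self.inner.lock().unwrap().pop_front() {
                return item;
            }
            // notify_one stores a permit when no task is waiting, so a push
            // racing with this call still wakes the consumer immediately.
            self.notify.notified().await;
        }
    }
}

The point of this shape is that a stalled consumer can never wedge the producer; the trade-off is that excess messages are dropped rather than queued indefinitely, which is presumably why the patch also adds a "state" label to PENDING_WIRE_MESSAGES instead of tracking a single queue-depth gauge.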
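Point 2, sketched: the writer task now owns close_rx and bounds each wire write with a timeout, so a remote peer that stops reading can stall a single write for at most the transport timeout, and shutdown no longer routes through the multiplex task. A minimal sketch of that loop, assuming a generic futures::Sink and tokio channels in place of the crate's MultiplexMessageSink, counters, and logging:

use std::time::Duration;

use futures::{Sink, SinkExt};
use tokio::{
    sync::{mpsc, oneshot},
    time::timeout,
};

// Assumption for the sketch; the patch itself reuses transport::TRANSPORT_TIMEOUT.
const WRITE_TIMEOUT: Duration = Duration::from_secs(30);

async fn writer_loop<S>(
    mut outbound_rx: mpsc::Receiver<Vec<u8>>,
    mut close_rx: oneshot::Receiver<()>,
    mut sink: S,
) where
    S: Sink<Vec<u8>, Error = std::io::Error> + Unpin,
{
    loop {
        tokio::select! {
            maybe_msg = outbound_rx.recv() => {
                // All senders dropped: nothing left to write.
                let Some(msg) = maybe_msg else { break };
                // The timeout wraps the whole write, so a stalled connection
                // holds this task up for at most WRITE_TIMEOUT. Note the nested
                // Result: the outer Err is the elapsed timeout, the inner Err
                // is the sink's own I/O error.
                match timeout(WRITE_TIMEOUT, sink.send(msg)).await {
                    Ok(Ok(())) => {},
                    Ok(Err(err)) => {
                        eprintln!("write failed: {err}");
                        break;
                    },
                    Err(_elapsed) => {
                        eprintln!("write timed out");
                        break;
                    },
                }
            },
            _ = &mut close_rx => {
                // Close instruction from the peer actor: drop whatever is
                // still queued and let the connection be torn down.
                break;
            },
        }
    }
}

Keeping close_rx in the writer task (rather than the multiplex task) is presumably what lets do_shutdown tear the connection down even while a large message is still being streamed.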
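Point 3, sketched: the multiplex task no longer selects on close_rx at all; it simply drains its receiver and returns once do_shutdown drops write_reqs_tx and the channel reports itself closed. A small self-contained illustration of that drop-to-shutdown pattern (names invented for the example):

use tokio::sync::mpsc;

/// Exits on its own once every Sender clone has been dropped, because recv()
/// then yields None; no separate close channel is needed.
async fn multiplex_loop(mut rx: mpsc::Receiver<Vec<u8>>) {
    while let Some(frame) = rx.recv().await {
        // ... chunk large messages into stream fragments or forward small
        // ones to the writer task ...
        let _ = frame;
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<Vec<u8>>(1024);
    let task = tokio::spawn(multiplex_loop(rx));

    tx.send(b"hello".to_vec()).await.unwrap();
    drop(tx); // the analogue of do_shutdown dropping write_reqs_tx
    task.await.unwrap(); // the loop drains and returns without an explicit signal
}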
diff --git a/network/framework/src/counters.rs b/network/framework/src/counters.rs
index e5b5b68c8aacd..068c696329a01 100644
--- a/network/framework/src/counters.rs
+++ b/network/framework/src/counters.rs
@@ -415,10 +415,11 @@ pub static PENDING_PEER_MANAGER_DIAL_REQUESTS: Lazy<IntGauge> = Lazy::new(|| {
 });
 
 /// Counter of messages pending in queue to be sent out on the wire.
-pub static PENDING_WIRE_MESSAGES: Lazy<IntGauge> = Lazy::new(|| {
-    register_int_gauge!(
+pub static PENDING_WIRE_MESSAGES: Lazy<IntCounterVec> = Lazy::new(|| {
+    register_int_counter_vec!(
         "aptos_network_pending_wire_messages",
-        "Number of pending wire messages"
+        "Number of pending wire messages",
+        &["state"],
     )
     .unwrap()
 });
diff --git a/network/framework/src/peer/mod.rs b/network/framework/src/peer/mod.rs
index 3033365382ba4..a4d0a517cfbf6 100644
--- a/network/framework/src/peer/mod.rs
+++ b/network/framework/src/peer/mod.rs
@@ -34,7 +34,7 @@ use crate::{
     transport::{self, Connection, ConnectionMetadata},
     ProtocolId,
 };
-use aptos_channels::aptos_channel;
+use aptos_channels::{aptos_channel, message_queues::QueueStyle};
 use aptos_config::network_id::NetworkContext;
 use aptos_logger::prelude::*;
 use aptos_short_hex_str::AsShortHexStr;
@@ -51,7 +51,7 @@ use futures::{
 use futures_util::stream::select;
 use serde::Serialize;
 use std::{fmt, panic, time::Duration};
-use tokio::runtime::Handle;
+use tokio::{runtime::Handle, time::timeout};
 use tokio_util::compat::{
     FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt,
 };
@@ -240,7 +240,7 @@ where
             // Handle a new outbound request from the PeerManager.
             maybe_request = self.peer_reqs_rx.next() => {
                 match maybe_request {
-                    Some(request) => self.handle_outbound_request(request, &mut write_reqs_tx).await,
+                    Some(request) => self.handle_outbound_request(request, &mut write_reqs_tx),
                     // The PeerManager is requesting this connection to close
                     // by dropping the corresponding peer_reqs_tx handle.
                     None => self.shutdown(DisconnectReason::Requested),
@@ -251,7 +251,7 @@ where
             maybe_message = reader.next() => {
                 match maybe_message {
                     Some(message) => {
-                        if let Err(err) = self.handle_inbound_message(message, &mut write_reqs_tx).await {
+                        if let Err(err) = self.handle_inbound_message(message, &mut write_reqs_tx) {
                             warn!(
                                 NetworkSchema::new(&self.network_context)
                                     .connection_metadata(&self.connection_metadata),
@@ -277,7 +277,7 @@ where
                 };
 
                 // Send the response to the remote peer
-                if let Err(error) = self.inbound_rpcs.send_outbound_response(&mut write_reqs_tx, maybe_response).await {
+                if let Err(error) = self.inbound_rpcs.send_outbound_response(&mut write_reqs_tx, maybe_response) {
                     // It's quite common for applications to drop an RPC request.
                     // If this happens, we want to avoid logging a warning/error
                     // (as it makes the logs noisy). Otherwise, we log normally.
@@ -312,7 +312,8 @@ where
 
         // Finish shutting down the connection. Close the writer task and notify
        // PeerManager that this connection has shutdown.
-        self.do_shutdown(writer_close_tx, reason).await;
+        self.do_shutdown(write_reqs_tx, writer_close_tx, reason)
+            .await;
     }
 
     // Start a new task on the given executor which is responsible for writing outbound messages on
@@ -330,30 +331,44 @@ where
         mut writer: MultiplexMessageSink,
         max_frame_size: usize,
         max_message_size: usize,
-    ) -> (aptos_channels::Sender<NetworkMessage>, oneshot::Sender<()>) {
+    ) -> (
+        aptos_channel::Sender<(), NetworkMessage>,
+        oneshot::Sender<()>,
+    ) {
         let remote_peer_id = connection_metadata.remote_peer_id;
-        let (write_reqs_tx, mut write_reqs_rx): (aptos_channels::Sender<NetworkMessage>, _) =
-            aptos_channels::new(1024, &counters::PENDING_WIRE_MESSAGES);
+        let (write_reqs_tx, mut write_reqs_rx): (aptos_channel::Sender<(), NetworkMessage>, _) =
+            aptos_channel::new(
+                QueueStyle::KLAST,
+                1024,
+                Some(&counters::PENDING_WIRE_MESSAGES),
+            );
         let (close_tx, mut close_rx) = oneshot::channel();
         let (mut msg_tx, msg_rx) =
             aptos_channels::new(1024, &counters::PENDING_MULTIPLEX_MESSAGE);
         let (stream_msg_tx, stream_msg_rx) =
             aptos_channels::new(1024, &counters::PENDING_MULTIPLEX_STREAM);
 
-        // this task ends when the multiplex task ends (by dropping the senders)
+        // this task ends when the multiplex task ends (by dropping the senders) or receiving a close instruction
         let writer_task = async move {
             let mut stream = select(msg_rx, stream_msg_rx);
             let log_context =
                 NetworkSchema::new(&network_context).connection_metadata(&connection_metadata);
-            while let Some(message) = stream.next().await {
-                if let Err(err) = writer.send(&message).await {
-                    warn!(
-                        log_context,
-                        error = %err,
-                        "{} Error in sending message to peer: {}",
-                        network_context,
-                        remote_peer_id.short_str(),
-                    );
+            loop {
+                futures::select! {
+                    message = stream.select_next_some() => {
+                        if let Err(err) = timeout(transport::TRANSPORT_TIMEOUT, writer.send(&message)).await {
+                            warn!(
+                                log_context,
+                                error = %err,
+                                "{} Error in sending message to peer: {}",
+                                network_context,
+                                remote_peer_id.short_str(),
+                            );
+                        }
+                    }
+                    _ = close_rx => {
+                        break;
+                    }
                 }
             }
             info!(
@@ -399,30 +414,27 @@ where
                 },
             }
         };
+        // the task ends when the write_reqs_tx is dropped
         let multiplex_task = async move {
            let mut outbound_stream =
                OutboundStream::new(max_frame_size, max_message_size, stream_msg_tx);
-            loop {
-                futures::select! {
-                    message = write_reqs_rx.select_next_some() => {
-                        // either channel full would block the other one
-                        let result = if outbound_stream.should_stream(&message) {
-                            outbound_stream.stream_message(message).await
-                        } else {
-                            msg_tx.send(MultiplexMessage::Message(message)).await.map_err(|_| anyhow::anyhow!("Writer task ended"))
-                        };
-                        if let Err(err) = result {
-                            warn!(
-                                error = %err,
-                                "{} Error in sending message to peer: {}",
-                                network_context,
-                                remote_peer_id.short_str(),
-                            );
-                        }
-                    },
-                    _ = close_rx => {
-                        break;
-                    }
+            while let Some(message) = write_reqs_rx.next().await {
+                // either channel full would block the other one
+                let result = if outbound_stream.should_stream(&message) {
+                    outbound_stream.stream_message(message).await
+                } else {
+                    msg_tx
+                        .send(MultiplexMessage::Message(message))
+                        .await
+                        .map_err(|_| anyhow::anyhow!("Writer task ended"))
+                };
+                if let Err(err) = result {
+                    warn!(
+                        error = %err,
+                        "{} Error in sending message to peer: {}",
+                        network_context,
+                        remote_peer_id.short_str(),
+                    );
                 }
             }
         };
@@ -431,7 +443,7 @@ where
         (write_reqs_tx, close_tx)
     }
 
-    async fn handle_inbound_network_message(
+    fn handle_inbound_network_message(
         &mut self,
         message: NetworkMessage,
     ) -> Result<(), PeerManagerError> {
@@ -470,7 +482,7 @@ where
         Ok(())
     }
 
-    async fn handle_inbound_stream_message(
+    fn handle_inbound_stream_message(
         &mut self,
         message: StreamMessage,
     ) -> Result<(), PeerManagerError> {
@@ -480,17 +492,17 @@ where
             },
             StreamMessage::Fragment(fragment) => {
                 if let Some(message) = self.inbound_stream.append_fragment(fragment)? {
-                    self.handle_inbound_network_message(message).await?;
+                    self.handle_inbound_network_message(message)?;
                 }
             },
         }
         Ok(())
     }
 
-    async fn handle_inbound_message(
+    fn handle_inbound_message(
         &mut self,
         message: Result<MultiplexMessage, ReadError>,
-        write_reqs_tx: &mut aptos_channels::Sender<NetworkMessage>,
+        write_reqs_tx: &mut aptos_channel::Sender<(), NetworkMessage>,
     ) -> Result<(), PeerManagerError> {
         trace!(
             NetworkSchema::new(&self.network_context)
                 .connection_metadata(&self.connection_metadata),
@@ -512,7 +524,7 @@ where
                     let error_code = ErrorCode::parsing_error(*message_type, *protocol_id);
                     let message = NetworkMessage::Error(error_code);
-                    write_reqs_tx.send(message).await?;
+                    write_reqs_tx.push((), message)?;
 
                     return Err(err.into());
                 },
                 ReadError::IoError(_) => {
@@ -524,10 +536,8 @@ where
         };
 
         match message {
-            MultiplexMessage::Message(message) => {
-                self.handle_inbound_network_message(message).await
-            },
-            MultiplexMessage::Stream(message) => self.handle_inbound_stream_message(message).await,
+            MultiplexMessage::Message(message) => self.handle_inbound_network_message(message),
+            MultiplexMessage::Stream(message) => self.handle_inbound_stream_message(message),
         }
     }
 
@@ -575,10 +585,10 @@ where
         network_application_inbound_traffic(self.network_context, protocol_id, data_len);
     }
 
-    async fn handle_outbound_request(
+    fn handle_outbound_request(
         &mut self,
         request: PeerRequest,
-        write_reqs_tx: &mut aptos_channels::Sender<NetworkMessage>,
+        write_reqs_tx: &mut aptos_channel::Sender<(), NetworkMessage>,
     ) {
         trace!(
             "Peer {} PeerRequest::{:?}",
@@ -598,7 +608,7 @@ where
                     raw_msg: Vec::from(message.mdata.as_ref()),
                 });
 
-                match write_reqs_tx.send(message).await {
+                match write_reqs_tx.push((), message) {
                     Ok(_) => {
                         self.update_outbound_direct_send_metrics(protocol_id, message_len as u64);
                     },
@@ -621,7 +631,6 @@ where
                 if let Err(e) = self
                     .outbound_rpcs
                     .handle_outbound_request(request, write_reqs_tx)
-                    .await
                 {
                     warn!(
                         NetworkSchema::new(&self.network_context)
@@ -653,9 +662,30 @@ where
         self.state = State::ShuttingDown(reason);
     }
 
-    async fn do_shutdown(mut self, writer_close_tx: oneshot::Sender<()>, reason: DisconnectReason) {
-        let remote_peer_id = self.remote_peer_id();
+    async fn do_shutdown(
+        mut self,
+        write_req_tx: aptos_channel::Sender<(), NetworkMessage>,
+        writer_close_tx: oneshot::Sender<()>,
+        reason: DisconnectReason,
+    ) {
+        // Drop the sender to shut down multiplex task.
+        drop(write_req_tx);
+
+        // Send a close instruction to the writer task. On receipt of this
+        // instruction, the writer task drops all pending outbound messages and
+        // closes the connection.
+        if let Err(e) = writer_close_tx.send(()) {
+            info!(
+                NetworkSchema::new(&self.network_context)
+                    .connection_metadata(&self.connection_metadata),
+                error = ?e,
+                "{} Failed to send close instruction to writer task. It must already be terminating/terminated. Error: {:?}",
+                self.network_context,
+                e
+            );
+        }
+        let remote_peer_id = self.remote_peer_id();
 
         // Send a PeerDisconnected event to PeerManager.
         if let Err(e) = self
             .connection_notifs_tx
@@ -676,20 +706,6 @@ where
             );
         }
 
-        // Send a close instruction to the writer task. On receipt of this
-        // instruction, the writer task drops all pending outbound messages and
-        // closes the connection.
-        if let Err(e) = writer_close_tx.send(()) {
-            info!(
-                NetworkSchema::new(&self.network_context)
-                    .connection_metadata(&self.connection_metadata),
-                error = ?e,
-                "{} Failed to send close instruction to writer task. It must already be terminating/terminated. Error: {:?}",
-                self.network_context,
-                e
-            );
-        }
-
         trace!(
             NetworkSchema::new(&self.network_context)
                 .connection_metadata(&self.connection_metadata),
diff --git a/network/framework/src/protocols/rpc/mod.rs b/network/framework/src/protocols/rpc/mod.rs
index 2769993da793c..6f06d0ec4e2dd 100644
--- a/network/framework/src/protocols/rpc/mod.rs
+++ b/network/framework/src/protocols/rpc/mod.rs
@@ -71,7 +71,6 @@ use error::RpcError;
 use futures::{
     channel::oneshot,
     future::{BoxFuture, FusedFuture, Future, FutureExt},
-    sink::SinkExt,
     stream::{FuturesUnordered, StreamExt},
 };
 use serde::Serialize;
@@ -324,9 +323,9 @@ impl InboundRpcs {
     /// Handle a completed response from the application handler. If successful,
     /// we update the appropriate counters and enqueue the response message onto
     /// the outbound write queue.
-    pub async fn send_outbound_response(
+    pub fn send_outbound_response(
         &mut self,
-        write_reqs_tx: &mut aptos_channels::Sender<NetworkMessage>,
+        write_reqs_tx: &mut aptos_channel::Sender<(), NetworkMessage>,
         maybe_response: Result<(RpcResponse, ProtocolId), RpcError>,
     ) -> Result<(), RpcError> {
         let network_context = &self.network_context;
@@ -354,7 +353,7 @@ impl InboundRpcs {
             response.request_id,
         );
         let message = NetworkMessage::RpcResponse(response);
-        write_reqs_tx.send(message).await?;
+        write_reqs_tx.push((), message)?;
 
         // Update the outbound RPC response metrics
         self.update_outbound_rpc_response_metrics(protocol_id, res_len);
@@ -433,10 +432,10 @@ impl OutboundRpcs {
     }
 
     /// Handle a new outbound rpc request from the application layer.
-    pub async fn handle_outbound_request(
+    pub fn handle_outbound_request(
         &mut self,
         request: OutboundRpcRequest,
-        write_reqs_tx: &mut aptos_channels::Sender<NetworkMessage>,
+        write_reqs_tx: &mut aptos_channel::Sender<(), NetworkMessage>,
     ) -> Result<(), RpcError> {
         let network_context = &self.network_context;
         let peer_id = &self.remote_peer_id;
@@ -499,7 +498,7 @@ impl OutboundRpcs {
             priority: Priority::default(),
             raw_request: Vec::from(request_data.as_ref()),
         });
-        write_reqs_tx.send(message).await?;
+        write_reqs_tx.push((), message)?;
 
         // Update the outbound RPC request metrics
         self.update_outbound_rpc_request_metrics(protocol_id, req_len);