Skip to content

Commit

Permalink
[network] make peer actor loop non-blocking
Browse files Browse the repository at this point in the history
This commit tries to keep the peer actor loop non-blocking when the underlying connection is blocked.

1. Use an aptos channel for the main loop to avoid channel backpressure.
2. Add a timeout for writes and move close_rx to the writer task.
3. Drop the sender to shut down the multiplex task.
  • Loading branch information
Zekun Li authored and zekun000 committed Jun 25, 2024
1 parent a4c2e38 commit af79a8f
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 80 deletions.
7 changes: 4 additions & 3 deletions network/framework/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,10 +415,11 @@ pub static PENDING_PEER_MANAGER_DIAL_REQUESTS: Lazy<IntGauge> = Lazy::new(|| {
});

/// Counter of messages pending in queue to be sent out on the wire.
pub static PENDING_WIRE_MESSAGES: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
pub static PENDING_WIRE_MESSAGES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"aptos_network_pending_wire_messages",
"Number of pending wire messages"
"Number of pending wire messages",
&["state"],
)
.unwrap()
});
Expand Down
156 changes: 86 additions & 70 deletions network/framework/src/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::{
transport::{self, Connection, ConnectionMetadata},
ProtocolId,
};
use aptos_channels::aptos_channel;
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::network_id::NetworkContext;
use aptos_logger::prelude::*;
use aptos_short_hex_str::AsShortHexStr;
Expand All @@ -51,7 +51,7 @@ use futures::{
use futures_util::stream::select;
use serde::Serialize;
use std::{fmt, panic, time::Duration};
use tokio::runtime::Handle;
use tokio::{runtime::Handle, time::timeout};
use tokio_util::compat::{
FuturesAsyncReadCompatExt, TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt,
};
Expand Down Expand Up @@ -240,7 +240,7 @@ where
// Handle a new outbound request from the PeerManager.
maybe_request = self.peer_reqs_rx.next() => {
match maybe_request {
Some(request) => self.handle_outbound_request(request, &mut write_reqs_tx).await,
Some(request) => self.handle_outbound_request(request, &mut write_reqs_tx),
// The PeerManager is requesting this connection to close
// by dropping the corresponding peer_reqs_tx handle.
None => self.shutdown(DisconnectReason::Requested),
Expand All @@ -251,7 +251,7 @@ where
maybe_message = reader.next() => {
match maybe_message {
Some(message) => {
if let Err(err) = self.handle_inbound_message(message, &mut write_reqs_tx).await {
if let Err(err) = self.handle_inbound_message(message, &mut write_reqs_tx) {
warn!(
NetworkSchema::new(&self.network_context)
.connection_metadata(&self.connection_metadata),
Expand All @@ -277,7 +277,7 @@ where
};

// Send the response to the remote peer
if let Err(error) = self.inbound_rpcs.send_outbound_response(&mut write_reqs_tx, maybe_response).await {
if let Err(error) = self.inbound_rpcs.send_outbound_response(&mut write_reqs_tx, maybe_response) {
// It's quite common for applications to drop an RPC request.
// If this happens, we want to avoid logging a warning/error
// (as it makes the logs noisy). Otherwise, we log normally.
Expand Down Expand Up @@ -312,7 +312,8 @@ where

// Finish shutting down the connection. Close the writer task and notify
// PeerManager that this connection has shutdown.
self.do_shutdown(writer_close_tx, reason).await;
self.do_shutdown(write_reqs_tx, writer_close_tx, reason)
.await;
}

// Start a new task on the given executor which is responsible for writing outbound messages on
Expand All @@ -330,30 +331,44 @@ where
mut writer: MultiplexMessageSink<impl AsyncWrite + Unpin + Send + 'static>,
max_frame_size: usize,
max_message_size: usize,
) -> (aptos_channels::Sender<NetworkMessage>, oneshot::Sender<()>) {
) -> (
aptos_channel::Sender<(), NetworkMessage>,
oneshot::Sender<()>,
) {
let remote_peer_id = connection_metadata.remote_peer_id;
let (write_reqs_tx, mut write_reqs_rx): (aptos_channels::Sender<NetworkMessage>, _) =
aptos_channels::new(1024, &counters::PENDING_WIRE_MESSAGES);
let (write_reqs_tx, mut write_reqs_rx): (aptos_channel::Sender<(), NetworkMessage>, _) =
aptos_channel::new(
QueueStyle::KLAST,
1024,
Some(&counters::PENDING_WIRE_MESSAGES),
);
let (close_tx, mut close_rx) = oneshot::channel();

let (mut msg_tx, msg_rx) = aptos_channels::new(1024, &counters::PENDING_MULTIPLEX_MESSAGE);
let (stream_msg_tx, stream_msg_rx) =
aptos_channels::new(1024, &counters::PENDING_MULTIPLEX_STREAM);

// this task ends when the multiplex task ends (by dropping the senders)
// this task ends when the multiplex task ends (by dropping the senders) or receiving a close instruction
let writer_task = async move {
let mut stream = select(msg_rx, stream_msg_rx);
let log_context =
NetworkSchema::new(&network_context).connection_metadata(&connection_metadata);
while let Some(message) = stream.next().await {
if let Err(err) = writer.send(&message).await {
warn!(
log_context,
error = %err,
"{} Error in sending message to peer: {}",
network_context,
remote_peer_id.short_str(),
);
loop {
futures::select! {
message = stream.select_next_some() => {
if let Err(err) = timeout(transport::TRANSPORT_TIMEOUT,writer.send(&message)).await {
warn!(
log_context,
error = %err,
"{} Error in sending message to peer: {}",
network_context,
remote_peer_id.short_str(),
);
}
}
_ = close_rx => {
break;
}
}
}
info!(
Expand Down Expand Up @@ -399,30 +414,27 @@ where
},
}
};
// the task ends when the write_reqs_tx is dropped
let multiplex_task = async move {
let mut outbound_stream =
OutboundStream::new(max_frame_size, max_message_size, stream_msg_tx);
loop {
futures::select! {
message = write_reqs_rx.select_next_some() => {
// either channel full would block the other one
let result = if outbound_stream.should_stream(&message) {
outbound_stream.stream_message(message).await
} else {
msg_tx.send(MultiplexMessage::Message(message)).await.map_err(|_| anyhow::anyhow!("Writer task ended"))
};
if let Err(err) = result {
warn!(
error = %err,
"{} Error in sending message to peer: {}",
network_context,
remote_peer_id.short_str(),
);
}
},
_ = close_rx => {
break;
}
while let Some(message) = write_reqs_rx.next().await {
// either channel full would block the other one
let result = if outbound_stream.should_stream(&message) {
outbound_stream.stream_message(message).await
} else {
msg_tx
.send(MultiplexMessage::Message(message))
.await
.map_err(|_| anyhow::anyhow!("Writer task ended"))
};
if let Err(err) = result {
warn!(
error = %err,
"{} Error in sending message to peer: {}",
network_context,
remote_peer_id.short_str(),
);
}
}
};
Expand All @@ -431,7 +443,7 @@ where
(write_reqs_tx, close_tx)
}

async fn handle_inbound_network_message(
fn handle_inbound_network_message(
&mut self,
message: NetworkMessage,
) -> Result<(), PeerManagerError> {
Expand Down Expand Up @@ -470,7 +482,7 @@ where
Ok(())
}

async fn handle_inbound_stream_message(
fn handle_inbound_stream_message(
&mut self,
message: StreamMessage,
) -> Result<(), PeerManagerError> {
Expand All @@ -480,17 +492,17 @@ where
},
StreamMessage::Fragment(fragment) => {
if let Some(message) = self.inbound_stream.append_fragment(fragment)? {
self.handle_inbound_network_message(message).await?;
self.handle_inbound_network_message(message)?;
}
},
}
Ok(())
}

async fn handle_inbound_message(
fn handle_inbound_message(
&mut self,
message: Result<MultiplexMessage, ReadError>,
write_reqs_tx: &mut aptos_channels::Sender<NetworkMessage>,
write_reqs_tx: &mut aptos_channel::Sender<(), NetworkMessage>,
) -> Result<(), PeerManagerError> {
trace!(
NetworkSchema::new(&self.network_context)
Expand All @@ -512,7 +524,7 @@ where
let error_code = ErrorCode::parsing_error(*message_type, *protocol_id);
let message = NetworkMessage::Error(error_code);

write_reqs_tx.send(message).await?;
write_reqs_tx.push((), message)?;
return Err(err.into());
},
ReadError::IoError(_) => {
Expand All @@ -524,10 +536,8 @@ where
};

match message {
MultiplexMessage::Message(message) => {
self.handle_inbound_network_message(message).await
},
MultiplexMessage::Stream(message) => self.handle_inbound_stream_message(message).await,
MultiplexMessage::Message(message) => self.handle_inbound_network_message(message),
MultiplexMessage::Stream(message) => self.handle_inbound_stream_message(message),
}
}

Expand Down Expand Up @@ -575,10 +585,10 @@ where
network_application_inbound_traffic(self.network_context, protocol_id, data_len);
}

async fn handle_outbound_request(
fn handle_outbound_request(
&mut self,
request: PeerRequest,
write_reqs_tx: &mut aptos_channels::Sender<NetworkMessage>,
write_reqs_tx: &mut aptos_channel::Sender<(), NetworkMessage>,
) {
trace!(
"Peer {} PeerRequest::{:?}",
Expand All @@ -598,7 +608,7 @@ where
raw_msg: Vec::from(message.mdata.as_ref()),
});

match write_reqs_tx.send(message).await {
match write_reqs_tx.push((), message) {
Ok(_) => {
self.update_outbound_direct_send_metrics(protocol_id, message_len as u64);
},
Expand All @@ -621,7 +631,6 @@ where
if let Err(e) = self
.outbound_rpcs
.handle_outbound_request(request, write_reqs_tx)
.await
{
warn!(
NetworkSchema::new(&self.network_context)
Expand Down Expand Up @@ -653,9 +662,30 @@ where
self.state = State::ShuttingDown(reason);
}

async fn do_shutdown(mut self, writer_close_tx: oneshot::Sender<()>, reason: DisconnectReason) {
let remote_peer_id = self.remote_peer_id();
async fn do_shutdown(
mut self,
write_req_tx: aptos_channel::Sender<(), NetworkMessage>,
writer_close_tx: oneshot::Sender<()>,
reason: DisconnectReason,
) {
// Drop the sender to shut down multiplex task.
drop(write_req_tx);

// Send a close instruction to the writer task. On receipt of this
// instruction, the writer task drops all pending outbound messages and
// closes the connection.
if let Err(e) = writer_close_tx.send(()) {
info!(
NetworkSchema::new(&self.network_context)
.connection_metadata(&self.connection_metadata),
error = ?e,
"{} Failed to send close instruction to writer task. It must already be terminating/terminated. Error: {:?}",
self.network_context,
e
);
}

let remote_peer_id = self.remote_peer_id();
// Send a PeerDisconnected event to PeerManager.
if let Err(e) = self
.connection_notifs_tx
Expand All @@ -676,20 +706,6 @@ where
);
}

// Send a close instruction to the writer task. On receipt of this
// instruction, the writer task drops all pending outbound messages and
// closes the connection.
if let Err(e) = writer_close_tx.send(()) {
info!(
NetworkSchema::new(&self.network_context)
.connection_metadata(&self.connection_metadata),
error = ?e,
"{} Failed to send close instruction to writer task. It must already be terminating/terminated. Error: {:?}",
self.network_context,
e
);
}

trace!(
NetworkSchema::new(&self.network_context)
.connection_metadata(&self.connection_metadata),
Expand Down
13 changes: 6 additions & 7 deletions network/framework/src/protocols/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ use error::RpcError;
use futures::{
channel::oneshot,
future::{BoxFuture, FusedFuture, Future, FutureExt},
sink::SinkExt,
stream::{FuturesUnordered, StreamExt},
};
use serde::Serialize;
Expand Down Expand Up @@ -324,9 +323,9 @@ impl InboundRpcs {
/// Handle a completed response from the application handler. If successful,
/// we update the appropriate counters and enqueue the response message onto
/// the outbound write queue.
pub async fn send_outbound_response(
pub fn send_outbound_response(
&mut self,
write_reqs_tx: &mut aptos_channels::Sender<NetworkMessage>,
write_reqs_tx: &mut aptos_channel::Sender<(), NetworkMessage>,
maybe_response: Result<(RpcResponse, ProtocolId), RpcError>,
) -> Result<(), RpcError> {
let network_context = &self.network_context;
Expand Down Expand Up @@ -354,7 +353,7 @@ impl InboundRpcs {
response.request_id,
);
let message = NetworkMessage::RpcResponse(response);
write_reqs_tx.send(message).await?;
write_reqs_tx.push((), message)?;

// Update the outbound RPC response metrics
self.update_outbound_rpc_response_metrics(protocol_id, res_len);
Expand Down Expand Up @@ -433,10 +432,10 @@ impl OutboundRpcs {
}

/// Handle a new outbound rpc request from the application layer.
pub async fn handle_outbound_request(
pub fn handle_outbound_request(
&mut self,
request: OutboundRpcRequest,
write_reqs_tx: &mut aptos_channels::Sender<NetworkMessage>,
write_reqs_tx: &mut aptos_channel::Sender<(), NetworkMessage>,
) -> Result<(), RpcError> {
let network_context = &self.network_context;
let peer_id = &self.remote_peer_id;
Expand Down Expand Up @@ -499,7 +498,7 @@ impl OutboundRpcs {
priority: Priority::default(),
raw_request: Vec::from(request_data.as_ref()),
});
write_reqs_tx.send(message).await?;
write_reqs_tx.push((), message)?;

// Update the outbound RPC request metrics
self.update_outbound_rpc_request_metrics(protocol_id, req_len);
Expand Down

0 comments on commit af79a8f

Please sign in to comment.