From d586e08aa96914d727927bd26c61971f848c0f4d Mon Sep 17 00:00:00 2001 From: Josh Lind Date: Wed, 13 Nov 2024 21:04:33 -0500 Subject: [PATCH] [Network] Force message streaming. --- config/src/config/network_config.rs | 4 ++-- network/framework/src/protocols/network/mod.rs | 1 + network/framework/src/protocols/stream/mod.rs | 12 +++++++++++- .../src/protocols/wire/messaging/v1/metadata.rs | 7 +++++++ 4 files changed, 21 insertions(+), 3 deletions(-) diff --git a/config/src/config/network_config.rs b/config/src/config/network_config.rs index 8ecd0964a6723..afee2ea1e70fb 100644 --- a/config/src/config/network_config.rs +++ b/config/src/config/network_config.rs @@ -38,7 +38,7 @@ pub const HANDSHAKE_VERSION: u8 = 0; pub const NETWORK_CHANNEL_SIZE: usize = 1024; pub const PING_INTERVAL_MS: u64 = 10_000; pub const PING_TIMEOUT_MS: u64 = 20_000; -pub const PING_FAILURES_TOLERATED: u64 = 3; +pub const PING_FAILURES_TOLERATED: u64 = 10; pub const CONNECTIVITY_CHECK_INTERVAL_MS: u64 = 5000; pub const MAX_CONNECTION_DELAY_MS: u64 = 60_000; /* 1 minute */ pub const MAX_FULLNODE_OUTBOUND_CONNECTIONS: usize = 6; @@ -47,7 +47,7 @@ pub const MAX_MESSAGE_METADATA_SIZE: usize = 128 * 1024; /* 128 KiB: a buffer fo pub const MESSAGE_PADDING_SIZE: usize = 2 * 1024 * 1024; /* 2 MiB: a safety buffer to allow messages to get larger during serialization */ pub const MAX_APPLICATION_MESSAGE_SIZE: usize = (MAX_MESSAGE_SIZE - MAX_MESSAGE_METADATA_SIZE) - MESSAGE_PADDING_SIZE; /* The message size that applications should check against */ -pub const MAX_FRAME_SIZE: usize = 4 * 1024 * 1024; /* 4 MiB large messages will be chunked into multiple frames and streamed */ +pub const MAX_FRAME_SIZE: usize = 100 * 1024; /* 100KB large messages will be chunked into multiple frames and streamed */ pub const MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; /* 64 MiB */ pub const CONNECTION_BACKOFF_BASE: u64 = 2; pub const IP_BYTE_BUCKET_RATE: usize = 102400 /* 100 KiB */; diff --git a/network/framework/src/protocols/network/mod.rs b/network/framework/src/protocols/network/mod.rs index 8a593f19ab98c..ffbabd78f53e3 100644 --- a/network/framework/src/protocols/network/mod.rs +++ b/network/framework/src/protocols/network/mod.rs @@ -138,6 +138,7 @@ pub struct ReceivedMessage { pub message: NetworkMessage, pub sender: PeerNetworkId, + // TODO: clean this up! // unix microseconds pub receive_timestamp_micros: u64, diff --git a/network/framework/src/protocols/stream/mod.rs b/network/framework/src/protocols/stream/mod.rs index 6e92e5f72ea99..ada9225f8ae12 100644 --- a/network/framework/src/protocols/stream/mod.rs +++ b/network/framework/src/protocols/stream/mod.rs @@ -8,6 +8,7 @@ use crate::protocols::wire::messaging::v1::{ use anyhow::{bail, ensure}; use aptos_channels::Sender; use aptos_id_generator::{IdGenerator, U32IdGenerator}; +use aptos_logger::info; use futures_util::SinkExt; #[cfg(any(test, feature = "fuzzing"))] use proptest_derive::Arbitrary; @@ -182,7 +183,14 @@ impl OutboundStream { /// Returns true iff the message should be streamed (i.e., broken into chunks) pub fn should_stream(&self, message_with_metadata: &NetworkMessageWithMetadata) -> bool { let message_length = message_with_metadata.network_message().data_len(); - message_length > self.max_message_size + let result = message_length > self.max_frame_size; + if result { + info!( + "Message length {} exceed size limit {}, should stream: {}", + message_length, self.max_frame_size, result + ); + } + result } pub async fn stream_message( @@ -225,6 +233,8 @@ impl OutboundStream { "Number of fragments overflowed" ); + info!("Stream message with {} fragments", num_chunks + 1); + // Create the stream header multiplex message let header_multiplex_message = MultiplexMessage::Stream(StreamMessage::Header(StreamHeader { diff --git a/network/framework/src/protocols/wire/messaging/v1/metadata.rs b/network/framework/src/protocols/wire/messaging/v1/metadata.rs index e5effb67e5323..321217a1e94ad 100644 --- a/network/framework/src/protocols/wire/messaging/v1/metadata.rs +++ b/network/framework/src/protocols/wire/messaging/v1/metadata.rs @@ -190,6 +190,13 @@ impl MessageMetadata { /// Marks the message as being sent over the network wire by emitting latency metrics pub fn mark_message_as_sending(&mut self) { + // If this message is a streamed message fragment, there's no need to emit + // any metrics (we only emit metrics for the head and tail of streamed messages). + if self.message_stream_type == MessageStreamType::StreamedMessageFragment { + return; + } + + // Otherwise, emit the latency metrics if let Some(application_send_time) = self.application_send_time { // Calculate the application to wire send latency let application_to_wire_latency = application_send_time