Skip to content

Commit

Permalink
fix(rpc): remove chunking
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Jun 14, 2024
1 parent ad490ae commit 107855b
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 486 deletions.
55 changes: 9 additions & 46 deletions networking/rpc_framework/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ use crate::{
RpcHandshakeError,
RpcServerError,
RpcStatus,
RPC_CHUNKING_MAX_CHUNKS,
};

const LOG_TARGET: &str = "comms::rpc::client";
Expand Down Expand Up @@ -934,53 +933,17 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin

pub async fn read_response(&mut self) -> Result<proto::RpcResponse, RpcError> {
let timer = Instant::now();
let mut resp = self.next().await?;
let resp = self.next().await?;
self.time_to_first_msg = Some(timer.elapsed());
self.check_response(&resp)?;
let mut chunk_count = 1;
let mut last_chunk_flags =
RpcMessageFlags::from_bits(u8::try_from(resp.flags).map_err(|_| {
RpcStatus::protocol_error(&format!("invalid message flag: must be less than {}", u8::MAX))
})?)
.ok_or(RpcStatus::protocol_error(&format!(
"invalid message flag, does not match any flags ({})",
resp.flags
)))?;
let mut last_chunk_size = resp.payload.len();
self.bytes_read += last_chunk_size;
loop {
trace!(
target: LOG_TARGET,
"Chunk {} received (flags={:?}, {} bytes, {} total)",
chunk_count,
last_chunk_flags,
last_chunk_size,
resp.payload.len()
);
if !last_chunk_flags.is_more() {
return Ok(resp);
}

if chunk_count >= RPC_CHUNKING_MAX_CHUNKS {
return Err(RpcError::RemotePeerExceededMaxChunkCount {
expected: RPC_CHUNKING_MAX_CHUNKS,
});
}

let msg = self.next().await?;
last_chunk_flags = RpcMessageFlags::from_bits(u8::try_from(msg.flags).map_err(|_| {
RpcStatus::protocol_error(&format!("invalid message flag: must be less than {}", u8::MAX))
})?)
.ok_or(RpcStatus::protocol_error(&format!(
"invalid message flag, does not match any flags ({})",
resp.flags
)))?;
last_chunk_size = msg.payload.len();
self.bytes_read += last_chunk_size;
self.check_response(&resp)?;
resp.payload.extend(msg.payload);
chunk_count += 1;
}
self.bytes_read = resp.payload.len();
trace!(
target: LOG_TARGET,
"Received {} bytes in {:.2?}",
resp.payload.len(),
self.time_to_first_msg.unwrap_or_default()
);
Ok(resp)
}

pub async fn read_ack(&mut self) -> Result<proto::RpcResponse, RpcError> {
Expand Down
7 changes: 1 addition & 6 deletions networking/rpc_framework/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,12 @@
//! Provides a request/response protocol that supports streaming.
//! Available with the `rpc` crate feature.
// TODO: fix all tests
// #[cfg(test)]
// mod test;

/// Maximum frame size of each RPC message. This is enforced in tokio's length delimited codec.
/// This can be thought of as the hard limit on message size.
pub const RPC_MAX_FRAME_SIZE: usize = 3 * 1024 * 1024; // 3 MiB
/// Maximum number of chunks into which a message can be broken up.
const RPC_CHUNKING_MAX_CHUNKS: usize = 16; // 16 x 256 Kib = 4 MiB max combined message size
const RPC_CHUNKING_THRESHOLD: usize = 256 * 1024;
const RPC_CHUNKING_SIZE_LIMIT: usize = 384 * 1024;

/// The maximum request payload size
const fn max_request_size() -> usize {
Expand All @@ -44,7 +39,7 @@ const fn max_request_size() -> usize {

/// The maximum size for a single RPC response message
const fn max_response_size() -> usize {
RPC_CHUNKING_MAX_CHUNKS * RPC_CHUNKING_THRESHOLD
RPC_MAX_FRAME_SIZE
}

/// The maximum size for a single RPC response excluding overhead
Expand Down
21 changes: 15 additions & 6 deletions networking/rpc_framework/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use bytes::Bytes;
use crate::{
body::{Body, IntoBody},
error::HandshakeRejectReason,
max_response_payload_size,
proto,
proto::rpc_session_reply::SessionResult,
RpcError,
Expand Down Expand Up @@ -145,8 +146,6 @@ bitflags! {
const FIN = 0x01;
/// Typically sent with empty contents and used to confirm a substream is alive.
const ACK = 0x02;
/// Another chunk to be received
const MORE = 0x04;
}
}
impl RpcMessageFlags {
Expand All @@ -157,10 +156,6 @@ impl RpcMessageFlags {
pub fn is_ack(self) -> bool {
self.contains(Self::ACK)
}

pub fn is_more(self) -> bool {
self.contains(Self::MORE)
}
}

impl Default for RpcMessageFlags {
Expand Down Expand Up @@ -218,6 +213,20 @@ impl RpcResponse {
payload: self.payload.to_vec(),
}
}

pub fn exceeded_message_size(self) -> RpcResponse {
let msg = format!(
"The response size exceeded the maximum allowed payload size. Max = {} bytes, Got = {} bytes",
max_response_payload_size() as f32,
self.payload.len() as f32,
);
RpcResponse {
request_id: self.request_id,
status: RpcStatusCode::MalformedResponse,
flags: RpcMessageFlags::FIN,
payload: msg.into_bytes().into(),
}
}
}

impl Default for RpcResponse {
Expand Down
Loading

0 comments on commit 107855b

Please sign in to comment.