Skip to content

Commit

Permalink
Remove chunking from rpc
Browse files Browse the repository at this point in the history
Removed chunking from rpc as this does not improve latency or throughput.
  • Loading branch information
hansieodendaal committed May 17, 2024
1 parent 64e650b commit c0d08f3
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 309 deletions.
11 changes: 0 additions & 11 deletions comms/core/src/protocol/rpc/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,17 +267,6 @@ pub struct RpcResponse {
pub payload: Bytes,
}

impl RpcResponse {
pub fn to_proto(&self) -> proto::rpc::RpcResponse {
proto::rpc::RpcResponse {
request_id: self.request_id,
status: self.status as u32,
flags: self.flags.bits().into(),
payload: self.payload.to_vec(),
}
}
}

impl Default for RpcResponse {
fn default() -> Self {
Self {
Expand Down
18 changes: 0 additions & 18 deletions comms/core/src/protocol/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,30 +33,12 @@ mod test;
pub const RPC_MAX_FRAME_SIZE: usize = 3 * 1024 * 1024; // 3 MiB
/// Maximum number of chunks into which a message can be broken up.
const RPC_CHUNKING_MAX_CHUNKS: usize = 16; // 16 x 256 Kib = 4 MiB max combined message size
const RPC_CHUNKING_THRESHOLD: usize = 256 * 1024;
const RPC_CHUNKING_SIZE_LIMIT: usize = 384 * 1024;

/// The maximum request payload size
const fn max_request_size() -> usize {
RPC_MAX_FRAME_SIZE
}

/// The maximum size for a single RPC response message
const fn max_response_size() -> usize {
RPC_CHUNKING_MAX_CHUNKS * RPC_CHUNKING_THRESHOLD
}

/// The maximum size for a single RPC response excluding overhead
const fn max_response_payload_size() -> usize {
// RpcResponse overhead is:
// - 4 varint protobuf fields, each field ID is 1 byte
// - 3 u32 fields, VarInt(u32::MAX) is 5 bytes
// - 1 length varint for the payload, allow for 5 bytes to be safe (max_payload_size being technically too small is
// fine, being too large isn't)
const MAX_HEADER_SIZE: usize = 4 + 4 * 5;
max_response_size() - MAX_HEADER_SIZE
}

mod body;
pub use body::{Body, ClientStreaming, IntoBody, Streaming};

Expand Down
272 changes: 0 additions & 272 deletions comms/core/src/protocol/rpc/server/chunking.rs

This file was deleted.

14 changes: 9 additions & 5 deletions comms/core/src/protocol/rpc/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

mod chunking;
use chunking::ChunkedResponseIter;
// mod chunking;

mod error;
pub use error::RpcServerError;
Expand Down Expand Up @@ -52,7 +51,7 @@ use std::{
time::{Duration, Instant},
};

use futures::{future, stream, stream::FuturesUnordered, SinkExt, StreamExt};
use futures::{future, stream::FuturesUnordered, SinkExt, StreamExt};
use log::*;
use prost::Message;
use router::Router;
Expand Down Expand Up @@ -749,12 +748,17 @@ where
let mut stream = body
.into_message()
.map(|result| into_response(request_id, result))
.flat_map(move |message| {
.map(move |message| {
#[cfg(feature = "metrics")]
if !message.status.is_ok() {
metrics::status_error_counter(&node_id, &protocol, message.status).inc();
}
stream::iter(ChunkedResponseIter::new(message))
proto::rpc::RpcResponse {
request_id,
status: message.status.as_u32(),
flags: message.flags.bits().into(),
payload: message.payload.to_vec(),
}
})
.map(|resp| Bytes::from(resp.to_encoded_bytes()));

Expand Down
6 changes: 3 additions & 3 deletions comms/core/src/protocol/rpc/test/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ async fn response_too_big() {
let (_inbound, outbound, _, _, _shutdown) = setup(GreetingService::new(&[]), 1).await;
let socket = outbound.get_yamux_control().open_stream().await.unwrap();

let framed = framing::canonical(socket, rpc::max_response_size());
let framed = framing::canonical(socket, rpc::RPC_MAX_FRAME_SIZE);
let mut client = GreetingClient::builder()
.with_deadline(Duration::from_secs(5))
.connect(framed)
Expand All @@ -291,15 +291,15 @@ async fn response_too_big() {

// RPC_MAX_FRAME_SIZE bytes will always be too large because of the overhead of the RpcResponse proto message
let err = client
.reply_with_msg_of_size(rpc::max_response_payload_size() as u64 + 1)
.reply_with_msg_of_size(rpc::RPC_MAX_FRAME_SIZE as u64 + 1)
.await
.unwrap_err();
unpack_enum!(RpcError::RequestFailed(status) = err);
unpack_enum!(RpcStatusCode::MalformedResponse = status.as_status_code());

// Check that the exact frame size boundary works and that the session is still going
let _string = client
.reply_with_msg_of_size(rpc::max_response_payload_size() as u64 - 9)
.reply_with_msg_of_size(rpc::RPC_MAX_FRAME_SIZE as u64 - 9)
.await
.unwrap();
}
Expand Down

0 comments on commit c0d08f3

Please sign in to comment.