Skip to content

Commit

Permalink
(de)compression: reduce memory allocation to improve performance (#521)
Browse files Browse the repository at this point in the history
Currently, every time `WrapBody::poll_frame` is called, new instance of
`BytesMut` is created with the default capacity, which is effectively
64 bytes. This ends up with a lot of memory allocation in certain
situations, making the throughput significantly worse.

To optimize memory allocation, `WrapBody` now gets `BytesMut` as its
field, with initial capacity of 4096 bytes. This buffer will be reused
as much as possible across multiple `poll_frame` calls, and only when
its capacity becomes 0, new allocation of another 4096 bytes is
performed.

Fixes: #520
  • Loading branch information
magurotuna authored Sep 23, 2024
1 parent aeca262 commit 9fdf0eb
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions tower-http/src/compression_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,17 @@ pin_project! {
// rust-analyer thinks this field is private if its `pub(crate)` but works fine when its
// `pub`
pub read: M::Output,
// A buffer to temporarily store the data read from the underlying body.
// Reused as much as possible to optimize allocations.
buf: BytesMut,
read_all_data: bool,
}
}

impl<M: DecorateAsyncRead> WrapBody<M> {
const INTERNAL_BUF_CAPACITY: usize = 4096;
}

impl<M: DecorateAsyncRead> WrapBody<M> {
#[allow(dead_code)]
pub(crate) fn new<B>(body: B, quality: CompressionLevel) -> Self
Expand All @@ -167,6 +174,7 @@ impl<M: DecorateAsyncRead> WrapBody<M> {

Self {
read,
buf: BytesMut::with_capacity(Self::INTERNAL_BUF_CAPACITY),
read_all_data: false,
}
}
Expand All @@ -186,16 +194,21 @@ where
cx: &mut Context<'_>,
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
let mut this = self.project();
let mut buf = BytesMut::new();

if !*this.read_all_data {
let result = tokio_util::io::poll_read_buf(this.read.as_mut(), cx, &mut buf);
if this.buf.capacity() == 0 {
this.buf.reserve(Self::INTERNAL_BUF_CAPACITY);
}

let result = tokio_util::io::poll_read_buf(this.read.as_mut(), cx, &mut this.buf);

match ready!(result) {
Ok(0) => {
*this.read_all_data = true;
}
Ok(_) => {
return Poll::Ready(Some(Ok(Frame::data(buf.freeze()))));
let chunk = this.buf.split().freeze();
return Poll::Ready(Some(Ok(Frame::data(chunk))));
}
Err(err) => {
let body_error: Option<B::Error> = M::get_pin_mut(this.read)
Expand Down

0 comments on commit 9fdf0eb

Please sign in to comment.