Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(http1): implement an adaptive read buffer strategy #1725

Merged
merged 1 commit into from
Nov 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions benches/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,26 @@ fn http1_post(b: &mut test::Bencher) {
.bench(b)
}

#[bench]
fn http1_body_100kb(b: &mut test::Bencher) {
let body = &[b'x'; 1024 * 100];
opts()
.method(Method::POST)
.request_body(body)
.response_body(body)
.bench(b)
}

#[bench]
fn http1_body_10mb(b: &mut test::Bencher) {
let body = &[b'x'; 1024 * 1024 * 10];
opts()
.method(Method::POST)
.request_body(body)
.response_body(body)
.bench(b)
}

#[bench]
fn http1_get_parallel(b: &mut test::Bencher) {
opts()
Expand Down Expand Up @@ -96,6 +116,11 @@ impl Opts {
self
}

fn response_body(mut self, body: &'static [u8]) -> Self {
self.response_body = body;
self
}

fn parallel(mut self, cnt: u32) -> Self {
assert!(cnt > 0, "parallel count must be larger than 0");
self.parallel_cnt = cnt;
Expand All @@ -105,6 +130,9 @@ impl Opts {
fn bench(self, b: &mut test::Bencher) {
let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().unwrap();

b.bytes = self.response_body.len() as u64 + self.request_body.map(|b| b.len()).unwrap_or(0) as u64;

let addr = spawn_hello(&mut rt, self.response_body);

let connector = HttpConnector::new(1);
Expand Down
203 changes: 174 additions & 29 deletions src/proto/h1/io.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::cell::Cell;
use std::cmp;
use std::collections::VecDeque;
use std::fmt;
use std::io;
Expand Down Expand Up @@ -60,9 +61,7 @@ where
io: io,
read_blocked: false,
read_buf: BytesMut::with_capacity(0),
read_buf_strategy: ReadStrategy::Adaptive {
max: DEFAULT_MAX_BUFFER_SIZE,
},
read_buf_strategy: ReadStrategy::default(),
write_buf: WriteBuf::new(),
}
}
Expand All @@ -81,9 +80,7 @@ where
"The max_buf_size cannot be smaller than {}.",
MINIMUM_MAX_BUFFER_SIZE,
);
self.read_buf_strategy = ReadStrategy::Adaptive {
max,
};
self.read_buf_strategy = ReadStrategy::with_max(max);
self.write_buf.max_buf_size = max;
}

Expand Down Expand Up @@ -149,18 +146,11 @@ where
debug!("parsed {} headers", msg.head.headers.len());
return Ok(Async::Ready(msg))
},
None => match self.read_buf_strategy {
ReadStrategy::Adaptive { max } => {
if self.read_buf.len() >= max {
debug!("max_buf_size ({}) reached, closing", max);
return Err(::Error::new_too_large());
}
},
ReadStrategy::Exact(exact) => {
if self.read_buf.len() >= exact {
debug!("exact buf size ({}) filled, closing", exact);
return Err(::Error::new_too_large());
}
None => {
let max = self.read_buf_strategy.max();
if self.read_buf.len() >= max {
debug!("max_buf_size ({}) reached, closing", max);
return Err(::Error::new_too_large());
}
},
}
Expand All @@ -177,22 +167,15 @@ where
pub fn read_from_io(&mut self) -> Poll<usize, io::Error> {
use bytes::BufMut;
self.read_blocked = false;
match self.read_buf_strategy {
ReadStrategy::Adaptive { .. } => {
if self.read_buf.remaining_mut() < INIT_BUFFER_SIZE {
self.read_buf.reserve(INIT_BUFFER_SIZE);
}
},
ReadStrategy::Exact(exact) => {
if self.read_buf.capacity() < exact {
self.read_buf.reserve(exact);
}
},
let next = self.read_buf_strategy.next();
if self.read_buf.remaining_mut() < next {
self.read_buf.reserve(next);
}
self.io.read_buf(&mut self.read_buf).map(|ok| {
match ok {
Async::Ready(n) => {
debug!("read {} bytes", n);
self.read_buf_strategy.record(n);
Async::Ready(n)
},
Async::NotReady => {
Expand Down Expand Up @@ -285,11 +268,82 @@ where
#[derive(Clone, Copy, Debug)]
enum ReadStrategy {
Adaptive {
decrease_now: bool,
next: usize,
max: usize
},
Exact(usize),
}

impl ReadStrategy {
fn with_max(max: usize) -> ReadStrategy {
ReadStrategy::Adaptive {
decrease_now: false,
next: INIT_BUFFER_SIZE,
max,
}
}

fn next(&self) -> usize {
match *self {
ReadStrategy::Adaptive { next, .. } => next,
ReadStrategy::Exact(exact) => exact,
}
}

fn max(&self) -> usize {
match *self {
ReadStrategy::Adaptive { max, .. } => max,
ReadStrategy::Exact(exact) => exact,
}
}

fn record(&mut self, bytes_read: usize) {
match *self {
ReadStrategy::Adaptive { ref mut decrease_now, ref mut next, max, .. } => {
if bytes_read >= *next {
*next = cmp::min(incr_power_of_two(*next), max);
*decrease_now = false;
} else {
let decr_to = prev_power_of_two(*next);
if bytes_read < decr_to {
if *decrease_now {
*next = cmp::max(decr_to, INIT_BUFFER_SIZE);
*decrease_now = false;
} else {
// Decreasing is a two "record" process.
*decrease_now = true;
}
} else {
// A read within the current range should cancel
// a potential decrease, since we just saw proof
// that we still need this size.
*decrease_now = false;
}
}
},
_ => (),
}
}
}

fn incr_power_of_two(n: usize) -> usize {
n.saturating_mul(2)
}

fn prev_power_of_two(n: usize) -> usize {
// Only way this shift can underflow is if n is less than 4.
// (Which would means `usize::MAX >> 64` and underflowed!)
debug_assert!(n >= 4);
(::std::usize::MAX >> (n.leading_zeros() + 2)) + 1
}

impl Default for ReadStrategy {
fn default() -> ReadStrategy {
ReadStrategy::with_max(DEFAULT_MAX_BUFFER_SIZE)
}
}

#[derive(Clone)]
pub struct Cursor<T> {
bytes: T,
Expand Down Expand Up @@ -637,6 +691,97 @@ mod tests {
assert!(buffered.io.blocked());
}

#[test]
fn read_strategy_adaptive_increments() {
let mut strategy = ReadStrategy::default();
assert_eq!(strategy.next(), 8192);

// Grows if record == next
strategy.record(8192);
assert_eq!(strategy.next(), 16384);

strategy.record(16384);
assert_eq!(strategy.next(), 32768);

// Enormous records still increment at same rate
strategy.record(::std::usize::MAX);
assert_eq!(strategy.next(), 65536);

let max = strategy.max();
while strategy.next() < max {
strategy.record(max);
}

assert_eq!(strategy.next(), max, "never goes over max");
strategy.record(max + 1);
assert_eq!(strategy.next(), max, "never goes over max");
}

#[test]
fn read_strategy_adaptive_decrements() {
let mut strategy = ReadStrategy::default();
strategy.record(8192);
assert_eq!(strategy.next(), 16384);

strategy.record(1);
assert_eq!(strategy.next(), 16384, "first smaller record doesn't decrement yet");
strategy.record(8192);
assert_eq!(strategy.next(), 16384, "record was with range");

strategy.record(1);
assert_eq!(strategy.next(), 16384, "in-range record should make this the 'first' again");

strategy.record(1);
assert_eq!(strategy.next(), 8192, "second smaller record decrements");

strategy.record(1);
assert_eq!(strategy.next(), 8192, "first doesn't decrement");
strategy.record(1);
assert_eq!(strategy.next(), 8192, "doesn't decrement under minimum");
}

#[test]
fn read_strategy_adaptive_stays_the_same() {
let mut strategy = ReadStrategy::default();
strategy.record(8192);
assert_eq!(strategy.next(), 16384);

strategy.record(8193);
assert_eq!(strategy.next(), 16384, "first smaller record doesn't decrement yet");

strategy.record(8193);
assert_eq!(strategy.next(), 16384, "with current step does not decrement");
}

#[test]
fn read_strategy_adaptive_max_fuzz() {
fn fuzz(max: usize) {
let mut strategy = ReadStrategy::with_max(max);
while strategy.next() < max {
strategy.record(::std::usize::MAX);
}
let mut next = strategy.next();
while next > 8192 {
strategy.record(1);
strategy.record(1);
next = strategy.next();
assert!(
next.is_power_of_two(),
"decrement should be powers of two: {} (max = {})",
next,
max,
);
}
}

let mut max = 8192;
while max < ::std::usize::MAX {
fuzz(max);
max = (max / 2).saturating_mul(3);
}
fuzz(::std::usize::MAX);
}

#[test]
#[should_panic]
fn write_buf_requires_non_empty_bufs() {
Expand Down