feat(client): add http1_read_buf_exact_size Builder option
Setting this option changes the read buffer strategy from adaptive sizing to
always using a buffer of the exact configured size.
rrichardson authored and seanmonstar committed Nov 21, 2018
1 parent 92a8aba commit 2e7250b
Showing 4 changed files with 91 additions and 27 deletions.
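
Before the diff itself, a quick illustration of the API this commit adds. This is a minimal, hypothetical sketch against the hyper 0.12-era client; only the `http1_read_buf_exact_size` call comes from this commit, and the `build_http` setup and the 8192-byte size are illustrative assumptions:

```rust
extern crate hyper;

use hyper::{Body, Client};

fn main() {
    // Sketch only: pin the HTTP/1 read buffer to exactly 8 KiB instead
    // of the default adaptive strategy. `http1_read_buf_exact_size` is
    // the Builder option introduced by this commit.
    let client: Client<_, Body> = Client::builder()
        .http1_read_buf_exact_size(8192)
        .build_http();

    // Each HTTP/1 connection made by `client` now reserves exactly
    // 8192 bytes for reads; a message head that cannot fit in that
    // space is rejected as "too large" (see the io.rs changes below).
    drop(client);
}
```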
9 changes: 9 additions & 0 deletions src/client/conn.rs
@@ -74,6 +74,7 @@ pub struct Builder {
exec: Exec,
h1_writev: bool,
h1_title_case_headers: bool,
h1_read_buf_exact_size: Option<usize>,
http2: bool,
}

@@ -432,6 +433,7 @@ impl Builder {
Builder {
exec: Exec::Default,
h1_writev: true,
h1_read_buf_exact_size: None,
h1_title_case_headers: false,
http2: false,
}
@@ -461,6 +463,10 @@ impl Builder {
self
}

pub(super) fn h1_read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
self.h1_read_buf_exact_size = sz;
self
}

/// Sets whether HTTP2 is required.
///
/// Default is false.
@@ -506,6 +512,9 @@ where
if self.builder.h1_title_case_headers {
conn.set_title_case_headers();
}
if let Some(sz) = self.builder.h1_read_buf_exact_size {
conn.set_read_buf_exact_size(sz);
}
let cd = proto::h1::dispatch::Client::new(rx);
let dispatch = proto::h1::Dispatcher::new(cd, conn);
Either::A(dispatch)
17 changes: 17 additions & 0 deletions src/client/mod.rs
@@ -110,6 +110,7 @@ pub struct Client<C, B = Body> {
h1_writev: bool,
h1_title_case_headers: bool,
pool: Pool<PoolClient<B>>,
h1_read_buf_exact_size: Option<usize>,
retry_canceled_requests: bool,
set_host: bool,
ver: Ver,
@@ -460,6 +461,7 @@ where C: Connect + Sync + 'static,
let pool = self.pool.clone();
let h1_writev = self.h1_writev;
let h1_title_case_headers = self.h1_title_case_headers;
let h1_read_buf_exact_size = self.h1_read_buf_exact_size;
let ver = self.ver;
let is_ver_h2 = self.ver == Ver::Http2;
let connector = self.connector.clone();
@@ -506,6 +508,7 @@
.exec(executor.clone())
.h1_writev(h1_writev)
.h1_title_case_headers(h1_title_case_headers)
.h1_read_buf_exact_size(h1_read_buf_exact_size)
.http2_only(is_h2)
.handshake(io)
.and_then(move |(tx, conn)| {
@@ -545,6 +548,7 @@ impl<C, B> Clone for Client<C, B> {
connector: self.connector.clone(),
executor: self.executor.clone(),
h1_writev: self.h1_writev,
h1_read_buf_exact_size: self.h1_read_buf_exact_size,
h1_title_case_headers: self.h1_title_case_headers,
pool: self.pool.clone(),
retry_canceled_requests: self.retry_canceled_requests,
@@ -791,6 +795,7 @@ pub struct Builder {
keep_alive_timeout: Option<Duration>,
h1_writev: bool,
h1_title_case_headers: bool,
h1_read_buf_exact_size: Option<usize>,
max_idle_per_host: usize,
retry_canceled_requests: bool,
set_host: bool,
@@ -805,6 +810,7 @@ impl Default for Builder {
keep_alive_timeout: Some(Duration::from_secs(90)),
h1_writev: true,
h1_title_case_headers: false,
h1_read_buf_exact_size: None,
max_idle_per_host: ::std::usize::MAX,
retry_canceled_requests: true,
set_host: true,
@@ -851,6 +857,15 @@ impl Builder {
self
}

/// Sets the exact size of the read buffer to *always* use.
///
/// Default is an adaptive read buffer.
#[inline]
pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self {
self.h1_read_buf_exact_size = Some(sz);
self
}

/// Set whether HTTP/1 connections will write header names as title case at
/// the socket level.
///
@@ -950,6 +965,7 @@ impl Builder {
executor: self.exec.clone(),
h1_writev: self.h1_writev,
h1_title_case_headers: self.h1_title_case_headers,
h1_read_buf_exact_size: self.h1_read_buf_exact_size,
pool: Pool::new(
pool::Enabled(self.keep_alive),
pool::IdleTimeout(self.keep_alive_timeout),
@@ -968,6 +984,7 @@ impl fmt::Debug for Builder {
f.debug_struct("Builder")
.field("keep_alive", &self.keep_alive)
.field("keep_alive_timeout", &self.keep_alive_timeout)
.field("http1_read_buf_exact_size", &self.h1_read_buf_exact_size)
.field("http1_writev", &self.h1_writev)
.field("max_idle_per_host", &self.max_idle_per_host)
.field("set_host", &self.set_host)
4 changes: 4 additions & 0 deletions src/proto/h1/conn.rs
@@ -63,6 +63,10 @@ where I: AsyncRead + AsyncWrite,
self.io.set_max_buf_size(max);
}

pub fn set_read_buf_exact_size(&mut self, sz: usize) {
self.io.set_read_buf_exact_size(sz);
}

pub fn set_write_strategy_flatten(&mut self) {
self.io.set_write_strategy_flatten();
}
88 changes: 61 additions & 27 deletions src/proto/h1/io.rs
@@ -31,9 +31,9 @@ const MAX_BUF_LIST_BUFFERS: usize = 16;
pub struct Buffered<T, B> {
flush_pipeline: bool,
io: T,
max_buf_size: usize,
read_blocked: bool,
read_buf: BytesMut,
read_buf_strategy: ReadStrategy,
write_buf: WriteBuf<B>,
}

@@ -58,10 +58,12 @@
Buffered {
flush_pipeline: false,
io: io,
max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
read_blocked: false,
read_buf: BytesMut::with_capacity(0),
read_buf_strategy: ReadStrategy::Adaptive {
max: DEFAULT_MAX_BUFFER_SIZE,
},
write_buf: WriteBuf::new(),
read_blocked: false,
}
}

@@ -76,17 +78,24 @@
pub fn set_max_buf_size(&mut self, max: usize) {
assert!(
max >= MINIMUM_MAX_BUFFER_SIZE,
"The max_buf_size cannot be smaller than the initial buffer size."
"The max_buf_size cannot be smaller than {}.",
MINIMUM_MAX_BUFFER_SIZE,
);
self.max_buf_size = max;
self.read_buf_strategy = ReadStrategy::Adaptive {
max,
};
self.write_buf.max_buf_size = max;
}

pub fn set_read_buf_exact_size(&mut self, sz: usize) {
self.read_buf_strategy = ReadStrategy::Exact(sz);
}

pub fn set_write_strategy_flatten(&mut self) {
// this should always be called only at construction time,
// so this assert is here to catch myself
debug_assert!(self.write_buf.queue.bufs.is_empty());
self.write_buf.set_strategy(Strategy::Flatten);
self.write_buf.set_strategy(WriteStrategy::Flatten);
}

pub fn read_buf(&self) -> &[u8] {
@@ -140,10 +149,18 @@
debug!("parsed {} headers", msg.head.headers.len());
return Ok(Async::Ready(msg))
},
None => {
if self.read_buf.capacity() >= self.max_buf_size {
debug!("max_buf_size ({}) reached, closing", self.max_buf_size);
return Err(::Error::new_too_large());
None => match self.read_buf_strategy {
ReadStrategy::Adaptive { max } => {
if self.read_buf.len() >= max {
debug!("max_buf_size ({}) reached, closing", max);
return Err(::Error::new_too_large());
}
},
ReadStrategy::Exact(exact) => {
if self.read_buf.len() >= exact {
debug!("exact buf size ({}) filled, closing", exact);
return Err(::Error::new_too_large());
}
}
},
}
@@ -160,8 +177,17 @@
pub fn read_from_io(&mut self) -> Poll<usize, io::Error> {
use bytes::BufMut;
self.read_blocked = false;
if self.read_buf.remaining_mut() < INIT_BUFFER_SIZE {
self.read_buf.reserve(INIT_BUFFER_SIZE);
match self.read_buf_strategy {
ReadStrategy::Adaptive { .. } => {
if self.read_buf.remaining_mut() < INIT_BUFFER_SIZE {
self.read_buf.reserve(INIT_BUFFER_SIZE);
}
},
ReadStrategy::Exact(exact) => {
if self.read_buf.capacity() < exact {
self.read_buf.reserve(exact);
}
},
}
self.io.read_buf(&mut self.read_buf).map(|ok| {
match ok {
@@ -196,7 +222,7 @@ where
try_nb!(self.io.flush());
} else {
match self.write_buf.strategy {
Strategy::Flatten => return self.flush_flattened(),
WriteStrategy::Flatten => return self.flush_flattened(),
_ => (),
}
loop {
@@ -256,6 +282,14 @@ where
}
}

#[derive(Clone, Copy, Debug)]
enum ReadStrategy {
Adaptive {
max: usize
},
Exact(usize),
}

#[derive(Clone)]
pub struct Cursor<T> {
bytes: T,
@@ -313,7 +347,7 @@ pub(super) struct WriteBuf<B> {
max_buf_size: usize,
/// Deque of user buffers if strategy is Queue
queue: BufDeque<B>,
strategy: Strategy,
strategy: WriteStrategy,
}

impl<B> WriteBuf<B> {
@@ -322,7 +356,7 @@ impl<B> WriteBuf<B> {
headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
queue: BufDeque::new(),
strategy: Strategy::Auto,
strategy: WriteStrategy::Auto,
}
}
}
@@ -332,7 +366,7 @@ impl<B> WriteBuf<B>
where
B: Buf,
{
fn set_strategy(&mut self, strategy: Strategy) {
fn set_strategy(&mut self, strategy: WriteStrategy) {
self.strategy = strategy;
}

@@ -344,7 +378,7 @@
pub(super) fn buffer<BB: Buf + Into<B>>(&mut self, mut buf: BB) {
debug_assert!(buf.has_remaining());
match self.strategy {
Strategy::Flatten => {
WriteStrategy::Flatten => {
let head = self.headers_mut();
//perf: This is a little faster than <Vec as BufMut>::put,
//but accomplishes the same result.
@@ -360,18 +394,18 @@ where
buf.advance(adv);
}
},
Strategy::Auto | Strategy::Queue => {
WriteStrategy::Auto | WriteStrategy::Queue => {
self.queue.bufs.push_back(buf.into());
},
}
}

fn can_buffer(&self) -> bool {
match self.strategy {
Strategy::Flatten => {
WriteStrategy::Flatten => {
self.remaining() < self.max_buf_size
},
Strategy::Auto | Strategy::Queue => {
WriteStrategy::Auto | WriteStrategy::Queue => {
self.queue.bufs.len() < MAX_BUF_LIST_BUFFERS
&& self.remaining() < self.max_buf_size
},
Expand Down Expand Up @@ -474,12 +508,12 @@ impl<'a, B: Buf> Buf for WriteBufAuto<'a, B> {

impl<'a, B: Buf + 'a> Drop for WriteBufAuto<'a, B> {
fn drop(&mut self) {
if let Strategy::Auto = self.inner.strategy {
if let WriteStrategy::Auto = self.inner.strategy {
if self.bytes_vec_called.get() {
self.inner.strategy = Strategy::Queue;
self.inner.strategy = WriteStrategy::Queue;
} else if self.bytes_called.get() {
trace!("detected no usage of vectored write, flattening");
self.inner.strategy = Strategy::Flatten;
self.inner.strategy = WriteStrategy::Flatten;
self.inner.headers.bytes.put(&mut self.inner.queue);
}
}
@@ -488,7 +522,7 @@ impl<'a, B: Buf + 'a> Drop for WriteBufAuto<'a, B> {


#[derive(Debug)]
enum Strategy {
enum WriteStrategy {
Auto,
Flatten,
Queue,
@@ -640,7 +674,7 @@ mod tests {

let mock = AsyncIo::new_buf(vec![], 1024);
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
buffered.write_buf.set_strategy(Strategy::Flatten);
buffered.write_buf.set_strategy(WriteStrategy::Flatten);

buffered.headers_buf().extend(b"hello ");
buffered.buffer(Cursor::new(b"world, ".to_vec()));
@@ -686,7 +720,7 @@ mod tests {
let mut mock = AsyncIo::new_buf(vec![], 1024);
mock.max_read_vecs(0); // disable vectored IO
let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
buffered.write_buf.set_strategy(Strategy::Queue);
buffered.write_buf.set_strategy(WriteStrategy::Queue);

// we have 4 buffers, and vec IO disabled, but explicitly said
// don't try to auto detect (via setting strategy above)
@@ -710,7 +744,7 @@ mod tests {
b.bytes = s.len() as u64;

let mut write_buf = WriteBuf::<::Chunk>::new();
write_buf.set_strategy(Strategy::Flatten);
write_buf.set_strategy(WriteStrategy::Flatten);
b.iter(|| {
let chunk = ::Chunk::from(s);
write_buf.buffer(chunk);
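
In short, the new `ReadStrategy` in io.rs preserves the old behavior as `Adaptive` (top the buffer up in `INIT_BUFFER_SIZE` steps and reject a message once `max` bytes are buffered without a complete parse) and adds `Exact`, which reserves the requested capacity up front and never grows past it. A simplified, self-contained model of that decision logic follows; this is a sketch only, where `Vec<u8>` and the 8192 constant stand in for hyper's `BytesMut` buffer and its internal `INIT_BUFFER_SIZE`:

```rust
/// Simplified model of the two read-buffer strategies in the diff above.
#[derive(Clone, Copy, Debug)]
enum ReadStrategy {
    /// Old behavior: grow in fixed increments, capped at `max`.
    Adaptive { max: usize },
    /// New behavior: always use exactly this many bytes of capacity.
    Exact(usize),
}

// Stand-in for hyper's internal INIT_BUFFER_SIZE constant.
const INIT_BUFFER_SIZE: usize = 8192;

impl ReadStrategy {
    /// Ensure capacity before the next read, mirroring `read_from_io`.
    fn prepare(self, buf: &mut Vec<u8>) {
        match self {
            ReadStrategy::Adaptive { .. } => {
                // Top up in fixed steps; `max` is enforced at parse time.
                if buf.capacity() - buf.len() < INIT_BUFFER_SIZE {
                    buf.reserve(INIT_BUFFER_SIZE);
                }
            }
            ReadStrategy::Exact(exact) => {
                // Reserve the exact size once; later calls are no-ops.
                if buf.capacity() < exact {
                    buf.reserve(exact);
                }
            }
        }
    }

    /// Whether an incomplete parse should fail, mirroring `parse`.
    fn is_too_large(self, buffered: usize) -> bool {
        match self {
            ReadStrategy::Adaptive { max } => buffered >= max,
            ReadStrategy::Exact(exact) => buffered >= exact,
        }
    }
}

fn main() {
    let strategy = ReadStrategy::Exact(16_384);
    let mut buf = Vec::new();
    strategy.prepare(&mut buf);
    assert!(buf.capacity() >= 16_384);
    assert!(!strategy.is_too_large(buf.len()));
    println!("{:?} ready with capacity {}", strategy, buf.capacity());
}
```

The trade-off this models: the adaptive strategy may reallocate several times while warming up, whereas the exact strategy makes per-connection read memory fully predictable, which is why the commit message frames the option as always using the given size.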
