diff --git a/src/client/conn.rs b/src/client/conn.rs
index fcc24407f3..0e65e3c565 100644
--- a/src/client/conn.rs
+++ b/src/client/conn.rs
@@ -74,6 +74,7 @@ pub struct Builder {
     exec: Exec,
     h1_writev: bool,
     h1_title_case_headers: bool,
+    h1_read_buf_exact_size: Option<usize>,
    http2: bool,
 }
 
@@ -432,6 +433,7 @@ impl Builder {
         Builder {
             exec: Exec::Default,
             h1_writev: true,
+            h1_read_buf_exact_size: None,
             h1_title_case_headers: false,
             http2: false,
         }
@@ -461,6 +463,10 @@ impl Builder {
         self
     }
 
+    pub(super) fn h1_read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
+        self.h1_read_buf_exact_size = sz;
+        self
+    }
     /// Sets whether HTTP2 is required.
     ///
     /// Default is false.
@@ -506,6 +512,9 @@ where
                 if self.builder.h1_title_case_headers {
                     conn.set_title_case_headers();
                 }
+                if let Some(sz) = self.builder.h1_read_buf_exact_size {
+                    conn.set_read_buf_exact_size(sz);
+                }
                 let cd = proto::h1::dispatch::Client::new(rx);
                 let dispatch = proto::h1::Dispatcher::new(cd, conn);
                 Either::A(dispatch)
diff --git a/src/client/mod.rs b/src/client/mod.rs
index b25486f1bf..ab5e987af2 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -110,6 +110,7 @@ pub struct Client<C, B = Body> {
     h1_writev: bool,
     h1_title_case_headers: bool,
     pool: Pool<PoolClient<B>>,
+    h1_read_buf_exact_size: Option<usize>,
     retry_canceled_requests: bool,
     set_host: bool,
     ver: Ver,
 }
@@ -460,6 +461,7 @@ where C: Connect + Sync + 'static,
         let pool = self.pool.clone();
         let h1_writev = self.h1_writev;
         let h1_title_case_headers = self.h1_title_case_headers;
+        let h1_read_buf_exact_size = self.h1_read_buf_exact_size;
         let ver = self.ver;
         let is_ver_h2 = self.ver == Ver::Http2;
         let connector = self.connector.clone();
@@ -506,6 +508,7 @@ where C: Connect + Sync + 'static,
                     .exec(executor.clone())
                     .h1_writev(h1_writev)
                     .h1_title_case_headers(h1_title_case_headers)
+                    .h1_read_buf_exact_size(h1_read_buf_exact_size)
                     .http2_only(is_h2)
                     .handshake(io)
                     .and_then(move |(tx, conn)| {
@@ -545,6 +548,7 @@ impl<C, B> Clone for Client<C, B> {
             connector: self.connector.clone(),
             executor: self.executor.clone(),
             h1_writev: self.h1_writev,
+            h1_read_buf_exact_size: self.h1_read_buf_exact_size,
             h1_title_case_headers: self.h1_title_case_headers,
             pool: self.pool.clone(),
             retry_canceled_requests: self.retry_canceled_requests,
@@ -791,6 +795,7 @@ pub struct Builder {
     keep_alive_timeout: Option<Duration>,
     h1_writev: bool,
     h1_title_case_headers: bool,
+    h1_read_buf_exact_size: Option<usize>,
     max_idle_per_host: usize,
     retry_canceled_requests: bool,
     set_host: bool,
@@ -805,6 +810,7 @@ impl Default for Builder {
             keep_alive_timeout: Some(Duration::from_secs(90)),
             h1_writev: true,
             h1_title_case_headers: false,
+            h1_read_buf_exact_size: None,
             max_idle_per_host: ::std::usize::MAX,
             retry_canceled_requests: true,
             set_host: true,
@@ -851,6 +857,15 @@ impl Builder {
         self
     }
 
+    /// Sets the exact size of the read buffer to *always* use.
+    ///
+    /// Default is an adaptive read buffer.
+    #[inline]
+    pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self {
+        self.h1_read_buf_exact_size = Some(sz);
+        self
+    }
+
     /// Set whether HTTP/1 connections will write header names as title case at
     /// the socket level.
     ///
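Reviewer note: the hunks above add the user-facing knob; everything that follows is plumbing that threads the value through `Client`, its `Clone` impl, and down to the per-connection buffer. A minimal usage sketch, assuming hyper 0.12's existing `Client::builder()` and `build_http()` (neither is added by this patch):

extern crate hyper;

use hyper::{Body, Client};

fn main() {
    // Pin each HTTP/1 connection's read buffer at exactly 8 KiB instead
    // of letting it grow adaptively toward the default maximum.
    let client: Client<_, Body> = Client::builder()
        .http1_read_buf_exact_size(8192)
        .build_http();

    drop(client);
}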
@@ -950,6 +965,7 @@ impl Builder {
             executor: self.exec.clone(),
             h1_writev: self.h1_writev,
             h1_title_case_headers: self.h1_title_case_headers,
+            h1_read_buf_exact_size: self.h1_read_buf_exact_size,
             pool: Pool::new(
                 pool::Enabled(self.keep_alive),
                 pool::IdleTimeout(self.keep_alive_timeout),
@@ -968,6 +984,7 @@ impl fmt::Debug for Builder {
         f.debug_struct("Builder")
             .field("keep_alive", &self.keep_alive)
             .field("keep_alive_timeout", &self.keep_alive_timeout)
+            .field("http1_read_buf_exact_size", &self.h1_read_buf_exact_size)
             .field("http1_writev", &self.h1_writev)
             .field("max_idle_per_host", &self.max_idle_per_host)
             .field("set_host", &self.set_host)
diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs
index 148d43ce8c..6e4125db86 100644
--- a/src/proto/h1/conn.rs
+++ b/src/proto/h1/conn.rs
@@ -63,6 +63,10 @@ where I: AsyncRead + AsyncWrite,
         self.io.set_max_buf_size(max);
     }
 
+    pub fn set_read_buf_exact_size(&mut self, sz: usize) {
+        self.io.set_read_buf_exact_size(sz);
+    }
+
     pub fn set_write_strategy_flatten(&mut self) {
         self.io.set_write_strategy_flatten();
     }
diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs
index 012400e013..86664f5837 100644
--- a/src/proto/h1/io.rs
+++ b/src/proto/h1/io.rs
@@ -31,9 +31,9 @@ const MAX_BUF_LIST_BUFFERS: usize = 16;
 pub struct Buffered<T, B> {
     flush_pipeline: bool,
     io: T,
-    max_buf_size: usize,
     read_blocked: bool,
     read_buf: BytesMut,
+    read_buf_strategy: ReadStrategy,
     write_buf: WriteBuf<B>,
 }
 
@@ -58,10 +58,12 @@ where
         Buffered {
             flush_pipeline: false,
             io: io,
-            max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
+            read_blocked: false,
             read_buf: BytesMut::with_capacity(0),
+            read_buf_strategy: ReadStrategy::Adaptive {
+                max: DEFAULT_MAX_BUFFER_SIZE,
+            },
             write_buf: WriteBuf::new(),
-            read_blocked: false,
         }
     }
 
@@ -76,17 +78,24 @@
     pub fn set_max_buf_size(&mut self, max: usize) {
         assert!(
             max >= MINIMUM_MAX_BUFFER_SIZE,
-            "The max_buf_size cannot be smaller than the initial buffer size."
+            "The max_buf_size cannot be smaller than {}.",
+            MINIMUM_MAX_BUFFER_SIZE,
         );
-        self.max_buf_size = max;
+        self.read_buf_strategy = ReadStrategy::Adaptive {
+            max,
+        };
         self.write_buf.max_buf_size = max;
     }
 
+    pub fn set_read_buf_exact_size(&mut self, sz: usize) {
+        self.read_buf_strategy = ReadStrategy::Exact(sz);
+    }
+
     pub fn set_write_strategy_flatten(&mut self) {
         // this should always be called only at construction time,
         // so this assert is here to catch myself
         debug_assert!(self.write_buf.queue.bufs.is_empty());
-        self.write_buf.set_strategy(Strategy::Flatten);
+        self.write_buf.set_strategy(WriteStrategy::Flatten);
     }
 
     pub fn read_buf(&self) -> &[u8] {
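One behavioral detail worth calling out: `set_max_buf_size` and `set_read_buf_exact_size` both assign the single `read_buf_strategy` field, so whichever runs last wins. A standalone sketch of that interaction, mirroring the `ReadStrategy` type defined in a later hunk (the sizes are stand-in values, not hyper's constants):

// Mirror of the patch's ReadStrategy, for illustration only.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ReadStrategy {
    Adaptive { max: usize },
    Exact(usize),
}

fn main() {
    // Buffered::new() starts adaptive, capped at DEFAULT_MAX_BUFFER_SIZE.
    let mut strategy = ReadStrategy::Adaptive { max: 400 * 1024 };

    // set_read_buf_exact_size(16 * 1024) replaces the strategy outright:
    strategy = ReadStrategy::Exact(16 * 1024);
    assert_eq!(strategy, ReadStrategy::Exact(16 * 1024));

    // ...and a later set_max_buf_size(max) would flip it back to Adaptive.
    strategy = ReadStrategy::Adaptive { max: 64 * 1024 };
    assert_ne!(strategy, ReadStrategy::Exact(16 * 1024));
}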
+ "The max_buf_size cannot be smaller than {}.", + MINIMUM_MAX_BUFFER_SIZE, ); - self.max_buf_size = max; + self.read_buf_strategy = ReadStrategy::Adaptive { + max, + }; self.write_buf.max_buf_size = max; } + pub fn set_read_buf_exact_size(&mut self, sz: usize) { + self.read_buf_strategy = ReadStrategy::Exact(sz); + } + pub fn set_write_strategy_flatten(&mut self) { // this should always be called only at construction time, // so this assert is here to catch myself debug_assert!(self.write_buf.queue.bufs.is_empty()); - self.write_buf.set_strategy(Strategy::Flatten); + self.write_buf.set_strategy(WriteStrategy::Flatten); } pub fn read_buf(&self) -> &[u8] { @@ -140,10 +149,18 @@ where debug!("parsed {} headers", msg.head.headers.len()); return Ok(Async::Ready(msg)) }, - None => { - if self.read_buf.capacity() >= self.max_buf_size { - debug!("max_buf_size ({}) reached, closing", self.max_buf_size); - return Err(::Error::new_too_large()); + None => match self.read_buf_strategy { + ReadStrategy::Adaptive { max } => { + if self.read_buf.len() >= max { + debug!("max_buf_size ({}) reached, closing", max); + return Err(::Error::new_too_large()); + } + }, + ReadStrategy::Exact(exact) => { + if self.read_buf.len() >= exact { + debug!("exact buf size ({}) filled, closing", exact); + return Err(::Error::new_too_large()); + } } }, } @@ -160,8 +177,17 @@ where pub fn read_from_io(&mut self) -> Poll { use bytes::BufMut; self.read_blocked = false; - if self.read_buf.remaining_mut() < INIT_BUFFER_SIZE { - self.read_buf.reserve(INIT_BUFFER_SIZE); + match self.read_buf_strategy { + ReadStrategy::Adaptive { .. } => { + if self.read_buf.remaining_mut() < INIT_BUFFER_SIZE { + self.read_buf.reserve(INIT_BUFFER_SIZE); + } + }, + ReadStrategy::Exact(exact) => { + if self.read_buf.capacity() < exact { + self.read_buf.reserve(exact); + } + }, } self.io.read_buf(&mut self.read_buf).map(|ok| { match ok { @@ -196,7 +222,7 @@ where try_nb!(self.io.flush()); } else { match self.write_buf.strategy { - Strategy::Flatten => return self.flush_flattened(), + WriteStrategy::Flatten => return self.flush_flattened(), _ => (), } loop { @@ -256,6 +282,14 @@ where } } +#[derive(Clone, Copy, Debug)] +enum ReadStrategy { + Adaptive { + max: usize + }, + Exact(usize), +} + #[derive(Clone)] pub struct Cursor { bytes: T, @@ -313,7 +347,7 @@ pub(super) struct WriteBuf { max_buf_size: usize, /// Deque of user buffers if strategy is Queue queue: BufDeque, - strategy: Strategy, + strategy: WriteStrategy, } impl WriteBuf { @@ -322,7 +356,7 @@ impl WriteBuf { headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)), max_buf_size: DEFAULT_MAX_BUFFER_SIZE, queue: BufDeque::new(), - strategy: Strategy::Auto, + strategy: WriteStrategy::Auto, } } } @@ -332,7 +366,7 @@ impl WriteBuf where B: Buf, { - fn set_strategy(&mut self, strategy: Strategy) { + fn set_strategy(&mut self, strategy: WriteStrategy) { self.strategy = strategy; } @@ -344,7 +378,7 @@ where pub(super) fn buffer>(&mut self, mut buf: BB) { debug_assert!(buf.has_remaining()); match self.strategy { - Strategy::Flatten => { + WriteStrategy::Flatten => { let head = self.headers_mut(); //perf: This is a little faster than >::put, //but accomplishes the same result. 
@@ -360,7 +394,7 @@ where
                     buf.advance(adv);
                 }
             },
-            Strategy::Auto | Strategy::Queue => {
+            WriteStrategy::Auto | WriteStrategy::Queue => {
                 self.queue.bufs.push_back(buf.into());
             },
         }
@@ -368,10 +402,10 @@ where
     fn can_buffer(&self) -> bool {
         match self.strategy {
-            Strategy::Flatten => {
+            WriteStrategy::Flatten => {
                 self.remaining() < self.max_buf_size
             },
-            Strategy::Auto | Strategy::Queue => {
+            WriteStrategy::Auto | WriteStrategy::Queue => {
                 self.queue.bufs.len() < MAX_BUF_LIST_BUFFERS
                     && self.remaining() < self.max_buf_size
             },
@@ -474,12 +508,12 @@ impl<'a, B: Buf> Buf for WriteBufAuto<'a, B> {
 impl<'a, B: Buf + 'a> Drop for WriteBufAuto<'a, B> {
     fn drop(&mut self) {
-        if let Strategy::Auto = self.inner.strategy {
+        if let WriteStrategy::Auto = self.inner.strategy {
             if self.bytes_vec_called.get() {
-                self.inner.strategy = Strategy::Queue;
+                self.inner.strategy = WriteStrategy::Queue;
             } else if self.bytes_called.get() {
                 trace!("detected no usage of vectored write, flattening");
-                self.inner.strategy = Strategy::Flatten;
+                self.inner.strategy = WriteStrategy::Flatten;
                 self.inner.headers.bytes.put(&mut self.inner.queue);
             }
         }
     }
 }
@@ -488,7 +522,7 @@
 }
 
 #[derive(Debug)]
-enum Strategy {
+enum WriteStrategy {
     Auto,
     Flatten,
     Queue,
@@ -640,7 +674,7 @@ mod tests {
 
         let mock = AsyncIo::new_buf(vec![], 1024);
         let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
-        buffered.write_buf.set_strategy(Strategy::Flatten);
+        buffered.write_buf.set_strategy(WriteStrategy::Flatten);
 
         buffered.headers_buf().extend(b"hello ");
         buffered.buffer(Cursor::new(b"world, ".to_vec()));
@@ -686,7 +720,7 @@
         let mut mock = AsyncIo::new_buf(vec![], 1024);
         mock.max_read_vecs(0); // disable vectored IO
         let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
-        buffered.write_buf.set_strategy(Strategy::Queue);
+        buffered.write_buf.set_strategy(WriteStrategy::Queue);
 
         // we have 4 buffers, and vec IO disabled, but explicitly said
         // don't try to auto detect (via setting strategy above)
@@ -710,7 +744,7 @@
         b.bytes = s.len() as u64;
 
         let mut write_buf = WriteBuf::<::Chunk>::new();
-        write_buf.set_strategy(Strategy::Flatten);
+        write_buf.set_strategy(WriteStrategy::Flatten);
         b.iter(|| {
             let chunk = ::Chunk::from(s);
             write_buf.buffer(chunk);
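Test coverage idea (hypothetical, not part of the patch): a test in the style of this module's existing ones could pin the exact-size strategy and check the reservation in one read. The sketch assumes the `AsyncIo` mock used above and the private-field access that `mod tests` already relies on:

#[test]
fn read_buf_exact_size_reserves_fixed_capacity() {
    let mock = AsyncIo::new_buf(b"hello world".to_vec(), 100);
    let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
    buffered.set_read_buf_exact_size(32);

    // One read: capacity comes from the pinned size, not from the
    // INIT_BUFFER_SIZE steps the adaptive strategy would take.
    buffered.read_from_io().unwrap();
    assert_eq!(buffered.read_buf(), b"hello world");
    assert!(buffered.read_buf.capacity() >= 32);
}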