perf(lib): re-enable writev support (#2338)

Tokio's `AsyncWrite` trait once again has support for vectored writes in
Tokio 0.3.4 (see tokio-rs/tokio#3149).

This branch re-enables vectored writes in Hyper for HTTP/1. Using
vectored writes in HTTP/2 will require an upstream change in the `h2`
crate as well.

I've removed the adaptive write buffer implementation
that attempted to detect whether vectored IO is available,
since the Tokio 0.3.4 `AsyncWrite` trait exposes this directly via the
`is_write_vectored` method. Now, we simply ask the IO whether it
supports vectored writes and configure the buffer accordingly. This
makes the implementation somewhat simpler.
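
For illustration, here's a minimal, hypothetical sketch of that selection
logic (standalone code mirroring what `WriteBuf::new` in `src/proto/h1/io.rs`
now does below; the names are hyper's, but this block is not copied from the
patch):

```rust
use tokio::io::AsyncWrite;

// Sketch of hyper's internal `WriteStrategy` (src/proto/h1/io.rs).
enum WriteStrategy {
    // Copy all pending data into one contiguous buffer before writing.
    Flatten,
    // Keep buffers separate and submit them together as a vectored write.
    Queue,
}

// Choose a strategy once, at construction time, by asking the transport
// whether it actually supports vectored writes.
fn choose_strategy(io: &impl AsyncWrite) -> WriteStrategy {
    if io.is_write_vectored() {
        WriteStrategy::Queue
    } else {
        WriteStrategy::Flatten
    }
}
```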

This also removes `http1_writev()` methods from the builders. These are
no longer necessary, as Hyper can now determine whether or not
to use vectored writes based on `is_write_vectored`, rather than trying
to auto-detect it.

Closes #2320 

BREAKING CHANGE: Removed `http1_writev` methods from `client::Builder`,
  `client::conn::Builder`, `server::Builder`, and `server::conn::Builder`.
  
  Vectored writes are now enabled based on whether the `AsyncWrite`
  implementation in use supports them, rather than through adaptive
  detection. To explicitly disable vectored writes, users may wrap the IO
  in a newtype that implements `AsyncRead` and `AsyncWrite` and returns
  `false` from its `AsyncWrite::is_write_vectored` method.
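
  For example, such a wrapper might look like the following sketch
  (`NoWritev` is a hypothetical name, not a hyper or Tokio API; the
  trait signatures shown are Tokio 0.3.4's):

```rust
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

// Hypothetical wrapper for illustration only; not part of hyper's API.
struct NoWritev<T>(T);

impl<T: AsyncRead + Unpin> AsyncRead for NoWritev<T> {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // Reads pass straight through to the inner transport.
        Pin::new(&mut self.0).poll_read(cx, buf)
    }
}

impl<T: AsyncWrite + Unpin> AsyncWrite for NoWritev<T> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        Pin::new(&mut self.0).poll_write(cx, buf)
    }

    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.0).poll_flush(cx)
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        Pin::new(&mut self.0).poll_shutdown(cx)
    }

    // Deliberately report no vectored-write support, so hyper selects
    // the flattening write strategy for this connection.
    fn is_write_vectored(&self) -> bool {
        false
    }
}
```

  Because `poll_write_vectored` is left at its default implementation, it
  falls back to a plain `poll_write` of the first non-empty buffer, which
  is consistent with advertising `false` here.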
hawkw authored Nov 24, 2020
1 parent 121c331 commit d6aadb8
Showing 10 changed files with 95 additions and 217 deletions.
14 changes: 0 additions & 14 deletions src/client/conn.rs
@@ -82,7 +82,6 @@ where
#[derive(Clone, Debug)]
pub struct Builder {
pub(super) exec: Exec,
h1_writev: Option<bool>,
h1_title_case_headers: bool,
h1_read_buf_exact_size: Option<usize>,
h1_max_buf_size: Option<usize>,
@@ -453,7 +452,6 @@ impl Builder
pub fn new() -> Builder {
Builder {
exec: Exec::Default,
h1_writev: None,
h1_read_buf_exact_size: None,
h1_title_case_headers: false,
h1_max_buf_size: None,
@@ -475,11 +473,6 @@ impl Builder {
self
}

pub(super) fn h1_writev(&mut self, enabled: bool) -> &mut Builder {
self.h1_writev = Some(enabled);
self
}

pub(super) fn h1_title_case_headers(&mut self, enabled: bool) -> &mut Builder {
self.h1_title_case_headers = enabled;
self
@@ -663,13 +656,6 @@ impl Builder {
#[cfg(feature = "http1")]
Proto::Http1 => {
let mut conn = proto::Conn::new(io);
if let Some(writev) = opts.h1_writev {
if writev {
conn.set_write_strategy_queue();
} else {
conn.set_write_strategy_flatten();
}
}
if opts.h1_title_case_headers {
conn.set_title_case_headers();
}
19 changes: 1 addition & 18 deletions src/client/mod.rs
@@ -62,7 +62,7 @@ use http::{Method, Request, Response, Uri, Version};
use self::connect::{sealed::Connect, Alpn, Connected, Connection};
use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation};
use crate::body::{Body, HttpBody};
use crate::common::{lazy as hyper_lazy, task, exec::BoxSendFuture, Future, Lazy, Pin, Poll};
use crate::common::{exec::BoxSendFuture, lazy as hyper_lazy, task, Future, Lazy, Pin, Poll};
use crate::rt::Executor;

#[cfg(feature = "tcp")]
@@ -987,23 +987,6 @@ impl Builder {

// HTTP/1 options

/// Set whether HTTP/1 connections should try to use vectored writes,
/// or always flatten into a single buffer.
///
/// Note that setting this to false may mean more copies of body data,
/// but may also improve performance when an IO transport doesn't
/// support vectored writes well, such as most TLS implementations.
///
/// Setting this to true will force hyper to use queued strategy
/// which may eliminate unnecessary cloning on some TLS backends
///
/// Default is `auto`. In this mode hyper will try to guess which
/// mode to use
pub fn http1_writev(&mut self, val: bool) -> &mut Self {
self.conn_builder.h1_writev(val);
self
}

/// Sets the exact size of the read buffer to *always* use.
///
/// Note that setting this option unsets the `http1_max_buf_size` option.
1 change: 1 addition & 0 deletions src/common/io/mod.rs
@@ -1,3 +1,4 @@
mod rewind;

pub(crate) use self::rewind::Rewind;
pub(crate) const MAX_WRITEV_BUFS: usize = 64;
12 changes: 12 additions & 0 deletions src/common/io/rewind.rs
@@ -84,13 +84,25 @@ where
Pin::new(&mut self.inner).poll_write(cx, buf)
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_flush(cx)
}

fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
}

fn is_write_vectored(&self) -> bool {
self.inner.is_write_vectored()
}
}

#[cfg(test)]
8 changes: 0 additions & 8 deletions src/proto/h1/conn.rs
@@ -71,14 +71,6 @@ where
self.io.set_read_buf_exact_size(sz);
}

pub fn set_write_strategy_flatten(&mut self) {
self.io.set_write_strategy_flatten();
}

pub fn set_write_strategy_queue(&mut self) {
self.io.set_write_strategy_queue();
}

#[cfg(feature = "client")]
pub fn set_title_case_headers(&mut self) {
self.state.title_case_headers = true;
160 changes: 34 additions & 126 deletions src/proto/h1/io.rs
@@ -1,4 +1,3 @@
use std::cell::Cell;
use std::cmp;
use std::fmt;
use std::io::{self, IoSlice};
@@ -57,13 +56,14 @@ where
B: Buf,
{
pub fn new(io: T) -> Buffered<T, B> {
let write_buf = WriteBuf::new(&io);
Buffered {
flush_pipeline: false,
io,
read_blocked: false,
read_buf: BytesMut::with_capacity(0),
read_buf_strategy: ReadStrategy::default(),
write_buf: WriteBuf::new(),
write_buf,
}
}

@@ -98,13 +98,6 @@
self.write_buf.set_strategy(WriteStrategy::Flatten);
}

pub fn set_write_strategy_queue(&mut self) {
// this should always be called only at construction time,
// so this assert is here to catch myself
debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
self.write_buf.set_strategy(WriteStrategy::Queue);
}

pub fn read_buf(&self) -> &[u8] {
self.read_buf.as_ref()
}
@@ -237,13 +230,13 @@ where
if let WriteStrategy::Flatten = self.write_buf.strategy {
return self.poll_flush_flattened(cx);
}

loop {
// TODO(eliza): this basically ignores all of `WriteBuf`...put
// back vectored IO and `poll_write_buf` when the appropriate Tokio
// changes land...
let n = ready!(Pin::new(&mut self.io)
// .poll_write_buf(cx, &mut self.write_buf.auto()))?;
.poll_write(cx, self.write_buf.auto().bytes()))?;
let n = {
let mut iovs = [IoSlice::new(&[]); crate::common::io::MAX_WRITEV_BUFS];
let len = self.write_buf.bytes_vectored(&mut iovs);
ready!(Pin::new(&mut self.io).poll_write_vectored(cx, &iovs[..len]))?
};
// TODO(eliza): we have to do this manually because
// `poll_write_buf` doesn't exist in Tokio 0.3 yet...when
// `poll_write_buf` comes back, the manual advance will need to leave!
@@ -462,12 +455,17 @@ pub(super) struct WriteBuf<B> {
}

impl<B: Buf> WriteBuf<B> {
fn new() -> WriteBuf<B> {
fn new(io: &impl AsyncWrite) -> WriteBuf<B> {
let strategy = if io.is_write_vectored() {
WriteStrategy::Queue
} else {
WriteStrategy::Flatten
};
WriteBuf {
headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
queue: BufList::new(),
strategy: WriteStrategy::Auto,
strategy,
}
}
}
@@ -480,12 +478,6 @@
self.strategy = strategy;
}

// TODO(eliza): put back writev!
#[inline]
fn auto(&mut self) -> WriteBufAuto<'_, B> {
WriteBufAuto::new(self)
}

pub(super) fn buffer<BB: Buf + Into<B>>(&mut self, mut buf: BB) {
debug_assert!(buf.has_remaining());
match self.strategy {
Expand All @@ -505,7 +497,7 @@ where
buf.advance(adv);
}
}
WriteStrategy::Auto | WriteStrategy::Queue => {
WriteStrategy::Queue => {
self.queue.push(buf.into());
}
}
@@ -514,7 +506,7 @@
fn can_buffer(&self) -> bool {
match self.strategy {
WriteStrategy::Flatten => self.remaining() < self.max_buf_size,
WriteStrategy::Auto | WriteStrategy::Queue => {
WriteStrategy::Queue => {
self.queue.bufs_cnt() < MAX_BUF_LIST_BUFFERS && self.remaining() < self.max_buf_size
}
}
@@ -573,65 +565,8 @@ impl<B: Buf> Buf for WriteBuf<B> {
}
}

/// Detects when wrapped `WriteBuf` is used for vectored IO, and
/// adjusts the `WriteBuf` strategy if not.
struct WriteBufAuto<'a, B: Buf> {
bytes_called: Cell<bool>,
bytes_vec_called: Cell<bool>,
inner: &'a mut WriteBuf<B>,
}

impl<'a, B: Buf> WriteBufAuto<'a, B> {
fn new(inner: &'a mut WriteBuf<B>) -> WriteBufAuto<'a, B> {
WriteBufAuto {
bytes_called: Cell::new(false),
bytes_vec_called: Cell::new(false),
inner,
}
}
}

impl<'a, B: Buf> Buf for WriteBufAuto<'a, B> {
#[inline]
fn remaining(&self) -> usize {
self.inner.remaining()
}

#[inline]
fn bytes(&self) -> &[u8] {
self.bytes_called.set(true);
self.inner.bytes()
}

#[inline]
fn advance(&mut self, cnt: usize) {
self.inner.advance(cnt)
}

#[inline]
fn bytes_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
self.bytes_vec_called.set(true);
self.inner.bytes_vectored(dst)
}
}

impl<'a, B: Buf + 'a> Drop for WriteBufAuto<'a, B> {
fn drop(&mut self) {
if let WriteStrategy::Auto = self.inner.strategy {
if self.bytes_vec_called.get() {
self.inner.strategy = WriteStrategy::Queue;
} else if self.bytes_called.get() {
trace!("detected no usage of vectored write, flattening");
self.inner.strategy = WriteStrategy::Flatten;
self.inner.headers.bytes.put(&mut self.inner.queue);
}
}
}
}

#[derive(Debug)]
enum WriteStrategy {
Auto,
Flatten,
Queue,
}
@@ -643,8 +578,8 @@ mod tests {

use tokio_test::io::Builder as Mock;

#[cfg(feature = "nightly")]
use test::Bencher;
// #[cfg(feature = "nightly")]
// use test::Bencher;

/*
impl<T: Read> MemRead for AsyncIo<T> {
@@ -873,33 +808,6 @@
buffered.flush().await.expect("flush");
}

#[tokio::test]
async fn write_buf_auto_flatten() {
let _ = pretty_env_logger::try_init();

let mock = Mock::new()
// Expects write_buf to only consume first buffer
.write(b"hello ")
// And then the Auto strategy will have flattened
.write(b"world, it's hyper!")
.build();

let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);

// we have 4 buffers, but hope to detect that vectored IO isn't
// being used, and switch to flattening automatically,
// resulting in only 2 writes
buffered.headers_buf().extend(b"hello ");
buffered.buffer(Cursor::new(b"world, ".to_vec()));
buffered.buffer(Cursor::new(b"it's ".to_vec()));
buffered.buffer(Cursor::new(b"hyper!".to_vec()));
assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);

buffered.flush().await.expect("flush");

assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
}

#[tokio::test]
async fn write_buf_queue_disable_auto() {
let _ = pretty_env_logger::try_init();
@@ -928,19 +836,19 @@ mod tests {
assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
}

#[cfg(feature = "nightly")]
#[bench]
fn bench_write_buf_flatten_buffer_chunk(b: &mut Bencher) {
let s = "Hello, World!";
b.bytes = s.len() as u64;

let mut write_buf = WriteBuf::<bytes::Bytes>::new();
write_buf.set_strategy(WriteStrategy::Flatten);
b.iter(|| {
let chunk = bytes::Bytes::from(s);
write_buf.buffer(chunk);
::test::black_box(&write_buf);
write_buf.headers.bytes.clear();
})
}
// #[cfg(feature = "nightly")]
// #[bench]
// fn bench_write_buf_flatten_buffer_chunk(b: &mut Bencher) {
// let s = "Hello, World!";
// b.bytes = s.len() as u64;

// let mut write_buf = WriteBuf::<bytes::Bytes>::new();
// write_buf.set_strategy(WriteStrategy::Flatten);
// b.iter(|| {
// let chunk = bytes::Bytes::from(s);
// write_buf.buffer(chunk);
// ::test::black_box(&write_buf);
// write_buf.headers.bytes.clear();
// })
// }
}