Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): change http1_half_close option default to disabled #1981

Merged
merged 1 commit into from
Oct 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ where I: AsyncRead + AsyncWrite + Unpin,
Conn {
io: Buffered::new(io),
state: State {
allow_half_close: true,
allow_half_close: false,
cached_headers: None,
error: None,
keep_alive: KA::Busy,
Expand Down Expand Up @@ -76,8 +76,8 @@ where I: AsyncRead + AsyncWrite + Unpin,
self.state.title_case_headers = true;
}

pub(crate) fn set_disable_half_close(&mut self) {
self.state.allow_half_close = false;
pub(crate) fn set_allow_half_close(&mut self) {
self.state.allow_half_close = true;
}

pub fn into_inner(self) -> (I, Bytes) {
Expand Down Expand Up @@ -172,7 +172,7 @@ where I: AsyncRead + AsyncWrite + Unpin,
// message should be reported as an error. If not, it is just
// the connection closing gracefully.
let must_error = self.should_error_on_eof();
self.state.close_read();
self.close_read();
self.io.consume_leading_lines();
let was_mid_parse = e.is_parse() || !self.io.read_buf().is_empty();
if was_mid_parse || must_error {
Expand All @@ -185,6 +185,7 @@ where I: AsyncRead + AsyncWrite + Unpin,
}
} else {
debug!("read eof");
self.close_write();
Poll::Ready(None)
}
}
Expand All @@ -204,7 +205,7 @@ where I: AsyncRead + AsyncWrite + Unpin,
None
})
} else if slice.is_empty() {
error!("decode stream unexpectedly ended");
error!("incoming body unexpectedly ended");
// This should be unreachable, since all 3 decoders
// either set eof=true or return an Err when reading
// an empty slice...
Expand All @@ -216,7 +217,7 @@ where I: AsyncRead + AsyncWrite + Unpin,
},
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(e)) => {
debug!("decode stream error: {}", e);
debug!("incoming body decode error: {}", e);
(Reading::Closed, Poll::Ready(Some(Err(e))))
},
}
Expand Down Expand Up @@ -294,6 +295,10 @@ where I: AsyncRead + AsyncWrite + Unpin,
return Poll::Pending;
}

if self.state.is_read_closed() {
return Poll::Ready(Err(crate::Error::new_incomplete()));
}

let num_read = ready!(self.force_io_read(cx)).map_err(crate::Error::new_io)?;

if num_read == 0 {
Expand All @@ -306,6 +311,8 @@ where I: AsyncRead + AsyncWrite + Unpin,
}

fn force_io_read(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>> {
debug_assert!(!self.state.is_read_closed());

let result = ready!(self.io.poll_read_from_io(cx));
Poll::Ready(result.map_err(|e| {
trace!("force_io_read; io error = {:?}", e);
Expand Down Expand Up @@ -619,8 +626,10 @@ where I: AsyncRead + AsyncWrite + Unpin,

pub fn disable_keep_alive(&mut self) {
if self.state.is_idle() {
self.state.close_read();
trace!("disable_keep_alive; closing idle connection");
self.state.close();
} else {
trace!("disable_keep_alive; in-progress connection");
self.state.disable_keep_alive();
}
}
Expand Down
14 changes: 12 additions & 2 deletions src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ where
}

pub fn disable_keep_alive(&mut self) {
self.conn.disable_keep_alive()
self.conn.disable_keep_alive();
if self.conn.is_write_closed() {
self.close();
}
}

pub fn into_inner(self) -> (I, Bytes, D) {
Expand Down Expand Up @@ -233,10 +236,17 @@ where
// if here, the dispatcher gave the user the error
// somewhere else. we still need to shutdown, but
// not as a second error.
self.close();
Poll::Ready(Ok(()))
},
None => {
// read eof, conn will start to shutdown automatically
// read eof, the write side will have been closed too unless
// allow_read_close was set to true, in which case just do
// nothing...
debug_assert!(self.conn.is_read_closed());
if self.conn.is_write_closed() {
self.close();
}
Poll::Ready(Ok(()))
}
}
Expand Down
15 changes: 7 additions & 8 deletions src/server/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl Http {

Http {
exec: Exec::Default,
h1_half_close: true,
h1_half_close: false,
h1_writev: true,
h2_builder,
mode: ConnectionMode::Fallback,
Expand All @@ -221,12 +221,11 @@ impl<E> Http<E> {
/// Set whether HTTP/1 connections should support half-closures.
///
/// Clients can chose to shutdown their write-side while waiting
/// for the server to respond. Setting this to `false` will
/// automatically close any connection immediately if `read`
/// detects an EOF.
/// for the server to respond. Setting this to `true` will
/// prevent closing the connection immediately if `read`
/// detects an EOF in the middle of a request.
///
/// Default is `true`.
#[inline]
/// Default is `false`.
pub fn http1_half_close(&mut self, val: bool) -> &mut Self {
self.h1_half_close = val;
self
Expand Down Expand Up @@ -390,8 +389,8 @@ impl<E> Http<E> {
if !self.keep_alive {
conn.disable_keep_alive();
}
if !self.h1_half_close {
conn.set_disable_half_close();
if self.h1_half_close {
conn.set_allow_half_close();
}
if !self.h1_writev {
conn.set_write_strategy_flatten();
Expand Down
8 changes: 4 additions & 4 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,11 @@ impl<I, E> Builder<I, E> {
/// Set whether HTTP/1 connections should support half-closures.
///
/// Clients can chose to shutdown their write-side while waiting
/// for the server to respond. Setting this to `false` will
/// automatically close any connection immediately if `read`
/// detects an EOF.
/// for the server to respond. Setting this to `true` will
/// prevent closing the connection immediately if `read`
/// detects an EOF in the middle of a request.
///
/// Default is `true`.
/// Default is `false`.
pub fn http1_half_close(mut self, val: bool) -> Self {
self.protocol.http1_half_close(val);
self
Expand Down
52 changes: 28 additions & 24 deletions tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,7 @@ fn disable_keep_alive_post_request() {

#[test]
fn empty_parse_eof_does_not_return_error() {
let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap();
Expand Down Expand Up @@ -983,13 +984,13 @@ fn nonempty_parse_eof_returns_error() {
}

#[test]
fn socket_half_closed() {
fn http1_allow_half_close() {
let _ = pretty_env_logger::try_init();
let mut rt = Runtime::new().unwrap();
let listener = tcp_bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = listener.local_addr().unwrap();

thread::spawn(move || {
let t1 = thread::spawn(move || {
let mut tcp = connect(&addr);
tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap();
tcp.shutdown(::std::net::Shutdown::Write).expect("SHDN_WR");
Expand All @@ -1005,13 +1006,16 @@ fn socket_half_closed() {
.map(Option::unwrap)
.map_err(|_| unreachable!())
.and_then(|socket| {
Http::new().serve_connection(socket, service_fn(|_| {
Http::new()
.http1_half_close(true)
.serve_connection(socket, service_fn(|_| {
tokio_timer::delay_for(Duration::from_millis(500))
.map(|_| Ok::<_, hyper::Error>(Response::new(Body::empty())))
}))
});

rt.block_on(fut).unwrap();
t1.join().expect("client thread");
}

#[test]
Expand Down Expand Up @@ -1852,28 +1856,28 @@ impl tower_service::Service<Request<Body>> for TestService {
Ok(()).into()
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
let tx1 = self.tx.clone();
let tx2 = self.tx.clone();
fn call(&mut self, mut req: Request<Body>) -> Self::Future {
let tx = self.tx.clone();
let replies = self.reply.clone();
req
.into_body()
.try_concat()
.map_ok(move |chunk| {
tx1.send(Msg::Chunk(chunk.to_vec())).unwrap();
()
})
.map(move |result| {
let msg = match result {
Ok(()) => Msg::End,
Err(e) => Msg::Error(e),
};
tx2.send(msg).unwrap();
})
.map(move |_| {
TestService::build_reply(replies)
})
.boxed()
hyper::rt::spawn(async move {
while let Some(chunk) = req.body_mut().next().await {
match chunk {
Ok(chunk) => {
tx.send(Msg::Chunk(chunk.to_vec())).unwrap();
},
Err(err) => {
tx.send(Msg::Error(err)).unwrap();
return;
},
}
}

tx.send(Msg::End).unwrap();
});

Box::pin(async move {
TestService::build_reply(replies)
})
}
}

Expand Down