diff --git a/Cargo.toml b/Cargo.toml index bb61ce7980..5e481084f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ futures = "0.1.17" futures-cpupool = "0.1.6" http = { version = "0.1", optional = true } httparse = "1.0" +iovec = "0.1" language-tags = "0.2" log = "0.4" mime = "0.3.2" diff --git a/benches/server.rs b/benches/server.rs index 481493bfdc..1542ff7071 100644 --- a/benches/server.rs +++ b/benches/server.rs @@ -3,6 +3,7 @@ extern crate futures; extern crate hyper; +extern crate pretty_env_logger; extern crate test; use std::io::{Read, Write}; @@ -17,6 +18,7 @@ use hyper::server::{self, Service}; macro_rules! bench_server { ($b:ident, $header:expr, $body:expr) => ({ + let _ = pretty_env_logger::try_init(); let (_until_tx, until_rx) = oneshot::channel(); let addr = { let (addr_tx, addr_rx) = mpsc::channel(); @@ -53,7 +55,7 @@ macro_rules! bench_server { sum += tcp.read(&mut buf).unwrap(); } assert_eq!(sum, total_bytes); - }) + }); }) } diff --git a/src/lib.rs b/src/lib.rs index 2785b804ac..3f8ea04f31 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -24,6 +24,7 @@ extern crate futures_cpupool; #[cfg(feature = "compat")] extern crate http; extern crate httparse; +extern crate iovec; extern crate language_tags; #[macro_use] extern crate log; pub extern crate mime; diff --git a/src/mock.rs b/src/mock.rs index 57830a900a..e44351ee61 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -31,6 +31,12 @@ impl ::std::ops::Deref for Buf { } } +impl AsRef<[u8]> for Buf { + fn as_ref(&self) -> &[u8] { + &self.vec + } +} + impl> PartialEq for Buf { fn eq(&self, other: &S) -> bool { self.vec == other.as_ref() @@ -110,6 +116,13 @@ impl AsyncIo { } } +impl, T: AsRef<[u8]>> PartialEq for AsyncIo { + fn eq(&self, other: &S) -> bool { + self.inner.as_ref() == other.as_ref() + } +} + + impl Read for AsyncIo { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.blocked = false; @@ -156,6 +169,46 @@ impl AsyncWrite for AsyncIo { fn shutdown(&mut self) -> Poll<(), io::Error> { Ok(().into()) } + + fn write_buf(&mut self, buf: &mut B) -> Poll { + use futures::Async; + let r = { + static DUMMY: &[u8] = &[0]; + let mut bufs = [From::from(DUMMY); 64]; + let i = ::bytes::Buf::bytes_vec(&buf, &mut bufs); + let mut n = 0; + let mut ret = Ok(0); + for iovec in &bufs[..i] { + match self.write(iovec) { + Ok(num) => { + n += num; + ret = Ok(n); + }, + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock { + if let Ok(0) = ret { + ret = Err(e); + } + } else { + ret = Err(e); + } + break; + } + } + } + ret + }; + match r { + Ok(n) => { + ::bytes::Buf::advance(buf, n); + Ok(Async::Ready(n)) + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + Ok(Async::NotReady) + } + Err(e) => Err(e), + } + } } impl ::std::ops::Deref for AsyncIo { diff --git a/src/proto/conn.rs b/src/proto/conn.rs index 0ecbb5d8c1..e63c471342 100644 --- a/src/proto/conn.rs +++ b/src/proto/conn.rs @@ -1,5 +1,5 @@ use std::fmt; -use std::io::{self, Write}; +use std::io::{self}; use std::marker::PhantomData; use futures::{Async, AsyncSink, Poll, StartSend}; @@ -12,7 +12,7 @@ use tokio_proto::streaming::pipeline::{Frame, Transport}; use proto::Http1Transaction; use super::io::{Cursor, Buffered}; -use super::h1::{Encoder, Decoder}; +use super::h1::{EncodedBuf, Encoder, Decoder}; use method::Method; use version::HttpVersion; @@ -25,8 +25,8 @@ use version::HttpVersion; /// determine if this connection can be kept alive after the message, /// or if it is complete. 
pub struct Conn { - io: Buffered, - state: State, + io: Buffered>>, + state: State, _marker: PhantomData } @@ -103,8 +103,6 @@ where I: AsyncRead + AsyncWrite, error: err, }))), }; - } else if self.can_write_continue() { - try_nb!(self.flush()); } else if self.can_read_body() { return self.read_body() .map(|async| async.map(|chunk| Some(Frame::Body { @@ -153,13 +151,6 @@ where I: AsyncRead + AsyncWrite, } } - pub fn can_write_continue(&self) -> bool { - match self.state.writing { - Writing::Continue(..) => true, - _ => false, - } - } - pub fn can_read_body(&self) -> bool { match self.state.reading { Reading::Body(..) => true, @@ -228,7 +219,7 @@ where I: AsyncRead + AsyncWrite, self.state.busy(); if head.expecting_continue() { let msg = b"HTTP/1.1 100 Continue\r\n\r\n"; - self.state.writing = Writing::Continue(Cursor::new(msg)); + self.io.write_buf_mut().extend_from_slice(msg); } let wants_keep_alive = head.should_keep_alive(); self.state.keep_alive &= wants_keep_alive; @@ -370,9 +361,7 @@ where I: AsyncRead + AsyncWrite, }; match self.state.writing { - Writing::Continue(..) | - Writing::Body(..) | - Writing::Ending(..) => return, + Writing::Body(..) => return, Writing::Init | Writing::KeepAlive | Writing::Closed => (), @@ -408,7 +397,7 @@ where I: AsyncRead + AsyncWrite, pub fn can_write_head(&self) -> bool { match self.state.writing { - Writing::Continue(..) | Writing::Init => true, + Writing::Init => true, _ => false } } @@ -416,19 +405,14 @@ where I: AsyncRead + AsyncWrite, pub fn can_write_body(&self) -> bool { match self.state.writing { Writing::Body(..) => true, - Writing::Continue(..) | Writing::Init | - Writing::Ending(..) | Writing::KeepAlive | Writing::Closed => false, } } - pub fn has_queued_body(&self) -> bool { - match self.state.writing { - Writing::Body(_, Some(_)) => true, - _ => false, - } + pub fn can_buffer_body(&self) -> bool { + self.io.can_buffer() } pub fn write_head(&mut self, mut head: super::MessageHead, body: bool) { @@ -437,17 +421,10 @@ where I: AsyncRead + AsyncWrite, self.enforce_version(&mut head); let buf = self.io.write_buf_mut(); - // if a 100-continue has started but not finished sending, tack the - // remainder on to the start of the buffer. - if let Writing::Continue(ref pending) = self.state.writing { - if pending.has_started() { - buf.extend_from_slice(pending.buf()); - } - } self.state.writing = match T::encode(head, body, &mut self.state.method, buf) { Ok(encoder) => { if !encoder.is_eof() { - Writing::Body(encoder, None) + Writing::Body(encoder) } else { Writing::KeepAlive } @@ -492,46 +469,26 @@ where I: AsyncRead + AsyncWrite, pub fn write_body(&mut self, chunk: Option) -> StartSend, io::Error> { debug_assert!(self.can_write_body()); - if self.has_queued_body() { - try!(self.flush()); - - if !self.can_write_body() { - if chunk.as_ref().map(|c| c.as_ref().len()).unwrap_or(0) == 0 { - return Ok(AsyncSink::NotReady(chunk)); - } else { + if !self.can_buffer_body() { + if let Async::NotReady = self.flush()? 
{ + // if chunk is Some(&[]), aka empty, whatever, just skip it + if chunk.as_ref().map(|c| c.as_ref().is_empty()).unwrap_or(false) { return Ok(AsyncSink::Ready); + } else { + return Ok(AsyncSink::NotReady(chunk)); } } } let state = match self.state.writing { - Writing::Body(ref mut encoder, ref mut queued) => { - if queued.is_some() { - return Ok(AsyncSink::NotReady(chunk)); - } + Writing::Body(ref mut encoder) => { if let Some(chunk) = chunk { if chunk.as_ref().is_empty() { return Ok(AsyncSink::Ready); } - let mut cursor = Cursor::new(chunk); - match encoder.encode(&mut self.io, cursor.buf()) { - Ok(n) => { - cursor.consume(n); - - if !cursor.is_written() { - trace!("Conn::start_send frame not written, queued"); - *queued = Some(cursor); - } - }, - Err(e) => match e.kind() { - io::ErrorKind::WouldBlock => { - trace!("Conn::start_send frame not written, queued"); - *queued = Some(cursor); - }, - _ => return Err(e) - } - } + let encoded = encoder.encode(Cursor::new(chunk)); + self.io.buffer(encoded); if encoder.is_eof() { Writing::KeepAlive @@ -541,8 +498,12 @@ where I: AsyncRead + AsyncWrite, } else { // end of stream, that means we should try to eof match encoder.end() { - Ok(Some(end)) => Writing::Ending(Cursor::new(end)), - Ok(None) => Writing::KeepAlive, + Ok(end) => { + if let Some(end) = end { + self.io.buffer(end); + } + Writing::KeepAlive + }, Err(_not_eof) => Writing::Closed, } } @@ -575,61 +536,11 @@ where I: AsyncRead + AsyncWrite, Err(err) } - fn write_queued(&mut self) -> Poll<(), io::Error> { - trace!("Conn::write_queued()"); - let state = match self.state.writing { - Writing::Continue(ref mut queued) => { - let n = self.io.buffer(queued.buf()); - queued.consume(n); - if queued.is_written() { - Writing::Init - } else { - return Ok(Async::NotReady); - } - } - Writing::Body(ref mut encoder, ref mut queued) => { - let complete = if let Some(chunk) = queued.as_mut() { - let n = try_nb!(encoder.encode(&mut self.io, chunk.buf())); - chunk.consume(n); - chunk.is_written() - } else { - true - }; - trace!("Conn::write_queued complete = {}", complete); - return if complete { - *queued = None; - Ok(Async::Ready(())) - } else { - Ok(Async::NotReady) - }; - }, - Writing::Ending(ref mut ending) => { - let n = self.io.buffer(ending.buf()); - ending.consume(n); - if ending.is_written() { - Writing::KeepAlive - } else { - return Ok(Async::NotReady); - } - }, - _ => return Ok(Async::Ready(())), - }; - self.state.writing = state; - Ok(Async::Ready(())) - } - pub fn flush(&mut self) -> Poll<(), io::Error> { - loop { - let queue_finished = try!(self.write_queued()).is_ready(); - try_nb!(self.io.flush()); - if queue_finished { - break; - } - } + try_ready!(self.io.flush()); self.try_keep_alive(); trace!("flushed {:?}", self.state); Ok(Async::Ready(())) - } pub fn shutdown(&mut self) -> Poll<(), io::Error> { @@ -740,7 +651,7 @@ where I: AsyncRead + AsyncWrite, }, }; - error!("writing illegal frame; state={:?}, frame={:?}", self.state.writing, DebugFrame(&frame)); + warn!("writing illegal frame; state={:?}, frame={:?}", self.state.writing, DebugFrame(&frame)); Err(io::Error::new(io::ErrorKind::InvalidInput, "illegal frame")) } @@ -778,13 +689,13 @@ impl, T, K: KeepAlive> fmt::Debug for Conn { } } -struct State { +struct State { error: Option<::Error>, keep_alive: K, method: Option, read_task: Option, reading: Reading, - writing: Writing, + writing: Writing, version: Version, } @@ -796,16 +707,14 @@ enum Reading { Closed, } -enum Writing { - Continue(Cursor<&'static [u8]>), +enum Writing { Init, - 
Body(Encoder, Option>), - Ending(Cursor<&'static [u8]>), + Body(Encoder), KeepAlive, Closed, } -impl, K: KeepAlive> fmt::Debug for State { +impl fmt::Debug for State { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("State") .field("reading", &self.reading) @@ -818,19 +727,12 @@ impl, K: KeepAlive> fmt::Debug for State { } } -impl> fmt::Debug for Writing { +impl fmt::Debug for Writing { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { - Writing::Continue(ref buf) => f.debug_tuple("Continue") - .field(buf) - .finish(), Writing::Init => f.write_str("Init"), - Writing::Body(ref enc, ref queued) => f.debug_tuple("Body") + Writing::Body(ref enc) => f.debug_tuple("Body") .field(enc) - .field(queued) - .finish(), - Writing::Ending(ref ending) => f.debug_tuple("Ending") - .field(ending) .finish(), Writing::KeepAlive => f.write_str("KeepAlive"), Writing::Closed => f.write_str("Closed"), @@ -884,7 +786,7 @@ impl KeepAlive for KA { } } -impl State { +impl State { fn close(&mut self) { trace!("State::close()"); self.reading = Reading::Closed; @@ -1031,15 +933,6 @@ mod tests { use std::str::FromStr; - impl Writing { - fn is_queued(&self) -> bool { - match *self { - Writing::Body(_, Some(_)) => true, - _ => false, - } - } - } - #[test] fn test_conn_init_read() { let good_message = b"GET / HTTP/1.1\r\n\r\n".to_vec(); @@ -1226,13 +1119,10 @@ mod tests { let io = AsyncIo::new_buf(vec![], 0); let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io, Default::default()); let max = ::proto::io::DEFAULT_MAX_BUFFER_SIZE + 4096; - conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64), None); - - assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 8].into()) }).unwrap().is_ready()); - assert!(!conn.state.writing.is_queued()); + conn.state.writing = Writing::Body(Encoder::length((max * 2) as u64)); - assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; max].into()) }).unwrap().is_ready()); - assert!(conn.state.writing.is_queued()); + assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; max].into()) }).unwrap().is_ready()); + assert!(!conn.can_buffer_body()); assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'b'; 1024 * 8].into()) }).unwrap().is_not_ready()); @@ -1253,7 +1143,7 @@ mod tests { let _: Result<(), ()> = future::lazy(|| { let io = AsyncIo::new_buf(vec![], 4096); let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io, Default::default()); - conn.state.writing = Writing::Body(Encoder::chunked(), None); + conn.state.writing = Writing::Body(Encoder::chunked()); assert!(conn.start_send(Frame::Body { chunk: Some("headers".into()) }).unwrap().is_ready()); assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'x'; 8192].into()) }).unwrap().is_ready()); @@ -1266,11 +1156,12 @@ mod tests { let _: Result<(), ()> = future::lazy(|| { let io = AsyncIo::new_buf(vec![], 1024 * 1024 * 5); let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io, Default::default()); - conn.state.writing = Writing::Body(Encoder::length(1024 * 1024), None); + conn.state.writing = Writing::Body(Encoder::length(1024 * 1024)); assert!(conn.start_send(Frame::Body { chunk: Some(vec![b'a'; 1024 * 1024].into()) }).unwrap().is_ready()); - assert!(conn.state.writing.is_queued()); + assert!(!conn.can_buffer_body()); + conn.io.io_mut().block_in(1024 * 1024 * 5); assert!(conn.poll_complete().unwrap().is_ready()); - assert!(!conn.state.writing.is_queued()); + assert!(conn.can_buffer_body()); assert!(conn.io.io_mut().flushed()); 
Ok(()) @@ -1329,7 +1220,7 @@ mod tests { let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io, Default::default()); conn.state.reading = Reading::KeepAlive; assert!(conn.poll().unwrap().is_not_ready()); - conn.state.writing = Writing::Body(Encoder::length(5_000), None); + conn.state.writing = Writing::Body(Encoder::length(5_000)); assert!(conn.poll_complete().unwrap().is_ready()); Ok::<(), ()>(()) }); diff --git a/src/proto/dispatch.rs b/src/proto/dispatch.rs index e0f5475906..89c3a1986d 100644 --- a/src/proto/dispatch.rs +++ b/src/proto/dispatch.rs @@ -87,8 +87,6 @@ where return Ok(Async::Ready(())); } else if self.conn.can_read_head() { try_ready!(self.poll_read_head()); - } else if self.conn.can_write_continue() { - try_nb!(self.conn.flush()); } else if let Some(mut body) = self.body_tx.take() { if self.conn.can_read_body() { match body.poll_ready() { @@ -196,7 +194,7 @@ where self.close(); return Ok(Async::Ready(())); } - } else if self.conn.has_queued_body() { + } else if !self.conn.can_buffer_body() { try_ready!(self.poll_flush()); } else if let Some(mut body) = self.body_rx.take() { let chunk = match body.poll()? { diff --git a/src/proto/h1/encode.rs b/src/proto/h1/encode.rs index 17f2724492..4fcfd60439 100644 --- a/src/proto/h1/encode.rs +++ b/src/proto/h1/encode.rs @@ -1,7 +1,9 @@ -use std::cmp; -use std::io::{self, Write}; +//use std::cmp; +use std::fmt; -use proto::io::AtomicWrite; +use bytes::{Buf, IntoBuf}; +use bytes::buf::{Chain, Take}; +use iovec::IoVec; /// Encoders to handle different Transfer-Encodings. #[derive(Debug, Clone)] @@ -9,10 +11,18 @@ pub struct Encoder { kind: Kind, } +#[derive(Debug)] +pub struct EncodedBuf { + kind: BufKind, +} + +#[derive(Debug)] +pub struct NotEof; + #[derive(Debug, PartialEq, Clone)] enum Kind { /// An Encoder for when Transfer-Encoding includes `chunked`. - Chunked(Chunked), + Chunked, /// An Encoder for when Content-Length is set. /// /// Enforces that the body is not longer than the Content-Length header. 
@@ -24,10 +34,18 @@ enum Kind { Eof } +#[derive(Debug)] +enum BufKind { + Exact(B), + Limited(Take), + Chunked(Chain>), + ChunkedEnd(CrLf), +} + impl Encoder { pub fn chunked() -> Encoder { Encoder { - kind: Kind::Chunked(Chunked::Init), + kind: Kind::Chunked, } } @@ -45,185 +63,103 @@ impl Encoder { pub fn is_eof(&self) -> bool { match self.kind { - Kind::Length(0) | - Kind::Chunked(Chunked::End) => true, + Kind::Length(0) => true, _ => false } } - pub fn end(&self) -> Result, NotEof> { + pub fn end(&self) -> Result>, NotEof> { match self.kind { Kind::Length(0) => Ok(None), - Kind::Chunked(Chunked::Init) => Ok(Some(b"0\r\n\r\n")), + Kind::Chunked => Ok(Some(EncodedBuf { + kind: BufKind::ChunkedEnd(CrLf(b"0\r\n\r\n")), + })), _ => Err(NotEof), } } - pub fn encode(&mut self, w: &mut W, msg: &[u8]) -> io::Result { - match self.kind { - Kind::Chunked(ref mut chunked) => { - chunked.encode(w, msg) + pub fn encode(&mut self, msg: B) -> EncodedBuf + where + B: IntoBuf, + { + let msg = msg.into_buf(); + let len = msg.remaining(); + assert!(len > 0, "encode() called with empty buf"); + + let buf = match self.kind { + Kind::Chunked => { + trace!("encoding chunked {}B", len); + BufKind::Chunked(ChunkSize::new(len) + .chain(msg.chain(CrLf(b"\r\n")))) }, Kind::Length(ref mut remaining) => { - if msg.is_empty() { - return Ok(0); + trace!("sized write, len = {}", len); + if len as u64 > *remaining { + let limit = *remaining as usize; + *remaining = 0; + BufKind::Limited(msg.take(limit)) + } else { + *remaining -= len as u64; + BufKind::Exact(msg) } - let n = { - let max = cmp::min(*remaining as usize, msg.len()); - trace!("sized write = {}", max); - let slice = &msg[..max]; - - try!(w.write_atomic(&[slice])) - }; - - if n == 0 { - return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero")); - } - - *remaining -= n as u64; - trace!("encoded {} bytes, remaining = {}", n, remaining); - Ok(n) }, Kind::Eof => { - if msg.is_empty() { - return Ok(0); - } - w.write_atomic(&[msg]) + trace!("eof write {}B", len); + BufKind::Exact(msg) } + }; + EncodedBuf { + kind: buf, } } } -#[derive(Debug)] -pub struct NotEof; - -#[derive(Debug, PartialEq, Clone)] -enum Chunked { - Init, - Size(ChunkSize), - SizeCr, - SizeLf, - Body(usize), - BodyCr, - BodyLf, - End, -} +impl Buf for EncodedBuf +where + B: Buf, +{ + #[inline] + fn remaining(&self) -> usize { + match self.kind { + BufKind::Exact(ref b) => b.remaining(), + BufKind::Limited(ref b) => b.remaining(), + BufKind::Chunked(ref b) => b.remaining(), + BufKind::ChunkedEnd(ref b) => b.remaining(), + } + } -impl Chunked { - fn encode(&mut self, w: &mut W, msg: &[u8]) -> io::Result { - match *self { - Chunked::Init => { - let mut size = ChunkSize { - bytes: [0; CHUNK_SIZE_MAX_BYTES], - pos: 0, - len: 0, - }; - trace!("chunked write, size = {:?}", msg.len()); - write!(&mut size, "{:X}", msg.len()) - .expect("CHUNK_SIZE_MAX_BYTES should fit any usize"); - *self = Chunked::Size(size); - } - Chunked::End => return Ok(0), - _ => {} + #[inline] + fn bytes(&self) -> &[u8] { + match self.kind { + BufKind::Exact(ref b) => b.bytes(), + BufKind::Limited(ref b) => b.bytes(), + BufKind::Chunked(ref b) => b.bytes(), + BufKind::ChunkedEnd(ref b) => b.bytes(), } - let mut n = { - let pieces = match *self { - Chunked::Init => unreachable!("Chunked::Init should have become Chunked::Size"), - Chunked::Size(ref size) => [ - &size.bytes[size.pos.into() .. 
size.len.into()], - &b"\r\n"[..], - msg, - &b"\r\n"[..], - ], - Chunked::SizeCr => [ - &b""[..], - &b"\r\n"[..], - msg, - &b"\r\n"[..], - ], - Chunked::SizeLf => [ - &b""[..], - &b"\n"[..], - msg, - &b"\r\n"[..], - ], - Chunked::Body(pos) => [ - &b""[..], - &b""[..], - &msg[pos..], - &b"\r\n"[..], - ], - Chunked::BodyCr => [ - &b""[..], - &b""[..], - &b""[..], - &b"\r\n"[..], - ], - Chunked::BodyLf => [ - &b""[..], - &b""[..], - &b""[..], - &b"\n"[..], - ], - Chunked::End => unreachable!("Chunked::End shouldn't write more") - }; - try!(w.write_atomic(&pieces)) - }; + } - while n > 0 { - match *self { - Chunked::Init => unreachable!("Chunked::Init should have become Chunked::Size"), - Chunked::Size(mut size) => { - n = size.update(n); - if size.len == 0 { - *self = Chunked::SizeCr; - } else { - *self = Chunked::Size(size); - } - }, - Chunked::SizeCr => { - *self = Chunked::SizeLf; - n -= 1; - } - Chunked::SizeLf => { - *self = Chunked::Body(0); - n -= 1; - } - Chunked::Body(pos) => { - let left = msg.len() - pos; - if n >= left { - *self = Chunked::BodyCr; - n -= left; - } else { - *self = Chunked::Body(pos + n); - n = 0; - } - } - Chunked::BodyCr => { - *self = Chunked::BodyLf; - n -= 1; - } - Chunked::BodyLf => { - assert!(n == 1); - *self = if msg.len() == 0 { - Chunked::End - } else { - Chunked::Init - }; - n = 0; - }, - Chunked::End => unreachable!("Chunked::End shouldn't have any to write") - } + #[inline] + fn advance(&mut self, cnt: usize) { + match self.kind { + BufKind::Exact(ref mut b) => b.advance(cnt), + BufKind::Limited(ref mut b) => b.advance(cnt), + BufKind::Chunked(ref mut b) => b.advance(cnt), + BufKind::ChunkedEnd(ref mut b) => b.advance(cnt), } + } - match *self { - Chunked::Init | - Chunked::End => Ok(msg.len()), - _ => Err(io::ErrorKind::WouldBlock.into()) + #[inline] + fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize { + match self.kind { + BufKind::Exact(ref b) => b.bytes_vec(dst), + BufKind::Limited(ref b) => b.bytes_vec(dst), + BufKind::Chunked(ref b) => b.bytes_vec(dst), + BufKind::ChunkedEnd(ref b) => b.bytes_vec(dst), } } } + #[cfg(target_pointer_width = "32")] const USIZE_BYTES: usize = 4; @@ -235,27 +171,45 @@ const CHUNK_SIZE_MAX_BYTES: usize = USIZE_BYTES * 2; #[derive(Clone, Copy)] struct ChunkSize { - bytes: [u8; CHUNK_SIZE_MAX_BYTES], + bytes: [u8; CHUNK_SIZE_MAX_BYTES + 2], pos: u8, len: u8, } impl ChunkSize { - fn update(&mut self, n: usize) -> usize { - let diff = (self.len - self.pos).into(); - if n >= diff { - self.pos = 0; - self.len = 0; - n - diff - } else { - self.pos += n as u8; // just verified it was a small usize - 0 - } + fn new(len: usize) -> ChunkSize { + use std::fmt::Write; + let mut size = ChunkSize { + bytes: [0; CHUNK_SIZE_MAX_BYTES + 2], + pos: 0, + len: 0, + }; + write!(&mut size, "{:X}\r\n", len) + .expect("CHUNK_SIZE_MAX_BYTES should fit any usize"); + size + } +} + +impl Buf for ChunkSize { + #[inline] + fn remaining(&self) -> usize { + (self.len - self.pos).into() + } + + #[inline] + fn bytes(&self) -> &[u8] { + &self.bytes[self.pos.into() .. 
self.len.into()] + } + + #[inline] + fn advance(&mut self, cnt: usize) { + assert!(cnt <= self.remaining()); + self.pos += cnt as u8; // just asserted cnt fits in u8 } } -impl ::std::fmt::Debug for ChunkSize { - fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { +impl fmt::Debug for ChunkSize { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("ChunkSize") .field("bytes", &&self.bytes[..self.len.into()]) .field("pos", &self.pos) @@ -263,64 +217,122 @@ impl ::std::fmt::Debug for ChunkSize { } } -impl ::std::cmp::PartialEq for ChunkSize { - fn eq(&self, other: &ChunkSize) -> bool { - self.len == other.len && - self.pos == other.pos && - (&self.bytes[..]) == (&other.bytes[..]) +impl fmt::Write for ChunkSize { + fn write_str(&mut self, num: &str) -> fmt::Result { + use std::io::Write; + (&mut self.bytes[self.len.into()..]).write(num.as_bytes()) + .expect("&mut [u8].write() cannot error"); + self.len += num.len() as u8; // safe because bytes is never bigger than 256 + Ok(()) } } -impl io::Write for ChunkSize { - fn write(&mut self, msg: &[u8]) -> io::Result { - let n = (&mut self.bytes[self.len.into() ..]).write(msg) - .expect("&mut [u8].write() cannot error"); - self.len += n as u8; // safe because bytes is never bigger than 256 - Ok(n) +#[derive(Debug)] +struct CrLf(&'static [u8]); + +impl Buf for CrLf { + #[inline] + fn remaining(&self) -> usize { + self.0.len() } - fn flush(&mut self) -> io::Result<()> { - Ok(()) + #[inline] + fn bytes(&self) -> &[u8] { + self.0 + } + + #[inline] + fn advance(&mut self, cnt: usize) { + self.0 = &self.0[cnt..]; + } + + #[inline] + fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize { + if dst.is_empty() { + return 0; + } else { + dst[0] = self.0.into(); + return 1; + } } } #[cfg(test)] mod tests { + use bytes::{BufMut}; + + use proto::io::Cursor; use super::Encoder; - use mock::{AsyncIo, Buf}; #[test] - fn test_chunked_encode_sync() { - let mut dst = Buf::new(); + fn chunked() { let mut encoder = Encoder::chunked(); + let mut dst = Vec::new(); + + let msg1 = b"foo bar".as_ref(); + let buf1 = encoder.encode(msg1); + dst.put(buf1); + assert_eq!(dst, b"7\r\nfoo bar\r\n"); + + let msg2 = b"baz quux herp".as_ref(); + let buf2 = encoder.encode(msg2); + dst.put(buf2); + + assert_eq!(dst, b"7\r\nfoo bar\r\nD\r\nbaz quux herp\r\n"); - encoder.encode(&mut dst, b"foo bar").unwrap(); - encoder.encode(&mut dst, b"baz quux herp").unwrap(); - encoder.encode(&mut dst, b"").unwrap(); - assert_eq!(&dst[..], &b"7\r\nfoo bar\r\nD\r\nbaz quux herp\r\n0\r\n\r\n"[..]); + let end = encoder.end::>>().unwrap().unwrap(); + dst.put(end); + + assert_eq!(dst, b"7\r\nfoo bar\r\nD\r\nbaz quux herp\r\n0\r\n\r\n".as_ref()); } #[test] - fn test_chunked_encode_async() { - let mut dst = AsyncIo::new(Buf::new(), 7); - let mut encoder = Encoder::chunked(); + fn length() { + let max_len = 8; + let mut encoder = Encoder::length(max_len as u64); + let mut dst = Vec::new(); + + + let msg1 = b"foo bar".as_ref(); + let buf1 = encoder.encode(msg1); + dst.put(buf1); + - assert!(encoder.encode(&mut dst, b"foo bar").is_err()); - dst.block_in(6); - assert_eq!(7, encoder.encode(&mut dst, b"foo bar").unwrap()); - dst.block_in(30); - assert_eq!(13, encoder.encode(&mut dst, b"baz quux herp").unwrap()); - encoder.encode(&mut dst, b"").unwrap(); - assert_eq!(&dst[..], &b"7\r\nfoo bar\r\nD\r\nbaz quux herp\r\n0\r\n\r\n"[..]); + assert_eq!(dst, b"foo bar"); + assert!(!encoder.is_eof()); + encoder.end::<()>().unwrap_err(); + + let msg2 = b"baz".as_ref(); + let buf2 = 
encoder.encode(msg2); + dst.put(buf2); + + assert_eq!(dst.len(), max_len); + assert_eq!(dst, b"foo barb"); + assert!(encoder.is_eof()); + assert!(encoder.end::<()>().unwrap().is_none()); } #[test] - fn test_sized_encode() { - let mut dst = Buf::new(); - let mut encoder = Encoder::length(8); - encoder.encode(&mut dst, b"foo bar").unwrap(); - assert_eq!(encoder.encode(&mut dst, b"baz").unwrap(), 1); + fn eof() { + let mut encoder = Encoder::eof(); + let mut dst = Vec::new(); - assert_eq!(dst, b"foo barb"); + + let msg1 = b"foo bar".as_ref(); + let buf1 = encoder.encode(msg1); + dst.put(buf1); + + + assert_eq!(dst, b"foo bar"); + assert!(!encoder.is_eof()); + encoder.end::<()>().unwrap_err(); + + let msg2 = b"baz".as_ref(); + let buf2 = encoder.encode(msg2); + dst.put(buf2); + + assert_eq!(dst, b"foo barbaz"); + assert!(!encoder.is_eof()); + encoder.end::<()>().unwrap_err(); } } diff --git a/src/proto/h1/mod.rs b/src/proto/h1/mod.rs index 3853d4e096..dfd62906a6 100644 --- a/src/proto/h1/mod.rs +++ b/src/proto/h1/mod.rs @@ -1,5 +1,5 @@ pub use self::decode::Decoder; -pub use self::encode::Encoder; +pub use self::encode::{EncodedBuf, Encoder}; mod date; mod decode; diff --git a/src/proto/io.rs b/src/proto/io.rs index 06b86c91a9..53ff630120 100644 --- a/src/proto/io.rs +++ b/src/proto/io.rs @@ -1,27 +1,31 @@ -use std::cmp; +use std::collections::VecDeque; use std::fmt; -use std::io::{self, Write}; -use std::ptr; +use std::io; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use futures::{Async, Poll}; +use iovec::IoVec; use tokio_io::{AsyncRead, AsyncWrite}; use super::{Http1Transaction, MessageHead}; -use bytes::{BytesMut, Bytes}; const INIT_BUFFER_SIZE: usize = 8192; pub const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100; +const MAX_BUF_LIST_BUFFERS: usize = 16; -pub struct Buffered { +pub struct Buffered { flush_pipeline: bool, io: T, max_buf_size: usize, read_blocked: bool, read_buf: BytesMut, - write_buf: WriteBuf, + write_buf: WriteBuf, } -impl fmt::Debug for Buffered { +impl fmt::Debug for Buffered +where + B: Buf, +{ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Buffered") .field("read_buf", &self.read_buf) @@ -30,8 +34,12 @@ impl fmt::Debug for Buffered { } } -impl Buffered { - pub fn new(io: T) -> Buffered { +impl Buffered +where + T: AsyncRead + AsyncWrite, + B: Buf, +{ + pub fn new(io: T) -> Buffered { Buffered { flush_pipeline: false, io: io, @@ -44,6 +52,11 @@ impl Buffered { pub fn set_flush_pipeline(&mut self, enabled: bool) { self.flush_pipeline = enabled; + self.write_buf.set_strategy(if enabled { + Strategy::Flatten + } else { + Strategy::Queue + }); } pub fn set_max_buf_size(&mut self, max: usize) { @@ -56,9 +69,17 @@ impl Buffered { } pub fn write_buf_mut(&mut self) -> &mut Vec { - self.write_buf.maybe_reset(); - self.write_buf.maybe_reserve(0); - &mut self.write_buf.buf.bytes + let buf = self.write_buf.head_mut(); + buf.maybe_reset(); + &mut buf.bytes + } + + pub fn buffer(&mut self, buf: B) { + self.write_buf.buffer(buf) + } + + pub fn can_buffer(&self) -> bool { + self.flush_pipeline || self.write_buf.can_buffer() } pub fn consume_leading_lines(&mut self) { @@ -118,10 +139,6 @@ impl Buffered { }) } - pub fn buffer>(&mut self, buf: B) -> usize { - self.write_buf.buffer(buf.as_ref()) - } - pub fn io_mut(&mut self) -> &mut T { &mut self.io } @@ -129,33 +146,23 @@ impl Buffered { pub fn is_read_blocked(&self) -> bool { self.read_blocked } -} -impl Write for Buffered { - fn write(&mut self, data: &[u8]) -> io::Result { - let n = 
self.write_buf.buffer(data); - if n == 0 { - Err(io::ErrorKind::WouldBlock.into()) - } else { - Ok(n) - } - } - - fn flush(&mut self) -> io::Result<()> { + pub fn flush(&mut self) -> Poll<(), io::Error> { if self.flush_pipeline && !self.read_buf.is_empty() { - Ok(()) + //Ok(()) } else if self.write_buf.remaining() == 0 { - self.io.flush() + try_nb!(self.io.flush()); } else { loop { - let n = try!(self.write_buf.write_into(&mut self.io)); + let n = try_ready!(self.io.write_buf(&mut self.write_buf)); debug!("flushed {} bytes", n); if self.write_buf.remaining() == 0 { break; } } - self.io.flush() + try_nb!(self.io.flush()) } + Ok(Async::Ready(())) } } @@ -163,7 +170,11 @@ pub trait MemRead { fn read_mem(&mut self, len: usize) -> Poll; } -impl MemRead for Buffered { +impl MemRead for Buffered +where + T: AsyncRead + AsyncWrite, + B: Buf, +{ fn read_mem(&mut self, len: usize) -> Poll { trace!("Buffered.read_mem read_buf={}, wanted={}", self.read_buf.len(), len); if !self.read_buf.is_empty() { @@ -191,143 +202,270 @@ impl> Cursor { } } - pub fn has_started(&self) -> bool { - self.pos != 0 + #[inline] + pub fn buf(&self) -> &[u8] { + &self.bytes.as_ref()[self.pos..] } - pub fn is_written(&self) -> bool { - trace!("Cursor::is_written pos = {}, len = {}", self.pos, self.bytes.as_ref().len()); - self.pos >= self.bytes.as_ref().len() + #[inline] + pub fn consume(&mut self, num: usize) { + self.pos += num; } +} - pub fn write_to(&mut self, dst: &mut W) -> io::Result { - if self.remaining() == 0 { - Ok(0) - } else { - dst.write(&self.bytes.as_ref()[self.pos..]).map(|n| { - self.pos += n; - n - }) +impl Cursor> { + fn maybe_reset(&mut self) { + if self.pos != 0 && self.remaining() == 0 { + self.pos = 0; + unsafe { + self.bytes.set_len(0); + } } } +} +impl> fmt::Debug for Cursor { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Cursor") + .field("pos", &self.pos) + .field("len", &self.bytes.as_ref().len()) + .finish() + } +} + +impl> Buf for Cursor { + #[inline] fn remaining(&self) -> usize { self.bytes.as_ref().len() - self.pos } #[inline] - pub fn buf(&self) -> &[u8] { - &self.bytes.as_ref()[self.pos..] 
+ fn bytes(&self) -> &[u8] { + self.buf() } #[inline] - pub fn consume(&mut self, num: usize) { - trace!("Cursor::consume({})", num); - self.pos = ::std::cmp::min(self.bytes.as_ref().len(), self.pos + num); + fn advance(&mut self, cnt: usize) { + self.consume(cnt) } } -impl> fmt::Debug for Cursor { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Cursor") - .field("pos", &self.pos) - .field("len", &self.bytes.as_ref().len()) - .finish() - } +// an internal buffer to collect writes before flushes +struct WriteBuf { + buf: BufDeque, + max_buf_size: usize, + strategy: Strategy, } -pub trait AtomicWrite { - fn write_atomic(&mut self, data: &[&[u8]]) -> io::Result; +impl WriteBuf { + fn new() -> WriteBuf { + WriteBuf { + buf: BufDeque::new(), + max_buf_size: DEFAULT_MAX_BUFFER_SIZE, + strategy: Strategy::Queue, + } + } } -/* -#[cfg(not(windows))] -impl AtomicWrite for T { - fn write_atomic(&mut self, bufs: &[&[u8]]) -> io::Result { - self.writev(bufs) +impl WriteBuf +where + B: Buf, +{ + fn set_strategy(&mut self, strategy: Strategy) { + self.strategy = strategy; + } + + fn buffer(&mut self, buf: B) { + match self.strategy { + Strategy::Flatten => { + let head = self.head_mut(); + head.maybe_reset(); + head.bytes.put(buf); + }, + Strategy::Queue => { + self.buf.bufs.push_back(VecOrBuf::Buf(buf)); + }, + } } -} + fn can_buffer(&self) -> bool { + match self.strategy { + Strategy::Flatten => { + self.remaining() < self.max_buf_size + }, + Strategy::Queue => { + // for now, the simplest of heuristics + self.buf.bufs.len() < MAX_BUF_LIST_BUFFERS + && self.remaining() < self.max_buf_size + }, + } + } + + fn head_mut(&mut self) -> &mut Cursor> { + // this dance is brought to you, The Borrow Checker! -#[cfg(windows)] -*/ -impl AtomicWrite for T { - fn write_atomic(&mut self, bufs: &[&[u8]]) -> io::Result { - if bufs.len() == 1 { - self.write(bufs[0]) + let reuse_back = if let Some(&VecOrBuf::Vec(_)) = self.buf.bufs.back() { + true } else { - let vec = bufs.concat(); - self.write(&vec) + false + }; + + if !reuse_back { + let head_buf = Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)); + self.buf.bufs.push_back(VecOrBuf::Vec(head_buf)); + } + if let Some(&mut VecOrBuf::Vec(ref mut v)) = self.buf.bufs.back_mut() { + v + } else { + unreachable!("head_buf just pushed on back"); } } } -//} -// an internal buffer to collect writes before flushes +impl fmt::Debug for WriteBuf { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("WriteBuf") + .field("remaining", &self.remaining()) + .field("strategy", &self.strategy) + .finish() + } +} + +impl Buf for WriteBuf { + #[inline] + fn remaining(&self) -> usize { + self.buf.remaining() + } + + #[inline] + fn bytes(&self) -> &[u8] { + self.buf.bytes() + } + + #[inline] + fn advance(&mut self, cnt: usize) { + self.buf.advance(cnt) + } + + #[inline] + fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize { + self.buf.bytes_vec(dst) + } +} + #[derive(Debug)] -struct WriteBuf{ - buf: Cursor>, - max_buf_size: usize, +enum Strategy { + Flatten, + Queue, } -impl WriteBuf { - fn new() -> WriteBuf { - WriteBuf { - buf: Cursor::new(Vec::new()), - max_buf_size: DEFAULT_MAX_BUFFER_SIZE, +enum VecOrBuf { + Vec(Cursor>), + Buf(B), +} + +impl Buf for VecOrBuf { + #[inline] + fn remaining(&self) -> usize { + match *self { + VecOrBuf::Vec(ref v) => v.remaining(), + VecOrBuf::Buf(ref b) => b.remaining(), + } + } + + #[inline] + fn bytes(&self) -> &[u8] { + match *self { + VecOrBuf::Vec(ref v) => v.bytes(), + VecOrBuf::Buf(ref b) => 
b.bytes(), + } + } + + #[inline] + fn advance(&mut self, cnt: usize) { + match *self { + VecOrBuf::Vec(ref mut v) => v.advance(cnt), + VecOrBuf::Buf(ref mut b) => b.advance(cnt), } } - fn write_into(&mut self, w: &mut W) -> io::Result { - self.buf.write_to(w) - } - - fn buffer(&mut self, data: &[u8]) -> usize { - trace!("WriteBuf::buffer() len = {:?}", data.len()); - self.maybe_reset(); - self.maybe_reserve(data.len()); - let vec = &mut self.buf.bytes; - let len = cmp::min(vec.capacity() - vec.len(), data.len()); - assert!(vec.capacity() - vec.len() >= len); - unsafe { - // in rust 1.9, we could use slice::copy_from_slice - ptr::copy( - data.as_ptr(), - vec.as_mut_ptr().offset(vec.len() as isize), - len - ); - let new_len = vec.len() + len; - vec.set_len(new_len); + #[inline] + fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize { + match *self { + VecOrBuf::Vec(ref v) => v.bytes_vec(dst), + VecOrBuf::Buf(ref b) => b.bytes_vec(dst), } - len } +} +struct BufDeque { + bufs: VecDeque>, +} + + +impl BufDeque { + fn new() -> BufDeque { + BufDeque { + bufs: VecDeque::new(), + } + } +} + +impl Buf for BufDeque { + #[inline] fn remaining(&self) -> usize { - self.buf.remaining() + self.bufs.iter() + .map(|buf| buf.remaining()) + .sum() } #[inline] - fn maybe_reserve(&mut self, needed: usize) { - let vec = &mut self.buf.bytes; - let cap = vec.capacity(); - if cap == 0 { - let init = cmp::min(self.max_buf_size, cmp::max(INIT_BUFFER_SIZE, needed)); - trace!("WriteBuf reserving initial {}", init); - vec.reserve(init); - } else if cap < self.max_buf_size { - vec.reserve(cmp::min(needed, self.max_buf_size - cap)); - trace!("WriteBuf reserved {}", vec.capacity() - cap); + fn bytes(&self) -> &[u8] { + if let Some(buf) = self.bufs.front() { + buf.bytes() + } else { + &[] } } - fn maybe_reset(&mut self) { - if self.buf.pos != 0 && self.buf.remaining() == 0 { - self.buf.pos = 0; - unsafe { - self.buf.bytes.set_len(0); + #[inline] + fn advance(&mut self, mut cnt: usize) { + let mut maybe_reclaim = None; + while cnt > 0 { + { + let front = &mut self.bufs[0]; + let rem = front.remaining(); + if rem > cnt { + front.advance(cnt); + return; + } else { + front.advance(rem); + cnt -= rem; + } + } + maybe_reclaim = self.bufs.pop_front(); + } + + if let Some(VecOrBuf::Vec(v)) = maybe_reclaim { + trace!("reclaiming write buf Vec"); + self.bufs.push_back(VecOrBuf::Vec(v)); + } + } + + #[inline] + fn bytes_vec<'t>(&'t self, dst: &mut [&'t IoVec]) -> usize { + if dst.is_empty() { + return 0; + } + let mut vecs = 0; + for buf in &self.bufs { + vecs += buf.bytes_vec(&mut dst[vecs..]); + if vecs == dst.len() { + break; } } + vecs } } @@ -351,7 +489,7 @@ fn test_iobuf_write_empty_slice() { let mut mock = AsyncIo::new(MockBuf::new(), 256); mock.error(io::Error::new(io::ErrorKind::Other, "logic error")); - let mut io_buf = Buffered::new(mock); + let mut io_buf = Buffered::<_, Cursor>>::new(mock); // underlying io will return the logic error upon write, // so we are testing that the io_buf does not trigger a write @@ -366,7 +504,7 @@ fn test_parse_reads_until_blocked() { let raw = "HTTP/1.1 200 OK\r\n"; let mock = AsyncIo::new(MockBuf::wrap(raw.into()), raw.len()); - let mut buffered = Buffered::new(mock); + let mut buffered = Buffered::<_, Cursor>>::new(mock); assert_eq!(buffered.parse::().unwrap(), Async::NotReady); assert!(buffered.io.blocked()); } diff --git a/tests/server.rs b/tests/server.rs index 40e556a786..12e8a9bb6c 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -23,6 +23,7 @@ use std::thread; use 
std::time::Duration; use hyper::StatusCode; +use hyper::header::ContentLength; use hyper::server::{Http, Request, Response, Service, NewService, service_fn}; @@ -630,8 +631,14 @@ fn expect_continue() { fn pipeline_disabled() { let server = serve(); let mut req = connect(server.addr()); - server.reply().status(hyper::Ok); - server.reply().status(hyper::Ok); + server.reply() + .status(hyper::Ok) + .header(ContentLength(12)) + .body("Hello World!"); + server.reply() + .status(hyper::Ok) + .header(ContentLength(12)) + .body("Hello World!"); req.write_all(b"\ GET / HTTP/1.1\r\n\ @@ -671,8 +678,14 @@ fn pipeline_enabled() { .. Default::default() }); let mut req = connect(server.addr()); - server.reply().status(hyper::Ok); - server.reply().status(hyper::Ok); + server.reply() + .status(hyper::Ok) + .header(ContentLength(12)) + .body("Hello World\n"); + server.reply() + .status(hyper::Ok) + .header(ContentLength(12)) + .body("Hello World\n"); req.write_all(b"\ GET / HTTP/1.1\r\n\ @@ -687,6 +700,23 @@ fn pipeline_enabled() { let mut buf = vec![0; 4096]; let n = req.read(&mut buf).expect("read 1"); assert_ne!(n, 0); + + { + let mut lines = buf.split(|&b| b == b'\n'); + assert_eq!(s(lines.next().unwrap()), "HTTP/1.1 200 OK\r"); + assert_eq!(s(lines.next().unwrap()), "Content-Length: 12\r"); + lines.next().unwrap(); // Date + assert_eq!(s(lines.next().unwrap()), "\r"); + assert_eq!(s(lines.next().unwrap()), "Hello World"); + + assert_eq!(s(lines.next().unwrap()), "HTTP/1.1 200 OK\r"); + assert_eq!(s(lines.next().unwrap()), "Content-Length: 12\r"); + lines.next().unwrap(); // Date + assert_eq!(s(lines.next().unwrap()), "\r"); + assert_eq!(s(lines.next().unwrap()), "Hello World"); + } + + // with pipeline enabled, both responses should have been in the first read // so a second read should be EOF let n = req.read(&mut buf).expect("read 2"); @@ -992,6 +1022,51 @@ fn max_buf_size() { core.run(fut).unwrap_err(); } +#[test] +fn streaming_body() { + let _ = pretty_env_logger::try_init(); + let mut core = Core::new().unwrap(); + let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); + let addr = listener.local_addr().unwrap(); + + thread::spawn(move || { + let mut tcp = connect(&addr); + tcp.write_all(b"GET / HTTP/1.1\r\n\r\n").unwrap(); + let mut buf = [0; 8192]; + let mut sum = tcp.read(&mut buf).expect("read 1"); + + let expected = "HTTP/1.1 200 "; + assert_eq!(s(&buf[..expected.len()]), expected); + + loop { + let n = tcp.read(&mut buf).expect("read loop"); + sum += n; + if n == 0 { + break; + } + } + assert_eq!(sum, 1_007_089); + }); + + let fut = listener.incoming() + .into_future() + .map_err(|_| unreachable!()) + .and_then(|(item, _incoming)| { + let (socket, _) = item.unwrap(); + Http::<& &'static [u8]>::new() + .keep_alive(false) + .serve_connection(socket, service_fn(|_| { + static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_00] as _; + let b = ::futures::stream::iter_ok(S.iter()); + Ok(Response::, ::hyper::Error>>::new() + .with_body(b)) + })) + .map(|_| ()) + }); + + core.run(fut).unwrap(); +} + #[test] fn remote_addr() { let server = serve(); @@ -1085,6 +1160,12 @@ impl<'a> ReplyBuilder<'a> { } } +impl<'a> Drop for ReplyBuilder<'a> { + fn drop(&mut self) { + let _ = self.tx.send(Reply::End); + } +} + impl Drop for Serve { fn drop(&mut self) { drop(self.shutdown_signal.take()); @@ -1104,6 +1185,7 @@ enum Reply { Status(hyper::StatusCode), Headers(hyper::Headers), Body(Vec), + End, } #[derive(Debug)] @@ -1164,6 +1246,7 @@ impl Service for 
TestService { Reply::Body(body) => { res.set_body(body); }, + Reply::End => break, } } res
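---

The core idea of this patch is that the encoder no longer copies body chunks into a flat write buffer; instead it wraps each chunk in an `EncodedBuf` (for chunked bodies, a `Chain` of the hex size header, the payload, and a trailing CRLF) and queues those buffers so the transport can flush them with a single vectored write (`write_buf` / `bytes_vec`). Below is a minimal, hypothetical sketch of that framing idea using only the public `bytes` 0.4 API that the patch builds on — it is not hyper's internal `EncodedBuf`/`ChunkSize` types (hyper writes the size into a fixed stack buffer rather than a heap `String`), just an illustration of composing a chunked frame as a zero-copy `Buf` chain.

```rust
// Sketch only: chunked framing expressed as a bytes 0.4 `Buf` chain,
// mirroring what `EncodedBuf`'s `BufKind::Chunked` variant does internally.
extern crate bytes;

use bytes::{Buf, BufMut, IntoBuf};

fn main() {
    let payload = &b"hello world"[..];

    // "<hex length>\r\n" header; hyper uses a fixed-size ChunkSize buffer
    // instead of allocating a String here.
    let header = format!("{:X}\r\n", payload.len());

    // Chain header + payload + trailing CRLF without copying the payload.
    let framed = header.into_bytes().into_buf()
        .chain(payload)
        .chain(&b"\r\n"[..]);

    // Flatten into a Vec just to show the framed bytes; the real transport
    // would hand the chain to `AsyncWrite::write_buf`, which gathers the
    // segments via `Buf::bytes_vec` into iovecs for a single writev call.
    let mut out = Vec::new();
    out.put(framed);
    assert_eq!(&out[..], &b"B\r\nhello world\r\n"[..]);
}
```

The payoff, as the new `streaming_body` test exercises, is that large user-provided chunks are never memcpy'd into hyper's own buffer: they sit in the `WriteBuf` queue as-is and are submitted to the socket together with the small framing buffers in one vectored write.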