diff --git a/src/http/body.rs b/src/http/body.rs index f98d593123..d0b80329d2 100644 --- a/src/http/body.rs +++ b/src/http/body.rs @@ -96,3 +96,18 @@ fn _assert_send_sync() { _assert_send::(); _assert_sync::(); } + +#[test] +fn test_body_stream_concat() { + use futures::{Sink, Stream, Future}; + let (tx, body) = Body::pair(); + + ::std::thread::spawn(move || { + let tx = tx.send(Ok("hello ".into())).wait().unwrap(); + tx.send(Ok("world".into())).wait().unwrap(); + }); + + let total = body.concat().wait().unwrap(); + assert_eq!(total.as_ref(), b"hello world"); + +} diff --git a/src/http/chunk.rs b/src/http/chunk.rs index a8b65940ea..ec0b3d5e6f 100644 --- a/src/http/chunk.rs +++ b/src/http/chunk.rs @@ -1,12 +1,41 @@ use std::fmt; +use std::mem; -use bytes::Bytes; +use bytes::{Bytes, BytesMut, BufMut}; /// A piece of a message body. pub struct Chunk(Inner); enum Inner { + Mut(BytesMut), Shared(Bytes), + Swapping, +} + +impl Inner { + fn as_bytes_mut(&mut self, reserve: usize) -> &mut BytesMut { + match *self { + Inner::Mut(ref mut bytes) => return bytes, + _ => () + } + + let bytes = match mem::replace(self, Inner::Swapping) { + Inner::Shared(bytes) => bytes, + _ => unreachable!(), + }; + + let bytes_mut = bytes.try_mut().unwrap_or_else(|bytes| { + let mut bytes_mut = BytesMut::with_capacity(reserve + bytes.len()); + bytes_mut.put_slice(bytes.as_ref()); + bytes_mut + }); + + *self = Inner::Mut(bytes_mut); + match *self { + Inner::Mut(ref mut bytes) => bytes, + _ => unreachable!(), + } + } } impl From> for Chunk { @@ -46,7 +75,9 @@ impl From for Chunk { impl From for Bytes { fn from(chunk: Chunk) -> Bytes { match chunk.0 { + Inner::Mut(bytes_mut) => bytes_mut.freeze(), Inner::Shared(bytes) => bytes, + Inner::Swapping => unreachable!(), } } } @@ -64,7 +95,9 @@ impl AsRef<[u8]> for Chunk { #[inline] fn as_ref(&self) -> &[u8] { match self.0 { + Inner::Mut(ref slice) => slice, Inner::Shared(ref slice) => slice, + Inner::Swapping => unreachable!(), } } } @@ -75,3 +108,24 @@ impl fmt::Debug for Chunk { fmt::Debug::fmt(self.as_ref(), f) } } + +impl IntoIterator for Chunk { + type Item = u8; + type IntoIter = ::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + match self.0 { + Inner::Mut(bytes) => bytes.freeze().into_iter(), + Inner::Shared(bytes) => bytes.into_iter(), + Inner::Swapping => unreachable!(), + } + } +} + +impl Extend for Chunk { + fn extend(&mut self, iter: T) where T: IntoIterator { + let iter = iter.into_iter(); + + self.0.as_bytes_mut(iter.size_hint().0).extend(iter); + } +}