diff --git a/examples/client.rs b/examples/client.rs
index bafb6ba7fe..b5df77e25d 100644
--- a/examples/client.rs
+++ b/examples/client.rs
@@ -40,7 +40,7 @@ fn main() {
println!("Response: {}", res.status());
println!("Headers: {:#?}", res.headers());
- res.into_parts().1.for_each(|chunk| {
+ res.into_parts().1.into_stream().for_each(|chunk| {
io::stdout().write_all(&chunk).map_err(From::from)
})
}).map(|_| {
diff --git a/examples/params.rs b/examples/params.rs
index ff51b91e3c..c632daa88e 100644
--- a/examples/params.rs
+++ b/examples/params.rs
@@ -30,7 +30,7 @@ impl Service for ParamExample {
Box::new(futures::future::ok(Response::new(INDEX.into())))
},
(&Method::POST, "/post") => {
- Box::new(req.into_parts().1.concat2().map(|b| {
+ Box::new(req.into_parts().1.into_stream().concat2().map(|b| {
// Parse the request body. form_urlencoded::parse
// always succeeds, but in general parsing may
// fail (for example, an invalid post of json), so
diff --git a/examples/send_file.rs b/examples/send_file.rs
index 71351ef975..7e507fe33f 100644
--- a/examples/send_file.rs
+++ b/examples/send_file.rs
@@ -3,15 +3,15 @@ extern crate futures;
extern crate hyper;
extern crate pretty_env_logger;
-use futures::{Future, Sink};
+use futures::{Future/*, Sink*/};
use futures::sync::oneshot;
-use hyper::{Body, Chunk, Method, Request, Response, StatusCode};
+use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode};
use hyper::error::Error;
use hyper::server::{Http, Service};
use std::fs::File;
-use std::io::{self, copy, Read};
+use std::io::{self, copy/*, Read*/};
use std::thread;
static NOTFOUND: &[u8] = b"Not Found";
@@ -80,7 +80,7 @@ impl Service for ResponseExamples {
// a small test file.
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
- let mut file = match File::open(INDEX) {
+ let _file = match File::open(INDEX) {
Ok(f) => f,
Err(_) => {
tx.send(Response::builder()
@@ -91,9 +91,10 @@ impl Service for ResponseExamples {
return;
},
};
- let (mut tx_body, rx_body) = Body::pair();
+ let (_tx_body, rx_body) = Body::channel();
let res = Response::new(rx_body.into());
tx.send(res).expect("Send error on successful file read");
+ /* TODO: fix once we have futures 0.2 Sink working
let mut buf = [0u8; 16];
loop {
match file.read(&mut buf) {
@@ -104,7 +105,7 @@ impl Service for ResponseExamples {
break;
} else {
let chunk: Chunk = buf[0..n].to_vec().into();
- match tx_body.send(Ok(chunk)).wait() {
+ match tx_body.send_data(chunk).wait() {
Ok(t) => { tx_body = t; },
Err(_) => { break; }
};
@@ -113,6 +114,7 @@ impl Service for ResponseExamples {
Err(_) => { break; }
}
}
+ */
});
Box::new(rx.map_err(|e| Error::from(io::Error::new(io::ErrorKind::Other, e))))
diff --git a/examples/server.rs b/examples/server.rs
index 7c7b3c510b..4c8cba4229 100644
--- a/examples/server.rs
+++ b/examples/server.rs
@@ -24,7 +24,7 @@ impl Service for Echo {
Response::new(INDEX.into())
},
(&Method::POST, "/echo") => {
- Response::new(req.into_parts().1)
+ Response::new(req.into_body())
},
_ => {
let mut res = Response::new(Body::empty());
diff --git a/examples/web_api.rs b/examples/web_api.rs
index d7fdb46e7b..566dad4c49 100644
--- a/examples/web_api.rs
+++ b/examples/web_api.rs
@@ -7,7 +7,6 @@ extern crate tokio_core;
use futures::{Future, Stream};
use hyper::{Body, Chunk, Client, Method, Request, Response, StatusCode};
-use hyper::error::Error;
use hyper::server::{Http, Service};
#[allow(unused)]
@@ -18,20 +17,18 @@ static URL: &str = "http://127.0.0.1:1337/web_api";
static INDEX: &[u8] = b"test.html";
static LOWERCASE: &[u8] = b"i am a lower case string";
-pub type ResponseStream = Box>;
-
struct ResponseExamples(tokio_core::reactor::Handle);
impl Service for ResponseExamples {
type Request = Request;
- type Response = Response;
+ type Response = Response;
type Error = hyper::Error;
type Future = Box>;
fn call(&self, req: Self::Request) -> Self::Future {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") | (&Method::GET, "/index.html") => {
- let body: ResponseStream = Box::new(Body::from(INDEX));
+ let body = Body::from(INDEX);
Box::new(futures::future::ok(Response::new(body)))
},
(&Method::GET, "/test.html") => {
@@ -45,7 +42,7 @@ impl Service for ResponseExamples {
let web_res_future = client.request(req);
Box::new(web_res_future.map(|web_res| {
- let body: ResponseStream = Box::new(web_res.into_parts().1.map(|b| {
+ let body = Body::wrap_stream(web_res.into_body().into_stream().map(|b| {
Chunk::from(format!("before: '{:?}'
after: '{:?}'",
std::str::from_utf8(LOWERCASE).unwrap(),
std::str::from_utf8(&b).unwrap()))
@@ -55,7 +52,7 @@ impl Service for ResponseExamples {
},
(&Method::POST, "/web_api") => {
// A web api to run against. Simple upcasing of the body.
- let body: ResponseStream = Box::new(req.into_parts().1.map(|chunk| {
+ let body = Body::wrap_stream(req.into_body().into_stream().map(|chunk| {
let upper = chunk.iter().map(|byte| byte.to_ascii_uppercase())
.collect::>();
Chunk::from(upper)
@@ -63,7 +60,7 @@ impl Service for ResponseExamples {
Box::new(futures::future::ok(Response::new(body)))
},
_ => {
- let body: ResponseStream = Box::new(Body::from(NOTFOUND));
+ let body = Body::from(NOTFOUND);
Box::new(futures::future::ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.body(body)
diff --git a/src/client/conn.rs b/src/client/conn.rs
index 91fdeeaa36..2a01d0135b 100644
--- a/src/client/conn.rs
+++ b/src/client/conn.rs
@@ -11,11 +11,12 @@ use std::fmt;
use std::marker::PhantomData;
use bytes::Bytes;
-use futures::{Async, Future, Poll, Stream};
+use futures::{Async, Future, Poll};
use futures::future::{self, Either};
use tokio_io::{AsyncRead, AsyncWrite};
use proto;
+use proto::body::Entity;
use super::dispatch;
use {Body, Request, Response, StatusCode};
@@ -44,14 +45,13 @@ pub struct SendRequest {
pub struct Connection
where
T: AsyncRead + AsyncWrite,
- B: Stream + 'static,
- B::Item: AsRef<[u8]>,
+ B: Entity + 'static,
{
inner: proto::dispatch::Dispatcher<
proto::dispatch::Client,
B,
T,
- B::Item,
+ B::Data,
proto::ClientUpgradeTransaction,
>,
}
@@ -134,8 +134,7 @@ impl SendRequest
impl SendRequest
where
- B: Stream + 'static,
- B::Item: AsRef<[u8]>,
+ B: Entity + 'static,
{
/// Sends a `Request` on the associated connection.
///
@@ -152,7 +151,7 @@ where
/// the `Host` header based on it. You must add a `Host` header yourself
/// before calling this method.
/// - Since absolute-form `Uri`s are not required, if received, they will
- /// be serialized as-is, irregardless of calling `Request::set_proxy`.
+ /// be serialized as-is.
///
/// # Example
///
@@ -185,19 +184,6 @@ where
/// # fn main() {}
/// ```
pub fn send_request(&mut self, req: Request) -> ResponseFuture {
- /* TODO?
- // The Connection API does less things automatically than the Client
- // API does. For instance, right here, we always assume set_proxy, so
- // that if an absolute-form URI is provided, it is serialized as-is.
- //
- // Part of the reason for this is to prepare for the change to `http`
- // types, where there is no more set_proxy.
- //
- // It's important that this method isn't called directly from the
- // `Client`, so that `set_proxy` there is still respected.
- req.set_proxy(true);
- */
-
let inner = match self.dispatch.send(req) {
Ok(rx) => {
Either::A(rx.then(move |res| {
@@ -269,8 +255,7 @@ impl fmt::Debug for SendRequest {
impl Connection
where
T: AsyncRead + AsyncWrite,
- B: Stream + 'static,
- B::Item: AsRef<[u8]>,
+ B: Entity + 'static,
{
/// Return the inner IO object, and additional information.
pub fn into_parts(self) -> Parts {
@@ -297,8 +282,7 @@ where
impl Future for Connection
where
T: AsyncRead + AsyncWrite,
- B: Stream + 'static,
- B::Item: AsRef<[u8]>,
+ B: Entity + 'static,
{
type Item = ();
type Error = ::Error;
@@ -311,8 +295,7 @@ where
impl fmt::Debug for Connection
where
T: AsyncRead + AsyncWrite + fmt::Debug,
- B: Stream + 'static,
- B::Item: AsRef<[u8]>,
+ B: Entity + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Connection")
@@ -341,8 +324,7 @@ impl Builder {
pub fn handshake(&self, io: T) -> Handshake
where
T: AsyncRead + AsyncWrite,
- B: Stream + 'static,
- B::Item: AsRef<[u8]>,
+ B: Entity + 'static,
{
Handshake {
inner: HandshakeInner {
@@ -356,8 +338,7 @@ impl Builder {
pub(super) fn handshake_no_upgrades(&self, io: T) -> HandshakeNoUpgrades
where
T: AsyncRead + AsyncWrite,
- B: Stream + 'static,
- B::Item: AsRef<[u8]>,
+ B: Entity + 'static,
{
HandshakeNoUpgrades {
inner: HandshakeInner {
@@ -374,8 +355,7 @@ impl Builder {
impl Future for Handshake
where
T: AsyncRead + AsyncWrite,
- B: Stream + 'static,
- B::Item: AsRef<[u8]>,
+ B: Entity + 'static,
{
type Item = (SendRequest, Connection);
type Error = ::Error;
@@ -400,14 +380,13 @@ impl fmt::Debug for Handshake {
impl Future for HandshakeNoUpgrades
where
T: AsyncRead + AsyncWrite,
- B: Stream + 'static,
- B::Item: AsRef<[u8]>,
+ B: Entity + 'static,
{
type Item = (SendRequest, proto::dispatch::Dispatcher<
proto::dispatch::Client,
B,
T,
- B::Item,
+ B::Data,
proto::ClientTransaction,
>);
type Error = ::Error;
@@ -420,8 +399,7 @@ where
impl Future for HandshakeInner
where
T: AsyncRead + AsyncWrite,
- B: Stream + 'static,
- B::Item: AsRef<[u8]>,
+ B: Entity + 'static,
R: proto::Http1Transaction<
Incoming=StatusCode,
Outgoing=proto::RequestLine,
@@ -431,7 +409,7 @@ where
proto::dispatch::Client,
B,
T,
- B::Item,
+ B::Data,
R,
>);
type Error = ::Error;
@@ -485,16 +463,16 @@ impl AssertSendSync for SendRequest {}
impl AssertSend for Connection
where
T: AsyncRead + AsyncWrite,
- B: Stream,
- B::Item: AsRef<[u8]> + Send,
+ B: Entity + 'static,
+ B::Data: Send + 'static,
{}
#[doc(hidden)]
impl AssertSendSync for Connection
where
T: AsyncRead + AsyncWrite,
- B: Stream,
- B::Item: AsRef<[u8]> + Send + Sync,
+ B: Entity + 'static,
+ B::Data: Send + Sync + 'static,
{}
#[doc(hidden)]
diff --git a/src/client/mod.rs b/src/client/mod.rs
index e2245de403..d8a808e318 100644
--- a/src/client/mod.rs
+++ b/src/client/mod.rs
@@ -7,14 +7,15 @@ use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
-use futures::{Async, Future, Poll, Stream};
+use futures::{Async, Future, Poll};
use futures::future::{self, Executor};
use http::{Method, Request, Response, Uri, Version};
use http::header::{Entry, HeaderValue, HOST};
use tokio::reactor::Handle;
pub use tokio_service::Service;
-use proto::{self, Body};
+use proto::body::{Body, Entity};
+use proto;
use self::pool::Pool;
pub use self::connect::{HttpConnector, Connect};
@@ -101,8 +102,7 @@ impl Client {
impl Client
where C: Connect,
- B: Stream + 'static,
- B::Item: AsRef<[u8]>,
+ B: Entity + 'static,
{
/// Send a `GET` request to the supplied `Uri`.
@@ -181,13 +181,13 @@ where C: Connect,
let client = self.clone();
//TODO: let is_proxy = req.is_proxy();
- //let uri = req.uri().clone();
+ let uri = req.uri().clone();
let fut = RetryableSendRequest {
client: client,
future: self.send_request(req, &domain),
domain: domain,
//is_proxy: is_proxy,
- //uri: uri,
+ uri: uri,
};
FutureResponse(Box::new(fut))
}
@@ -293,8 +293,7 @@ where C: Connect,
impl Service for Client
where C: Connect,
- B: Stream + 'static,
- B::Item: AsRef<[u8]>,
+ B: Entity + 'static,
{
type Request = Request;
type Response = Response;
@@ -350,14 +349,13 @@ struct RetryableSendRequest {
domain: String,
future: Box, Error=ClientError>>,
//is_proxy: bool,
- //uri: Uri,
+ uri: Uri,
}
impl Future for RetryableSendRequest
where
C: Connect,
- B: Stream + 'static,
- B::Item: AsRef<[u8]>,
+ B: Entity + 'static,
{
type Item = Response;
type Error = ::Error;
@@ -370,7 +368,7 @@ where
Err(ClientError::Normal(err)) => return Err(err),
Err(ClientError::Canceled {
connection_reused,
- req,
+ mut req,
reason,
}) => {
if !self.client.retry_canceled_requests || !connection_reused {
@@ -380,6 +378,7 @@ where
}
trace!("unstarted request canceled, trying again (reason={:?})", reason);
+ *req.uri_mut() = self.uri.clone();
self.future = self.client.send_request(req, &self.domain);
}
}
@@ -547,8 +546,7 @@ impl Config {
impl Config
where C: Connect,
- B: Stream,
- B::Item: AsRef<[u8]>,
+ B: Entity,
{
/// Construct the Client with this configuration.
#[inline]
@@ -569,8 +567,7 @@ where C: Connect,
}
impl Config
-where B: Stream,
- B::Item: AsRef<[u8]>,
+where B: Entity,
{
/// Construct the Client with this configuration.
#[inline]
diff --git a/src/headers.rs b/src/headers.rs
index 1e98ee4ce2..1cad1ba9e0 100644
--- a/src/headers.rs
+++ b/src/headers.rs
@@ -1,7 +1,14 @@
+use std::fmt::Write;
+
+use bytes::BytesMut;
use http::HeaderMap;
-use http::header::{CONNECTION, CONTENT_LENGTH, EXPECT, HeaderValue, TRANSFER_ENCODING};
+use http::header::{CONNECTION, CONTENT_LENGTH, EXPECT, TRANSFER_ENCODING};
+use http::header::{HeaderValue, OccupiedEntry, ValueIter};
use unicase;
+/// Maximum number of bytes needed to serialize a u64 into ASCII decimal.
+const MAX_DECIMAL_U64_BYTES: usize = 20;
+
pub fn connection_keep_alive(headers: &HeaderMap) -> bool {
for line in headers.get_all(CONNECTION) {
if let Ok(s) = line.to_str() {
@@ -31,13 +38,15 @@ pub fn connection_close(headers: &HeaderMap) -> bool {
}
pub fn content_length_parse(headers: &HeaderMap) -> Option {
+ content_length_parse_all(headers.get_all(CONTENT_LENGTH).into_iter())
+}
+
+pub fn content_length_parse_all(values: ValueIter) -> Option {
// If multiple Content-Length headers were sent, everything can still
// be alright if they all contain the same value, and all parse
// correctly. If not, then it's an error.
- let values = headers.get_all(CONTENT_LENGTH);
let folded = values
- .into_iter()
.fold(None, |prev, line| match prev {
Some(Ok(prev)) => {
Some(line
@@ -66,12 +75,25 @@ pub fn content_length_zero(headers: &mut HeaderMap) {
headers.insert(CONTENT_LENGTH, HeaderValue::from_static("0"));
}
+pub fn content_length_value(len: u64) -> HeaderValue {
+ let mut len_buf = BytesMut::with_capacity(MAX_DECIMAL_U64_BYTES);
+ write!(len_buf, "{}", len)
+ .expect("BytesMut can hold a decimal u64");
+ // safe because u64 Display is ascii numerals
+ unsafe {
+ HeaderValue::from_shared_unchecked(len_buf.freeze())
+ }
+}
+
pub fn expect_continue(headers: &HeaderMap) -> bool {
Some(&b"100-continue"[..]) == headers.get(EXPECT).map(|v| v.as_bytes())
}
pub fn transfer_encoding_is_chunked(headers: &HeaderMap) -> bool {
- let mut encodings = headers.get_all(TRANSFER_ENCODING).into_iter();
+ is_chunked(headers.get_all(TRANSFER_ENCODING).into_iter())
+}
+
+pub fn is_chunked(mut encodings: ValueIter) -> bool {
// chunked must always be the last encoding, according to spec
if let Some(line) = encodings.next_back() {
if let Ok(s) = line.to_str() {
@@ -83,3 +105,33 @@ pub fn transfer_encoding_is_chunked(headers: &HeaderMap) -> bool {
false
}
+
+pub fn add_chunked(mut entry: OccupiedEntry) {
+ const CHUNKED: &'static str = "chunked";
+
+ if let Some(line) = entry.iter_mut().next_back() {
+ // + 2 for ", "
+ let new_cap = line.as_bytes().len() + CHUNKED.len() + 2;
+ let mut buf = BytesMut::with_capacity(new_cap);
+ buf.copy_from_slice(line.as_bytes());
+ buf.copy_from_slice(b", ");
+ buf.copy_from_slice(CHUNKED.as_bytes());
+
+ *line = HeaderValue::from_shared(buf.freeze())
+ .expect("original header value plus ascii is valid");
+ return;
+ }
+
+ entry.insert(HeaderValue::from_static(CHUNKED));
+}
+
+#[cfg(test)]
+mod tests {
+ #[test]
+ fn assert_max_decimal_u64_bytes() {
+ assert_eq!(
+ super::MAX_DECIMAL_U64_BYTES,
+ ::std::u64::MAX.to_string().len()
+ );
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 18d13c2bfc..66a0fa9a72 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -44,7 +44,7 @@ pub use http::{
pub use client::Client;
pub use error::{Result, Error};
-pub use proto::{Body, Chunk};
+pub use proto::{body, Body, Chunk};
pub use server::Server;
mod common;
diff --git a/src/proto/body.rs b/src/proto/body.rs
index 28acd51f67..23eea59e7b 100644
--- a/src/proto/body.rs
+++ b/src/proto/body.rs
@@ -1,50 +1,236 @@
+//! Streaming bodies for Requests and Responses
+use std::borrow::Cow;
use std::fmt;
use bytes::Bytes;
-use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
+use futures::{Async, Future, Poll, Stream};
use futures::sync::{mpsc, oneshot};
-use std::borrow::Cow;
+use http::HeaderMap;
use super::Chunk;
-pub type BodySender = mpsc::Sender>;
+type BodySender = mpsc::Sender>;
+
+/// This trait represents a streaming body of a `Request` or `Response`.
+pub trait Entity {
+ /// A buffer of bytes representing a single chunk of a body.
+ type Data: AsRef<[u8]>;
+
+ /// The error type of this stream.
+ //TODO: add bounds Into<::error::User> (or whatever it is called)
+ type Error;
+
+ /// Poll for a `Data` buffer.
+ ///
+ /// Similar to `Stream::poll_next`, this yields `Some(Data)` until
+ /// the body ends, when it yields `None`.
+ fn poll_data(&mut self) -> Poll