Skip to content

Commit

Permalink
feat(lib): add support to disable tokio-proto internals
Browse files Browse the repository at this point in the history
For now, this adds `client::Config::no_proto`, `server::Http::no_proto`,
and `server::Server::no_proto` to skip tokio-proto implementations, and
use an internal dispatch system instead.

`Http::no_proto` is similar to `Http::bind_connection`, but returns a
`Connection` that is a `Future` to drive HTTP with the provided service.
Any errors prior to parsing a request, and after delivering a response
(but before flush the response body) will be returned from this future.

See #1342 for more.
  • Loading branch information
seanmonstar committed Oct 27, 2017
1 parent 8153cfa commit f7532b7
Show file tree
Hide file tree
Showing 14 changed files with 1,042 additions and 157 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ matrix:
env: FEATURES="--features nightly"
- rust: beta
- rust: stable
- rust: stable
env: HYPER_NO_PROTO=1
- rust: stable
env: FEATURES="--features compat"
- rust: 1.17.0
Expand Down
6 changes: 4 additions & 2 deletions examples/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![deny(warnings)]
//#![deny(warnings)]
extern crate futures;
extern crate hyper;
extern crate tokio_core;
Expand Down Expand Up @@ -32,7 +32,9 @@ fn main() {

let mut core = tokio_core::reactor::Core::new().unwrap();
let handle = core.handle();
let client = Client::new(&handle);
let client = Client::configure()
.no_proto()
.build(&handle);

let work = client.get(url).and_then(|res| {
println!("Response: {}", res.status());
Expand Down
3 changes: 2 additions & 1 deletion examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ impl Service for Hello {
fn main() {
pretty_env_logger::init().unwrap();
let addr = "127.0.0.1:3000".parse().unwrap();
let server = Http::new().bind(&addr, || Ok(Hello)).unwrap();
let mut server = Http::new().bind(&addr, || Ok(Hello)).unwrap();
server.no_proto();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().unwrap();
}
3 changes: 2 additions & 1 deletion examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ fn main() {
pretty_env_logger::init().unwrap();
let addr = "127.0.0.1:1337".parse().unwrap();

let server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
let mut server = Http::new().bind(&addr, || Ok(Echo)).unwrap();
server.no_proto();
println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap());
server.run().unwrap();
}
179 changes: 126 additions & 53 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tokio_proto::util::client_proxy::ClientProxy;
pub use tokio_service::Service;

use header::{Headers, Host};
use proto::{self, TokioBody};
use proto::{self, RequestHead, TokioBody};
use proto::response;
use proto::request;
use method::Method;
Expand All @@ -45,7 +45,7 @@ pub mod compat;
pub struct Client<C, B = proto::Body> {
connector: C,
handle: Handle,
pool: Pool<TokioClient<B>>,
pool: Dispatch<B>,
}

impl Client<HttpConnector, proto::Body> {
Expand Down Expand Up @@ -93,7 +93,11 @@ impl<C, B> Client<C, B> {
Client {
connector: config.connector,
handle: handle.clone(),
pool: Pool::new(config.keep_alive, config.keep_alive_timeout),
pool: if config.no_proto {
Dispatch::Hyper(Pool::new(config.keep_alive, config.keep_alive_timeout))
} else {
Dispatch::Proto(Pool::new(config.keep_alive, config.keep_alive_timeout))
}
}
}
}
Expand Down Expand Up @@ -187,48 +191,100 @@ where C: Connect,
headers.extend(head.headers.iter());
head.headers = headers;

let checkout = self.pool.checkout(domain.as_ref());
let connect = {
let handle = self.handle.clone();
let pool = self.pool.clone();
let pool_key = Rc::new(domain.to_string());
self.connector.connect(url)
.map(move |io| {
let (tx, rx) = oneshot::channel();
let client = HttpClient {
client_rx: RefCell::new(Some(rx)),
}.bind_client(&handle, io);
let pooled = pool.pooled(pool_key, client);
drop(tx.send(pooled.clone()));
pooled
})
};
match self.pool {
Dispatch::Proto(ref pool) => {
trace!("proto_dispatch");
let checkout = pool.checkout(domain.as_ref());
let connect = {
let handle = self.handle.clone();
let pool = pool.clone();
let pool_key = Rc::new(domain.to_string());
self.connector.connect(url)
.map(move |io| {
let (tx, rx) = oneshot::channel();
let client = HttpClient {
client_rx: RefCell::new(Some(rx)),
}.bind_client(&handle, io);
let pooled = pool.pooled(pool_key, client);
drop(tx.send(pooled.clone()));
pooled
})
};

let race = checkout.select(connect)
.map(|(client, _work)| client)
.map_err(|(e, _work)| {
// the Pool Checkout cannot error, so the only error
// is from the Connector
// XXX: should wait on the Checkout? Problem is
// that if the connector is failing, it may be that we
// never had a pooled stream at all
e.into()
});
let resp = race.and_then(move |client| {
let msg = match body {
Some(body) => {
Message::WithBody(head, body.into())
},
None => Message::WithoutBody(head),
};
client.call(msg)
});
FutureResponse(Box::new(resp.map(|msg| {
match msg {
Message::WithoutBody(head) => response::from_wire(head, None),
Message::WithBody(head, body) => response::from_wire(head, Some(body.into())),
}
})))
},
Dispatch::Hyper(ref pool) => {
trace!("no_proto dispatch");
use futures::Sink;
use futures::sync::{mpsc, oneshot};

let checkout = pool.checkout(domain.as_ref());
let connect = {
let handle = self.handle.clone();
let pool = pool.clone();
let pool_key = Rc::new(domain.to_string());
self.connector.connect(url)
.map(move |io| {
let (tx, rx) = mpsc::channel(1);
let pooled = pool.pooled(pool_key, RefCell::new(tx));
let conn = proto::Conn::<_, _, proto::ClientTransaction, _>::new(io, pooled.clone());
let dispatch = proto::dispatch::Dispatcher::new(proto::dispatch::Client::new(rx), conn);
handle.spawn(dispatch.map_err(|err| error!("no_proto error: {}", err)));
pooled
})
};

let race = checkout.select(connect)
.map(|(client, _work)| client)
.map_err(|(e, _work)| {
// the Pool Checkout cannot error, so the only error
// is from the Connector
// XXX: should wait on the Checkout? Problem is
// that if the connector is failing, it may be that we
// never had a pooled stream at all
e.into()
});

let resp = race.and_then(move |client| {
let (callback, rx) = oneshot::channel();
client.borrow_mut().start_send((head, body, callback)).unwrap();
rx.then(|res| {
match res {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
Err(_) => panic!("dispatch dropped without returning error"),
}
})
});

FutureResponse(Box::new(resp))

let race = checkout.select(connect)
.map(|(client, _work)| client)
.map_err(|(e, _work)| {
// the Pool Checkout cannot error, so the only error
// is from the Connector
// XXX: should wait on the Checkout? Problem is
// that if the connector is failing, it may be that we
// never had a pooled stream at all
e.into()
});
let resp = race.and_then(move |client| {
let msg = match body {
Some(body) => {
Message::WithBody(head, body.into())
},
None => Message::WithoutBody(head),
};
client.call(msg)
});
FutureResponse(Box::new(resp.map(|msg| {
match msg {
Message::WithoutBody(head) => response::from_wire(head, None),
Message::WithBody(head, body) => response::from_wire(head, Some(body.into())),
}
})))
}
}

}
Expand All @@ -238,7 +294,10 @@ impl<C: Clone, B> Clone for Client<C, B> {
Client {
connector: self.connector.clone(),
handle: self.handle.clone(),
pool: self.pool.clone(),
pool: match self.pool {
Dispatch::Proto(ref pool) => Dispatch::Proto(pool.clone()),
Dispatch::Hyper(ref pool) => Dispatch::Hyper(pool.clone()),
}
}
}
}
Expand All @@ -249,10 +308,16 @@ impl<C, B> fmt::Debug for Client<C, B> {
}
}

type TokioClient<B> = ClientProxy<Message<proto::RequestHead, B>, Message<proto::ResponseHead, TokioBody>, ::Error>;
type ProtoClient<B> = ClientProxy<Message<RequestHead, B>, Message<proto::ResponseHead, TokioBody>, ::Error>;
type HyperClient<B> = RefCell<::futures::sync::mpsc::Sender<(RequestHead, Option<B>, ::futures::sync::oneshot::Sender<::Result<::Response>>)>>;

enum Dispatch<B> {
Proto(Pool<ProtoClient<B>>),
Hyper(Pool<HyperClient<B>>),
}

struct HttpClient<B> {
client_rx: RefCell<Option<oneshot::Receiver<Pooled<TokioClient<B>>>>>,
client_rx: RefCell<Option<oneshot::Receiver<Pooled<ProtoClient<B>>>>>,
}

impl<T, B> ClientProto<T> for HttpClient<B>
Expand All @@ -265,7 +330,7 @@ where T: AsyncRead + AsyncWrite + 'static,
type Response = proto::ResponseHead;
type ResponseBody = proto::Chunk;
type Error = ::Error;
type Transport = proto::Conn<T, B::Item, proto::ClientTransaction, Pooled<TokioClient<B>>>;
type Transport = proto::Conn<T, B::Item, proto::ClientTransaction, Pooled<ProtoClient<B>>>;
type BindTransport = BindingClient<T, B>;

fn bind_transport(&self, io: T) -> Self::BindTransport {
Expand All @@ -277,7 +342,7 @@ where T: AsyncRead + AsyncWrite + 'static,
}

struct BindingClient<T, B> {
rx: oneshot::Receiver<Pooled<TokioClient<B>>>,
rx: oneshot::Receiver<Pooled<ProtoClient<B>>>,
io: Option<T>,
}

Expand All @@ -286,7 +351,7 @@ where T: AsyncRead + AsyncWrite + 'static,
B: Stream<Error=::Error>,
B::Item: AsRef<[u8]>,
{
type Item = proto::Conn<T, B::Item, proto::ClientTransaction, Pooled<TokioClient<B>>>;
type Item = proto::Conn<T, B::Item, proto::ClientTransaction, Pooled<ProtoClient<B>>>;
type Error = io::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Expand All @@ -309,6 +374,7 @@ pub struct Config<C, B> {
keep_alive_timeout: Option<Duration>,
//TODO: make use of max_idle config
max_idle: usize,
no_proto: bool,
}

/// Phantom type used to signal that `Config` should create a `HttpConnector`.
Expand All @@ -324,6 +390,7 @@ impl Default for Config<UseDefaultConnector, proto::Body> {
keep_alive: true,
keep_alive_timeout: Some(Duration::from_secs(90)),
max_idle: 5,
no_proto: false,
}
}
}
Expand All @@ -347,6 +414,7 @@ impl<C, B> Config<C, B> {
keep_alive: self.keep_alive,
keep_alive_timeout: self.keep_alive_timeout,
max_idle: self.max_idle,
no_proto: self.no_proto,
}
}

Expand All @@ -360,6 +428,7 @@ impl<C, B> Config<C, B> {
keep_alive: self.keep_alive,
keep_alive_timeout: self.keep_alive_timeout,
max_idle: self.max_idle,
no_proto: self.no_proto,
}
}

Expand Down Expand Up @@ -393,6 +462,13 @@ impl<C, B> Config<C, B> {
self
}
*/

/// Disable tokio-proto internal usage.
#[inline]
pub fn no_proto(mut self) -> Config<C, B> {
self.no_proto = true;
self
}
}

impl<C, B> Config<C, B>
Expand Down Expand Up @@ -431,11 +507,8 @@ impl<C, B> fmt::Debug for Config<C, B> {
impl<C: Clone, B> Clone for Config<C, B> {
fn clone(&self) -> Config<C, B> {
Config {
_body_type: PhantomData::<B>,
connector: self.connector.clone(),
keep_alive: self.keep_alive,
keep_alive_timeout: self.keep_alive_timeout,
max_idle: self.max_idle,
.. *self
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/proto/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::borrow::Cow;
use super::Chunk;

pub type TokioBody = tokio_proto::streaming::Body<Chunk, ::Error>;
pub type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;

/// A `Stream` for `Chunk`s used in requests and responses.
#[must_use = "streams do nothing unless polled"]
Expand Down
Loading

0 comments on commit f7532b7

Please sign in to comment.