Skip to content

Commit

Permalink
feat(client): proper proxy and tunneling in Client
Browse files Browse the repository at this point in the history
Closes #774
  • Loading branch information
seanmonstar committed May 2, 2016
1 parent 3a3e086 commit f36c6b2
Show file tree
Hide file tree
Showing 10 changed files with 406 additions and 64 deletions.
15 changes: 14 additions & 1 deletion examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,20 @@ fn main() {
}
};

let client = Client::new();
let client = match env::var("HTTP_PROXY") {
Ok(mut proxy) => {
// parse the proxy, message if it doesn't make sense
let mut port = 80;
if let Some(colon) = proxy.rfind(':') {
port = proxy[colon + 1..].parse().unwrap_or_else(|e| {
panic!("HTTP_PROXY is malformed: {:?}, port parse error: {}", proxy, e);
});
proxy.truncate(colon);
}
Client::with_http_proxy(proxy, port)
},
_ => Client::new()
};

let mut res = client.get(&*url)
.header(Connection::close())
Expand Down
96 changes: 69 additions & 27 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,12 @@ use method::Method;
use net::{NetworkConnector, NetworkStream};
use Error;

use self::proxy::tunnel;
pub use self::pool::Pool;
pub use self::request::Request;
pub use self::response::Response;

mod proxy;
pub mod pool;
pub mod request;
pub mod response;
Expand All @@ -90,7 +92,7 @@ pub struct Client {
redirect_policy: RedirectPolicy,
read_timeout: Option<Duration>,
write_timeout: Option<Duration>,
proxy: Option<(Cow<'static, str>, Cow<'static, str>, u16)>
proxy: Option<(Cow<'static, str>, u16)>
}

impl fmt::Debug for Client {
Expand All @@ -116,6 +118,15 @@ impl Client {
Client::with_connector(Pool::new(config))
}

pub fn with_http_proxy<H>(host: H, port: u16) -> Client
where H: Into<Cow<'static, str>> {
let host = host.into();
let proxy = tunnel((host.clone(), port));
let mut client = Client::with_connector(Pool::with_connector(Default::default(), proxy));
client.proxy = Some((host, port));
client
}

/// Create a new client with a specific connector.
pub fn with_connector<C, S>(connector: C) -> Client
where C: NetworkConnector<Stream=S> + Send + Sync + 'static, S: NetworkStream + Send {
Expand Down Expand Up @@ -148,12 +159,6 @@ impl Client {
self.write_timeout = dur;
}

/// Set a proxy for requests of this Client.
pub fn set_proxy<S, H>(&mut self, scheme: S, host: H, port: u16)
where S: Into<Cow<'static, str>>, H: Into<Cow<'static, str>> {
self.proxy = Some((scheme.into(), host.into(), port));
}

/// Build a Get request.
pub fn get<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::Get, url)
Expand Down Expand Up @@ -271,13 +276,12 @@ impl<'a> RequestBuilder<'a> {

loop {
let mut req = {
let (scheme, host, port) = match client.proxy {
Some(ref proxy) => (proxy.0.as_ref(), proxy.1.as_ref(), proxy.2),
None => {
let hp = try!(get_host_and_port(&url));
(url.scheme(), hp.0, hp.1)
}
};
let (host, port) = try!(get_host_and_port(&url));
let mut message = try!(client.protocol.new_message(&host, port, url.scheme()));
if url.scheme() == "http" && client.proxy.is_some() {
message.set_proxied(true);
}

let mut headers = match headers {
Some(ref headers) => headers.clone(),
None => Headers::new(),
Expand All @@ -286,7 +290,6 @@ impl<'a> RequestBuilder<'a> {
hostname: host.to_owned(),
port: Some(port),
});
let message = try!(client.protocol.new_message(&host, port, scheme));
Request::with_headers_and_message(method.clone(), url.clone(), headers, message)
};

Expand Down Expand Up @@ -460,6 +463,7 @@ impl Default for RedirectPolicy {
}
}


fn get_host_and_port(url: &Url) -> ::Result<(&str, u16)> {
let host = match url.host_str() {
Some(host) => host,
Expand All @@ -479,8 +483,9 @@ mod tests {
use std::io::Read;
use header::Server;
use http::h1::Http11Message;
use mock::{MockStream};
use mock::{MockStream, MockSsl};
use super::{Client, RedirectPolicy};
use super::proxy::Proxy;
use super::pool::Pool;
use url::Url;

Expand All @@ -505,24 +510,61 @@ mod tests {
#[test]
fn test_proxy() {
use super::pool::PooledStream;
type MessageStream = PooledStream<super::proxy::Proxied<MockStream, MockStream>>;
mock_connector!(ProxyConnector {
b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"
});
let mut client = Client::with_connector(Pool::with_connector(Default::default(), ProxyConnector));
client.set_proxy("http", "example.proxy", 8008);
let tunnel = Proxy {
connector: ProxyConnector,
proxy: ("example.proxy".into(), 8008),
ssl: MockSsl,
};
let mut client = Client::with_connector(Pool::with_connector(Default::default(), tunnel));
client.proxy = Some(("example.proxy".into(), 8008));
let mut dump = vec![];
client.get("http://127.0.0.1/foo/bar").send().unwrap().read_to_end(&mut dump).unwrap();

{
let box_message = client.protocol.new_message("example.proxy", 8008, "http").unwrap();
let message = box_message.downcast::<Http11Message>().unwrap();
let stream = message.into_inner().downcast::<PooledStream<MockStream>>().unwrap().into_inner();
let s = ::std::str::from_utf8(&stream.write).unwrap();
let request_line = "GET http://127.0.0.1/foo/bar HTTP/1.1\r\n";
assert_eq!(&s[..request_line.len()], request_line);
assert!(s.contains("Host: example.proxy:8008\r\n"));
}
let box_message = client.protocol.new_message("127.0.0.1", 80, "http").unwrap();
let message = box_message.downcast::<Http11Message>().unwrap();
let stream = message.into_inner().downcast::<MessageStream>().unwrap().into_inner().into_normal().unwrap();;

let s = ::std::str::from_utf8(&stream.write).unwrap();
let request_line = "GET http://127.0.0.1/foo/bar HTTP/1.1\r\n";
assert!(s.starts_with(request_line), "{:?} doesn't start with {:?}", s, request_line);
assert!(s.contains("Host: 127.0.0.1\r\n"));
}

#[test]
fn test_proxy_tunnel() {
use super::pool::PooledStream;
type MessageStream = PooledStream<super::proxy::Proxied<MockStream, MockStream>>;

mock_connector!(ProxyConnector {
b"HTTP/1.1 200 OK\r\n\r\n",
b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"
});
let tunnel = Proxy {
connector: ProxyConnector,
proxy: ("example.proxy".into(), 8008),
ssl: MockSsl,
};
let mut client = Client::with_connector(Pool::with_connector(Default::default(), tunnel));
client.proxy = Some(("example.proxy".into(), 8008));
let mut dump = vec![];
client.get("https://127.0.0.1/foo/bar").send().unwrap().read_to_end(&mut dump).unwrap();

let box_message = client.protocol.new_message("127.0.0.1", 443, "https").unwrap();
let message = box_message.downcast::<Http11Message>().unwrap();
let stream = message.into_inner().downcast::<MessageStream>().unwrap().into_inner().into_tunneled().unwrap();

let s = ::std::str::from_utf8(&stream.write).unwrap();
let connect_line = "CONNECT 127.0.0.1:443 HTTP/1.1\r\nHost: 127.0.0.1:443\r\n\r\n";
assert_eq!(&s[..connect_line.len()], connect_line);

let s = &s[connect_line.len()..];
let request_line = "GET /foo/bar HTTP/1.1\r\n";
assert_eq!(&s[..request_line.len()], request_line);
assert!(s.contains("Host: 127.0.0.1\r\n"));
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions src/client/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ impl<C: NetworkConnector<Stream=S>, S: NetworkStream + Send> NetworkConnector fo
}

/// A Stream that will try to be returned to the Pool when dropped.
#[derive(Debug)]
pub struct PooledStream<S> {
inner: Option<PooledStreamInner<S>>,
is_closed: bool,
Expand Down
Loading

0 comments on commit f36c6b2

Please sign in to comment.