Skip to content

Commit

Permalink
Merge pull request #11 from ebkalderon/async-responses
Browse files Browse the repository at this point in the history
Implement asynchronous responses
  • Loading branch information
kpcyrd authored Dec 11, 2018
2 parents f5e04aa + e6a4c87 commit 7658bed
Show file tree
Hide file tree
Showing 7 changed files with 342 additions and 339 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ failure = "0.1.1"
log = "0.4"

trust-dns = "0.15"
trust-dns-proto = "0.5"

serde = "1.0"
serde_derive = "1.0"
Expand Down
15 changes: 10 additions & 5 deletions examples/get.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
extern crate chrootable_https;
extern crate env_logger;
extern crate structopt;
extern crate chrootable_https;

use chrootable_https::{Resolver, Client};
use chrootable_https::{Client, Resolver};
use std::io;
use std::io::prelude::*;
use std::time::Duration;
use structopt::StructOpt;

#[derive(Debug, StructOpt)]
pub struct Args {
#[structopt(short="-t", long="--timeout")]
#[structopt(short = "-t", long = "--timeout")]
timeout: Option<u64>,
urls: Vec<String>,
}
Expand All @@ -27,8 +27,13 @@ fn main() {
}

for url in &args.urls {
let reply = client.get(&url).expect("request failed");
let reply = client
.get(&url)
.wait_for_response()
.expect("request failed");
eprintln!("{:#?}", reply);
io::stdout().write(&reply.body).expect("failed to write body");
io::stdout()
.write(&reply.body)
.expect("failed to write body");
}
}
135 changes: 70 additions & 65 deletions src/connector.rs
Original file line number Diff line number Diff line change
@@ -1,57 +1,65 @@
use hyper_rustls::HttpsConnector;
use hyper::rt::Future;
use hyper::client::connect::{self, Connect};
use hyper::client::connect::HttpConnector;
use hyper::client::connect::Destination;
use ct_logs;
use dns::{DnsResolver, RecordType};
use futures::{future, Poll};
use hyper::client::connect::Destination;
use hyper::client::connect::HttpConnector;
use hyper::client::connect::{self, Connect};
use hyper::rt::Future;
use hyper_rustls::HttpsConnector;
use rustls::ClientConfig;
use webpki_roots;
use ct_logs;

use errors::Error;
use std::io;
use std::net::IpAddr;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use errors::Result;

use std::sync::Arc;

pub struct Connector<T> {
pub struct Connector<T, R: DnsResolver> {
http: T,
// resolver: ResolverFuture,
records: Arc<Mutex<HashMap<String, IpAddr>>>,
resolver: Arc<R>,
}

impl<T> Connector<T> {
pub fn resolve_dest(&self, mut dest: Destination) -> Result<Destination> {
let ip = {
let cache = self.records.lock().unwrap();
cache.get(dest.host()).map(|x| x.to_owned())
};

let ip = match ip {
Some(IpAddr::V4(ip)) => ip.to_string(),
Some(IpAddr::V6(ip)) => format!("[{}]", ip),
None => bail!("host wasn't pre-resolved"),
};

dest.set_host(&ip)?;
impl<T, R: DnsResolver + 'static> Connector<T, R> {
pub fn resolve_dest(&self, mut dest: Destination) -> Resolving {
let resolver = self.resolver.clone();
let host = dest.host().to_string();

let resolve = future::lazy(move || {
resolver
.resolve(&host, RecordType::A)
});

let resolved = Box::new(resolve.and_then(move |record| {
// TODO: we might have more than one record available
match record.success()?.into_iter().next() {
Some(record) => {
let ip = match record {
IpAddr::V4(ip) => ip.to_string(),
IpAddr::V6(ip) => format!("[{}]", ip),
};

dest.set_host(&ip)?;
Ok(dest)
}
None => bail!("no record found"),
}
}));

Ok(dest)
Resolving(resolved)
}
}

impl Connector<HttpConnector> {
pub fn new(records: Arc<Mutex<HashMap<String, IpAddr>>>) -> Connector<HttpConnector> {
impl<R: DnsResolver> Connector<HttpConnector, R> {
pub fn new(resolver: Arc<R>) -> Connector<HttpConnector, R> {
let mut http = HttpConnector::new(4);
http.enforce_http(false);
Connector {
http,
records,
}
Connector { http, resolver }
}

pub fn https(records: Arc<Mutex<HashMap<String, IpAddr>>>) -> HttpsConnector<Connector<HttpConnector>> {
let http = Connector::new(records);
pub fn https(
resolver: Arc<R>,
) -> HttpsConnector<Connector<HttpConnector, R>> {
let http = Connector::new(resolver);

let mut config = ClientConfig::new();
config
Expand All @@ -63,55 +71,39 @@ impl Connector<HttpConnector> {
}
}

impl<T> Connect for Connector<T>
impl<T, R> Connect for Connector<T, R>
where
T: Connect<Error=io::Error>,
T: Connect<Error = io::Error>,
T: Clone,
T: 'static,
T::Transport: 'static,
T::Future: 'static,
R: DnsResolver,
R: 'static,
{
type Transport = T::Transport;
type Error = io::Error;
type Future = Connecting<T::Transport>;

fn connect(&self, dest: connect::Destination) -> Self::Future {
debug!("original destination: {:?}", dest);
let dest = match self.resolve_dest(dest) {
Ok(dest) => dest,
Err(err) => {
let err = io::Error::new(io::ErrorKind::Other, err.to_string());
return Connecting(Box::new(future::err(err)));
},
};
debug!("resolved destination: {:?}", dest);
let connecting = self.http.connect(dest);
let fut = Box::new(connecting);
Connecting(fut)

/*
// async implementation
// compiles but hangs forever
println!("creating resolve");
let resolving = self.resolve_dest(&dest);
let resolving = self
.resolve_dest(dest)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()));

let http = self.http.clone();
println!("chaining resolve");
let fut = Box::new(resolving.and_then(move |records| {
// unimplemented!()
println!("records: {:?}", records);
let fut = Box::new(resolving.and_then(move |dest| {
debug!("resolved destination: {:?}", dest);
http.connect(dest)
}));
println!("returning future");

Connecting(fut)
*/
}
}

/// A Future representing work to connect to a URL
pub struct Connecting<T>(
Box<Future<Item = (T, connect::Connected), Error = io::Error> + Send>,
);
/// A Future representing work to connect to a URL.
#[must_use = "futures do nothing unless polled"]
pub struct Connecting<T>(Box<Future<Item = (T, connect::Connected), Error = io::Error> + Send>);

impl<T> Future for Connecting<T> {
type Item = (T, connect::Connected);
Expand All @@ -121,3 +113,16 @@ impl<T> Future for Connecting<T> {
self.0.poll()
}
}

/// A Future representing work to resolve a DNS query.
#[must_use = "futures do nothing unless polled"]
pub struct Resolving(Box<Future<Item = Destination, Error = Error> + Send>);

impl Future for Resolving {
type Item = Destination;
type Error = Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll()
}
}
Loading

0 comments on commit 7658bed

Please sign in to comment.