Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Replace reqwest with hyper #8099

Merged
merged 16 commits into from
Mar 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
527 changes: 221 additions & 306 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions dapps/src/apps/fetcher/installers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ use std::{fs, fmt};
use std::io::{self, Read, Write};
use std::path::PathBuf;
use ethereum_types::H256;
use fetch::{self, Mime};
use fetch;
use futures_cpupool::CpuPool;
use hash::keccak_buffer;
use mime_guess::Mime;

use apps::manifest::{MANIFEST_FILENAME, deserialize_manifest, serialize_manifest, Manifest};
use handlers::{ContentValidator, ValidatorResponse};
Expand Down Expand Up @@ -53,7 +54,7 @@ fn write_response_and_check_hash(

// Now write the response
let mut file = io::BufWriter::new(fs::File::create(&content_path)?);
let mut reader = io::BufReader::new(response);
let mut reader = io::BufReader::new(fetch::BodyReader::new(response));
io::copy(&mut reader, &mut file)?;
file.flush()?;

Expand Down
5 changes: 4 additions & 1 deletion dapps/src/apps/fetcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ impl<R: URLHint + 'static, F: Fetch> Endpoint for ContentFetcher<F, R> {
),
self.embeddable_on.clone(),
self.fetch.clone(),
self.pool.clone(),
)
},
URLHintResult::GithubDapp(content) => {
Expand All @@ -232,6 +233,7 @@ impl<R: URLHint + 'static, F: Fetch> Endpoint for ContentFetcher<F, R> {
),
self.embeddable_on.clone(),
self.fetch.clone(),
self.pool.clone(),
)
},
URLHintResult::Content(content) => {
Expand All @@ -248,6 +250,7 @@ impl<R: URLHint + 'static, F: Fetch> Endpoint for ContentFetcher<F, R> {
),
self.embeddable_on.clone(),
self.fetch.clone(),
self.pool.clone(),
)
},
};
Expand Down Expand Up @@ -280,7 +283,7 @@ impl<R: URLHint + 'static, F: Fetch> Endpoint for ContentFetcher<F, R> {
mod tests {
use std::env;
use std::sync::Arc;
use fetch::{Fetch, Client};
use fetch::Client;
use futures::{future, Future};
use hash_fetch::urlhint::{URLHint, URLHintResult};
use ethereum_types::H256;
Expand Down
2 changes: 1 addition & 1 deletion dapps/src/apps/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub fn all_endpoints<F: Fetch>(
insert::<parity_ui::old::App>(&mut pages, "v1", Embeddable::Yes(embeddable.clone()), pool.clone());

pages.insert("proxy".into(), ProxyPac::boxed(embeddable.clone(), dapps_domain.to_owned()));
pages.insert(WEB_PATH.into(), Web::boxed(embeddable.clone(), web_proxy_tokens.clone(), fetch.clone()));
pages.insert(WEB_PATH.into(), Web::boxed(embeddable.clone(), web_proxy_tokens.clone(), fetch.clone(), pool.clone()));

(local_endpoints, pages)
}
Expand Down
14 changes: 9 additions & 5 deletions dapps/src/handlers/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::time::{Instant, Duration};
use fetch::{self, Fetch};
use futures::sync::oneshot;
use futures::{self, Future};
use futures_cpupool::CpuPool;
use hyper::{self, Method, StatusCode};
use parking_lot::Mutex;

Expand All @@ -35,7 +36,7 @@ const FETCH_TIMEOUT: u64 = 300;

pub enum ValidatorResponse {
Local(local::Dapp),
Streaming(StreamingHandler<fetch::Response>),
Streaming(StreamingHandler<fetch::BodyReader>),
}

pub trait ContentValidator: Sized + Send + 'static {
Expand Down Expand Up @@ -252,6 +253,7 @@ impl ContentFetcherHandler {
installer: H,
embeddable_on: Embeddable,
fetch: F,
pool: CpuPool,
) -> Self {
let fetch_control = FetchControl::default();
let errors = Errors { embeddable_on };
Expand All @@ -262,6 +264,7 @@ impl ContentFetcherHandler {
Method::Get => {
trace!(target: "dapps", "Fetching content from: {:?}", url);
FetchState::InProgress(Self::fetch_content(
pool,
fetch,
url,
fetch_control.abort.clone(),
Expand All @@ -282,6 +285,7 @@ impl ContentFetcherHandler {
}

fn fetch_content<H: ContentValidator, F: Fetch>(
pool: CpuPool,
fetch: F,
url: &str,
abort: Arc<AtomicBool>,
Expand All @@ -290,8 +294,8 @@ impl ContentFetcherHandler {
installer: H,
) -> Box<Future<Item=FetchState, Error=()> + Send> {
// Start fetching the content
let fetch2 = fetch.clone();
let future = fetch.fetch_with_abort(url, abort.into()).then(move |result| {
let pool2 = pool.clone();
let future = fetch.fetch(url, abort.into()).then(move |result| {
trace!(target: "dapps", "Fetching content finished. Starting validation: {:?}", result);
Ok(match result {
Ok(response) => match installer.validate_and_install(response) {
Expand All @@ -303,7 +307,7 @@ impl ContentFetcherHandler {
Ok(ValidatorResponse::Streaming(stream)) => {
trace!(target: "dapps", "Validation OK. Streaming response.");
let (reading, response) = stream.into_response();
fetch2.process_and_forget(reading);
pool.spawn(reading).forget();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That could probably be pass-through (Since we use BodyParser to convert async->sync and later we use into_response() to convert from sync->async). To be investigated in subsequent PRs.

FetchState::Streaming(response)
},
Err(e) => {
Expand All @@ -319,7 +323,7 @@ impl ContentFetcherHandler {
});

// make sure to run within fetch thread pool.
fetch.process(future)
Box::new(pool2.spawn(future))
}
}

Expand Down
27 changes: 6 additions & 21 deletions dapps/src/tests/helpers/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use std::{io, thread, time};
use std::{thread, time};
use std::sync::{atomic, mpsc, Arc};
use parking_lot::Mutex;
use hyper;

use futures::{self, Future};
use fetch::{self, Fetch};
use fetch::{self, Fetch, Url};

pub struct FetchControl {
sender: mpsc::Sender<()>,
Expand Down Expand Up @@ -96,11 +97,8 @@ impl FakeFetch {
impl Fetch for FakeFetch {
type Result = Box<Future<Item = fetch::Response, Error = fetch::Error> + Send>;

fn new() -> Result<Self, fetch::Error> where Self: Sized {
Ok(FakeFetch::default())
}

fn fetch_with_abort(&self, url: &str, _abort: fetch::Abort) -> Self::Result {
fn fetch(&self, url: &str, abort: fetch::Abort) -> Self::Result {
let u = Url::parse(url).unwrap();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps Url crate should be re-exported from fetch if it's required to instantiate some other public types (Response)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this may be convenient.

self.requested.lock().push(url.into());
let manual = self.manual.clone();
let response = self.response.clone();
Expand All @@ -111,23 +109,10 @@ impl Fetch for FakeFetch {
// wait for manual resume
let _ = rx.recv();
}

let data = response.lock().take().unwrap_or(b"Some content");
let cursor = io::Cursor::new(data);
tx.send(fetch::Response::from_reader(cursor)).unwrap();
tx.send(fetch::Response::new(u, hyper::Response::new().with_body(data), abort)).unwrap();
});

Box::new(rx.map_err(|_| fetch::Error::Aborted))
}

fn process_and_forget<F, I, E>(&self, f: F) where
F: Future<Item=I, Error=E> + Send + 'static,
I: Send + 'static,
E: Send + 'static,
{
// Spawn the task in a separate thread.
thread::spawn(|| {
let _ = f.wait();
});
}
}
20 changes: 6 additions & 14 deletions dapps/src/tests/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub fn init_server<F, B>(process: F, io: IoHandler) -> (Server, Arc<FakeRegistra
let mut dapps_path = env::temp_dir();
dapps_path.push("non-existent-dir-to-prevent-fs-files-from-loading");

let mut builder = ServerBuilder::new(&dapps_path, registrar.clone());
let mut builder = ServerBuilder::new(FetchClient::new().unwrap(), &dapps_path, registrar.clone());
builder.signer_address = Some(("127.0.0.1".into(), SIGNER_PORT));
let server = process(builder).start_unsecured_http(&"127.0.0.1:0".parse().unwrap(), io).unwrap();
(
Expand Down Expand Up @@ -149,21 +149,21 @@ pub struct ServerBuilder<T: Fetch = FetchClient> {
web_proxy_tokens: Arc<WebProxyTokens>,
signer_address: Option<(String, u16)>,
allowed_hosts: DomainsValidation<Host>,
fetch: Option<T>,
fetch: T,
serve_ui: bool,
}

impl ServerBuilder {
/// Construct new dapps server
pub fn new<P: AsRef<Path>>(dapps_path: P, registrar: Arc<RegistrarClient<Call=Asynchronous>>) -> Self {
pub fn new<P: AsRef<Path>>(fetch: FetchClient, dapps_path: P, registrar: Arc<RegistrarClient<Call=Asynchronous>>) -> Self {
ServerBuilder {
dapps_path: dapps_path.as_ref().to_owned(),
registrar: registrar,
sync_status: Arc::new(FakeSync(false)),
web_proxy_tokens: Arc::new(|_| None),
signer_address: None,
allowed_hosts: DomainsValidation::Disabled,
fetch: None,
fetch: fetch,
serve_ui: false,
}
}
Expand All @@ -179,15 +179,14 @@ impl<T: Fetch> ServerBuilder<T> {
web_proxy_tokens: self.web_proxy_tokens,
signer_address: self.signer_address,
allowed_hosts: self.allowed_hosts,
fetch: Some(fetch),
fetch: fetch,
serve_ui: self.serve_ui,
}
}

/// Asynchronously start server with no authentication,
/// returns result with `Server` handle on success or an error.
pub fn start_unsecured_http(self, addr: &SocketAddr, io: IoHandler) -> io::Result<Server> {
let fetch = self.fetch_client();
Server::start_http(
addr,
io,
Expand All @@ -199,17 +198,10 @@ impl<T: Fetch> ServerBuilder<T> {
self.sync_status,
self.web_proxy_tokens,
Remote::new_sync(),
fetch,
self.fetch,
self.serve_ui,
)
}

fn fetch_client(&self) -> T {
match self.fetch.clone() {
Some(fetch) => fetch,
None => T::new().unwrap(),
}
}
}

const DAPPS_DOMAIN: &'static str = "web3.site";
Expand Down
7 changes: 6 additions & 1 deletion dapps/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use hyper::{mime, StatusCode};
use apps;
use endpoint::{Endpoint, EndpointPath, Request, Response};
use futures::future;
use futures_cpupool::CpuPool;
use handlers::{
ContentFetcherHandler, ContentHandler, ContentValidator, ValidatorResponse,
StreamingHandler,
Expand All @@ -35,18 +36,21 @@ pub struct Web<F> {
embeddable_on: Embeddable,
web_proxy_tokens: Arc<WebProxyTokens>,
fetch: F,
pool: CpuPool,
}

impl<F: Fetch> Web<F> {
pub fn boxed(
embeddable_on: Embeddable,
web_proxy_tokens: Arc<WebProxyTokens>,
fetch: F,
pool: CpuPool,
) -> Box<Endpoint> {
Box::new(Web {
embeddable_on,
web_proxy_tokens,
fetch,
pool,
})
}

Expand Down Expand Up @@ -129,6 +133,7 @@ impl<F: Fetch> Endpoint for Web<F> {
},
self.embeddable_on.clone(),
self.fetch.clone(),
self.pool.clone(),
))
}
}
Expand All @@ -146,7 +151,7 @@ impl ContentValidator for WebInstaller {
let is_html = response.is_html();
let mime = response.content_type().unwrap_or(mime::TEXT_HTML);
let mut handler = StreamingHandler::new(
response,
fetch::BodyReader::new(response),
status,
mime,
self.embeddable_on,
Expand Down
1 change: 1 addition & 0 deletions ethcore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ ethjson = { path = "../json" }
ethkey = { path = "../ethkey" }
ethstore = { path = "../ethstore" }
evm = { path = "evm" }
futures-cpupool = "0.1"
hardware-wallet = { path = "../hw" }
heapsize = "0.4"
itertools = "0.5"
Expand Down
1 change: 1 addition & 0 deletions ethcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ extern crate ethcore_transaction as transaction;
extern crate ethereum_types;
extern crate ethjson;
extern crate ethkey;
extern crate futures_cpupool;
extern crate hardware_wallet;
extern crate hashdb;
extern crate itertools;
Expand Down
5 changes: 3 additions & 2 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use ethcore_miner::transaction_queue::{
AccountDetails,
TransactionOrigin,
};
use futures_cpupool::CpuPool;
use ethcore_miner::work_notify::{WorkPoster, NotifyWork};
use miner::service_transaction_checker::ServiceTransactionChecker;
use miner::{MinerService, MinerStatus};
Expand Down Expand Up @@ -218,11 +219,11 @@ pub enum GasPricer {

impl GasPricer {
/// Create a new Calibrated `GasPricer`.
pub fn new_calibrated(options: GasPriceCalibratorOptions, fetch: FetchClient) -> GasPricer {
pub fn new_calibrated(options: GasPriceCalibratorOptions, fetch: FetchClient, p: CpuPool) -> GasPricer {
GasPricer::Calibrated(GasPriceCalibrator {
options: options,
next_calibration: Instant::now(),
price_info: PriceInfoClient::new(fetch),
price_info: PriceInfoClient::new(fetch, p),
})
}

Expand Down
2 changes: 2 additions & 0 deletions hash-fetch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ authors = ["Parity Technologies <[email protected]>"]

[dependencies]
futures = "0.1"
futures-cpupool = "0.1"
log = "0.3"
mime = "0.3"
mime_guess = "2.0.0-alpha.2"
Expand All @@ -25,4 +26,5 @@ ethabi-derive = "5.0"
ethabi-contract = "5.0"

[dev-dependencies]
hyper = "0.11"
parking_lot = "0.5"
Loading