Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use global Tokio runtime #543

Merged
merged 1 commit into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion impls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ ring = "0.16"
tokio = { version = "0.2", features = ["full"] }
uuid = { version = "0.8", features = ["serde", "v4"] }
chrono = { version = "0.4.11", features = ["serde"] }
crossbeam-utils = "0.7"
lazy_static = "1.4"

#http client (copied from grin)
http = "0.2"
Expand Down
37 changes: 22 additions & 15 deletions impls/src/client_utils/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,35 @@
//! High level JSON/HTTP client API

use crate::util::to_base64;
use crossbeam_utils::thread::scope;
use failure::{Backtrace, Context, Fail, ResultExt};
use hyper::body;
use hyper::header::{ACCEPT, AUTHORIZATION, CONTENT_TYPE, USER_AGENT};
use hyper::{self, Body, Client as HyperClient, Request, Uri};
use hyper_rustls;
use hyper_timeout::TimeoutConnector;
use lazy_static::lazy_static;
use serde::{Deserialize, Serialize};
use serde_json;
use std::fmt::{self, Display};
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::runtime::Builder;
use tokio::runtime::{Builder, Runtime};

// Global Tokio runtime.
// Needs a `Mutex` because `Runtime::block_on` requires mutable access.
// Tokio v0.3 requires immutable self, but we are waiting on upstream
// updates before we can upgrade.
// See: https://github.com/seanmonstar/reqwest/pull/1076
lazy_static! {
pub static ref RUNTIME: Arc<Mutex<Runtime>> = Arc::new(Mutex::new(
Builder::new()
.threaded_scheduler()
.enable_all()
.build()
.unwrap()
));
}

/// Errors that can be returned by an ApiEndpoint implementation.
#[derive(Debug)]
Expand Down Expand Up @@ -323,18 +339,9 @@ impl Client {
}

pub fn send_request(&self, req: Request<Body>) -> Result<String, Error> {
let task = self.send_request_async(req);
scope(|s| {
let handle = s.spawn(|_| {
let mut rt = Builder::new()
.basic_scheduler()
.enable_all()
.build()
.context(ErrorKind::Internal("can't create Tokio runtime".to_owned()))?;
rt.block_on(task)
});
handle.join().unwrap()
})
.unwrap()
RUNTIME
.lock()
.unwrap()
.block_on(self.send_request_async(req))
}
}
2 changes: 1 addition & 1 deletion impls/src/client_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
mod client;
pub mod json_rpc;

pub use client::{Client, Error as ClientError};
pub use client::{Client, Error as ClientError, RUNTIME};
18 changes: 2 additions & 16 deletions impls/src/node_clients/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
use crate::api::{self, LocatedTxKernel, OutputListing, OutputPrintable};
use crate::core::core::{Transaction, TxKernel};
use crate::libwallet::{NodeClient, NodeVersionInfo};
use crossbeam_utils::thread::scope;
use futures::stream::FuturesUnordered;
use futures::TryStreamExt;
use std::collections::HashMap;
use std::env;
use tokio::runtime::Builder;

use crate::client_utils::Client;
use crate::client_utils::{Client, RUNTIME};
use crate::libwallet;
use crate::util::secp::pedersen;
use crate::util::ToHex;
Expand Down Expand Up @@ -251,19 +249,7 @@ impl NodeClient for HTTPNodeClient {
task.try_collect().await
};

let res = scope(|s| {
let handle = s.spawn(|_| {
let mut rt = Builder::new()
.threaded_scheduler()
.enable_all()
.build()
.unwrap();
let res: Result<Vec<Response>, _> = rt.block_on(task);
res
});
handle.join().unwrap()
})
.unwrap();
let res: Result<Vec<Response>, _> = RUNTIME.lock().unwrap().block_on(task);

let results: Vec<OutputPrintable> = match res {
Ok(resps) => {
Expand Down