Skip to content

Commit

Permalink
[http client]: refactor with "syncronous-like" design (#156)
Browse files Browse the repository at this point in the history
* experimental

* ci(benches): sync and concurrent roundtrips

Improve benchmarks to take concurrent requests into account.

* ci(benches): sync and concurrent roundtrips

Improve benchmarks to take concurrent requests into account.

* fix(nits)

* feat(http client): limit max request body size

* test(http transport): request limit test

* test(http client): add tests.

* fix typo

* fix(benches): make it compile again.

* fix(ws example): revert unintentional change.

* test(http client): subscription response on call.

* fix(cleanup)

* fix(benches): make it compile again.

* Update src/client/http/transport.rs

* fix(http client): `&str` -> `AsRef<str>`

* docs(client types): better docs for Mismatch type.

* style: `Default::default` -> `HttpConfig::default`

* fix(http client): read body size from header.

Expermential to read number of bytes from `HTTP Content Length` to pre-allocate the number of bytes and bail early
if the length is bigger than the `max_request_body size`

Need to be benched with bigger requests.

* test(raw http): enable tests to works again.

* style: cargo fmt

* benches: address grumbles

* feat(jsonrpc response/request): impl `Display`

* refactor(logging): use display impl

* fix(http client): nits.

* Update benches/benches.rs

* fix bad merge.
  • Loading branch information
niklasad1 authored Nov 16, 2020
1 parent d647fb2 commit 42b2982
Show file tree
Hide file tree
Showing 14 changed files with 445 additions and 677 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ ws = ["async-tls", "bytes", "soketto", "url", "webpki"]
criterion = "0.3.3"
env_logger = "0.8.1"
jsonrpsee-test-utils = { path = "test-utils" }
num_cpus = "1.13.0"

[[bench]]
name = "benches"
Expand Down
89 changes: 67 additions & 22 deletions benches/benches.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
use async_std::task::block_on;
use criterion::*;
use futures::channel::oneshot::{self, Sender};
use jsonrpsee::client::{HttpClient, WsClient};
use jsonrpsee::client::{HttpClient, HttpConfig, WsClient};
use jsonrpsee::http::HttpServer;
use jsonrpsee::types::jsonrpc::{JsonValue, Params};
use jsonrpsee::ws::WsServer;
use std::net::SocketAddr;
use std::sync::Arc;

criterion_group!(benches, http, ws);
criterion_group!(benches, http_requests, websocket_requests);
criterion_main!(benches);

fn concurrent_tasks() -> Vec<usize> {
let cores = num_cpus::get();
vec![cores / 4, cores / 2, cores, cores * 2, cores * 4]
}

async fn http_server(tx: Sender<SocketAddr>) {
let server = HttpServer::new("127.0.0.1:0").await.unwrap();
let mut say_hello = server.register_method("say_hello".to_string()).unwrap();
Expand All @@ -30,36 +36,75 @@ async fn ws_server(tx: Sender<SocketAddr>) {
}
}

pub fn http(c: &mut criterion::Criterion) {
c.bench_function("http 100 requests", |b| {
let (tx_addr, rx_addr) = oneshot::channel::<SocketAddr>();
async_std::task::spawn(http_server(tx_addr));
let server_addr = block_on(rx_addr).unwrap();
let client = HttpClient::new(&format!("http://{}", server_addr));
pub fn http_requests(c: &mut criterion::Criterion) {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let (tx_addr, rx_addr) = oneshot::channel::<SocketAddr>();
async_std::task::spawn(http_server(tx_addr));
let server_addr = block_on(rx_addr).unwrap();
let client = Arc::new(HttpClient::new(&format!("http://{}", server_addr), HttpConfig::default()).unwrap());

c.bench_function("synchronous http round trip", |b| {
b.iter(|| {
block_on(async {
for _ in 0..100 {
let _: JsonValue = black_box(client.request("say_hello", Params::None).await.unwrap());
}
rt.block_on(async {
let _: JsonValue = black_box(client.request("say_hello", Params::None).await.unwrap());
})
})
});

c.bench_function_over_inputs(
"concurrent http round trip",
move |b: &mut Bencher, size: &usize| {
b.iter(|| {
let mut tasks = Vec::new();
for _ in 0..*size {
let client_rc = client.clone();
let task = rt.spawn(async move {
let _: Result<JsonValue, _> = black_box(client_rc.request("say_hello", Params::None)).await;
});
tasks.push(task);
}
for task in tasks {
rt.block_on(task).unwrap();
}
})
},
concurrent_tasks(),
);
}

pub fn ws(c: &mut criterion::Criterion) {
c.bench_function("ws 100 request", |b| {
let (tx_addr, rx_addr) = oneshot::channel::<SocketAddr>();
async_std::task::spawn(ws_server(tx_addr));
let server_addr = block_on(rx_addr).unwrap();
let client = block_on(WsClient::new(&format!("ws://{}", server_addr))).unwrap();
pub fn websocket_requests(c: &mut criterion::Criterion) {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let (tx_addr, rx_addr) = oneshot::channel::<SocketAddr>();
async_std::task::spawn(ws_server(tx_addr));
let server_addr = block_on(rx_addr).unwrap();
let client = Arc::new(block_on(WsClient::new(&format!("ws://{}", server_addr))).unwrap());

c.bench_function("synchronous WebSocket round trip", |b| {
b.iter(|| {
block_on(async {
for _ in 0..100 {
let _: JsonValue = black_box(client.request("say_hello", Params::None).await.unwrap());
}
rt.block_on(async {
let _: JsonValue = black_box(client.request("say_hello", Params::None).await.unwrap());
})
})
});

c.bench_function_over_inputs(
"concurrent WebSocket round trip",
move |b: &mut Bencher, size: &usize| {
b.iter(|| {
let mut tasks = Vec::new();
for _ in 0..*size {
let client_rc = client.clone();
let task = rt.spawn(async move {
let _: Result<JsonValue, _> = black_box(client_rc.request("say_hello", Params::None)).await;
});
tasks.push(task);
}
for task in tasks {
rt.block_on(task).unwrap();
}
})
},
// TODO(niklasad1): This deadlocks when more than 8 tasks are spawned.
concurrent_tasks(),
);
}
6 changes: 3 additions & 3 deletions examples/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@

use async_std::task;
use futures::channel::oneshot::{self, Sender};
use jsonrpsee::client::HttpClient;
use jsonrpsee::client::{HttpClient, HttpConfig};
use jsonrpsee::http::HttpServer;
use jsonrpsee::types::jsonrpc::{JsonValue, Params};

const SOCK_ADDR: &str = "127.0.0.1:9933";
const SERVER_URI: &str = "http://localhost:9933";

#[async_std::main]
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();

Expand All @@ -44,7 +44,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

server_started_rx.await?;

let client = HttpClient::new(SERVER_URI);
let client = HttpClient::new(SERVER_URI, HttpConfig::default())?;
let response: Result<JsonValue, _> = client.request("say_hello", Params::None).await;
println!("r: {:?}", response);

Expand Down
202 changes: 79 additions & 123 deletions src/client/http/client.rs
Original file line number Diff line number Diff line change
@@ -1,155 +1,111 @@
use std::collections::HashMap;
use std::io;

use crate::client::http::raw::*;
use crate::client::http::transport::HttpTransportClient;
use crate::types::client::Error;
use crate::types::client::{Error, Mismatch};
use crate::types::jsonrpc::{self, JsonValue};
use std::sync::atomic::{AtomicU64, Ordering};

use futures::{channel::mpsc, channel::oneshot, future::Either, pin_mut, prelude::*};
/// Default maximum request body size (10 MB).
const DEFAULT_MAX_BODY_SIZE_TEN_MB: u32 = 10 * 1024 * 1024;

/// Client that wraps a `RawClient` where the `RawClient` is spawned in a background worker tasks.
///
/// The communication is performed via a `mpsc` channel where the `Client` acts as simple frontend
/// and just passes requests along to the backend (worker thread)
#[derive(Clone)]
pub struct Client {
backend: mpsc::Sender<FrontToBack>,
/// HTTP configuration.
#[derive(Copy, Clone)]
pub struct HttpConfig {
/// Maximum request body size in bytes.
pub max_request_body_size: u32,
}

/// Message that the [`Client`] can send to the background task.
enum FrontToBack {
/// Send a one-shot notification to the server. The server doesn't give back any feedback.
Notification {
/// Method for the notification.
method: String,
/// Parameters to send to the server.
params: jsonrpc::Params,
},

/// Send a request to the server.
StartRequest {
/// Method for the request.
method: String,
/// Parameters of the request.
params: jsonrpc::Params,
/// One-shot channel where to send back the outcome of that request.
send_back: oneshot::Sender<Result<JsonValue, Error>>,
},
/// JSON-RPC HTTP Client that provides functionality to perform method calls and notifications.
///
/// WARNING: The async methods must be executed on [Tokio 0.2](https://docs.rs/tokio/0.2.22/tokio).
pub struct HttpClient {
/// HTTP transport client.
transport: HttpTransportClient,
/// Request ID that wraps around when overflowing.
request_id: AtomicU64,
}

impl Client {
/// Create a client to connect to the server at address `endpoint`
pub fn new(endpoint: &str) -> Self {
let client = RawClient::new(HttpTransportClient::new(endpoint));

let (to_back, from_front) = mpsc::channel(16);
async_std::task::spawn(async move {
background_task(client, from_front).await;
});
impl Default for HttpConfig {
fn default() -> Self {
Self { max_request_body_size: DEFAULT_MAX_BODY_SIZE_TEN_MB }
}
}

Self { backend: to_back }
impl HttpClient {
/// Initializes a new HTTP client.
///
/// Fails when the URL is invalid.
pub fn new(target: impl AsRef<str>, config: HttpConfig) -> Result<Self, Error> {
let transport = HttpTransportClient::new(target, config.max_request_body_size)
.map_err(|e| Error::TransportError(Box::new(e)))?;
Ok(Self { transport, request_id: AtomicU64::new(0) })
}

/// Send a notification to the server.
///
/// WARNING: This method must be executed on [Tokio 0.2](https://docs.rs/tokio/0.2.22/tokio).
pub async fn notification(
&self,
method: impl Into<String>,
params: impl Into<jsonrpc::Params>,
) -> Result<(), Error> {
let method = method.into();
let params = params.into();
log::trace!("[frontend]: client send notification: method={:?}, params={:?}", method, params);
self.backend.clone().send(FrontToBack::Notification { method, params }).await.map_err(Error::InternalChannel)
let request = jsonrpc::Request::Single(jsonrpc::Call::Notification(jsonrpc::Notification {
jsonrpc: jsonrpc::Version::V2,
method: method.into(),
params: params.into(),
}));

self.transport.send_notification(request).await.map_err(|e| Error::TransportError(Box::new(e)))
}

/// Perform a request towards the server.
pub async fn request<Ret>(
///
/// WARNING: This method must be executed on [Tokio 0.2](https://docs.rs/tokio/0.2.22/tokio).
pub async fn request(
&self,
method: impl Into<String>,
params: impl Into<jsonrpc::Params>,
) -> Result<Ret, Error>
where
Ret: jsonrpc::DeserializeOwned,
{
let method = method.into();
let params = params.into();
log::trace!("[frontend]: send request: method={:?}, params={:?}", method, params);
let (send_back_tx, send_back_rx) = oneshot::channel();

// TODO: send a `ChannelClosed` message if we close the channel unexpectedly

self.backend.clone().send(FrontToBack::StartRequest { method, params, send_back: send_back_tx }).await?;
let json_value = match send_back_rx.await {
Ok(Ok(v)) => v,
Ok(Err(err)) => return Err(err),
Err(_) => {
let err = io::Error::new(io::ErrorKind::Other, "background task closed");
return Err(Error::TransportError(Box::new(err)));
) -> Result<JsonValue, Error> {
// NOTE: `fetch_add` wraps on overflow which is intended.
let id = self.request_id.fetch_add(1, Ordering::SeqCst);
let request = jsonrpc::Request::Single(jsonrpc::Call::MethodCall(jsonrpc::MethodCall {
jsonrpc: jsonrpc::Version::V2,
method: method.into(),
params: params.into(),
id: jsonrpc::Id::Num(id),
}));

let response = self
.transport
.send_request_and_wait_for_response(request)
.await
.map_err(|e| Error::TransportError(Box::new(e)))?;

match response {
jsonrpc::Response::Single(rp) => Self::process_response(rp, id),
// Server should not send batch response to a single request.
jsonrpc::Response::Batch(_rps) => {
Err(Error::Custom("Server replied with batch response to a single request".to_string()))
}
};
jsonrpc::from_value(json_value).map_err(Error::ParseError)
}
}

/// Function being run in the background that processes messages from the frontend.
async fn background_task(mut client: RawClient, mut from_front: mpsc::Receiver<FrontToBack>) {
// List of requests that the server must answer.
let mut ongoing_requests: HashMap<RawClientRequestId, oneshot::Sender<Result<_, _>>> = HashMap::new();

loop {
// We need to do a little transformation in order to destroy the borrow to `client`
// and `from_front`.
let outcome = {
let next_message = from_front.next();
let next_event = client.next_event();
pin_mut!(next_message);
pin_mut!(next_event);
match future::select(next_message, next_event).await {
Either::Left((v, _)) => Either::Left(v),
Either::Right((v, _)) => Either::Right(v),
}
};

match outcome {
// If the channel is closed, then the `Client` has been destroyed and we
// stop this task.
Either::Left(None) => {
log::trace!("[backend]: background task terminated");
if !ongoing_requests.is_empty() {
log::warn!("client was dropped with {} pending requests", ongoing_requests.len());
}
return;
// Server should not reply to a Notification.
jsonrpc::Response::Notif(_notif) => {
Err(Error::Custom(format!("Server replied with notification response to request ID: {}", id)))
}
}
}

// User called `notification` on the front-end.
Either::Left(Some(FrontToBack::Notification { method, params })) => {
log::trace!("[backend]: send notification");
let _ = client.send_notification(method, params).await;
fn process_response(response: jsonrpc::Output, expected_id: u64) -> Result<JsonValue, Error> {
match response.id() {
jsonrpc::Id::Num(n) if n == &expected_id => {
let ret: Result<JsonValue, _> = response.into();
ret.map_err(Error::Request)
}

// User called `request` on the front-end.
Either::Left(Some(FrontToBack::StartRequest { method, params, send_back })) => {
match client.start_request(&method, params).await {
Ok(id) => {
log::trace!("[backend]; send request: {:?} id: {:?}", method, id);
ongoing_requests.insert(id, send_back);
}
Err(err) => {
let _ = send_back.send(Err(Error::TransportError(Box::new(err))));
}
}
jsonrpc::Id::Num(n) => {
Err(Error::InvalidRequestId(Mismatch { expected: expected_id.into(), got: (*n).into() }))
}

// Received a response to a request from the server.
Either::Right(Ok(RawClientEvent::Response { request_id, result })) => {
log::trace!("[backend] received response to req={:?}, result={:?}", request_id, result);
let _ = ongoing_requests.remove(&request_id).unwrap().send(result.map_err(Error::Request));
jsonrpc::Id::Str(s) => {
Err(Error::InvalidRequestId(Mismatch { expected: expected_id.into(), got: s.to_string().into() }))
}

Either::Right(Err(e)) => {
// TODO: https://github.com/paritytech/jsonrpsee/issues/67
log::error!("Client Error: {:?}", e);
jsonrpc::Id::Null => {
Err(Error::InvalidRequestId(Mismatch { expected: expected_id.into(), got: JsonValue::Null }))
}
}
}
Expand Down
Loading

0 comments on commit 42b2982

Please sign in to comment.