Skip to content

Commit

Permalink
middleware refactoring (#793)
Browse files Browse the repository at this point in the history
* WIP: refactoring

* refactor http server

* fix tests

* Delete TODO.txt

* fix tests again

* add benches/src/lib.rs

* remove bench changes; fast less deps

* no more env_logger

* update examples

* ws server; expose headers in middleware

* add back uncommented code

* fix nits

* make the code more readable

* add back the tracing stuff

* simplify code but one extra clone

* fix tests again

* revert async accept API

* fix nits

* different traits for WS and HTTP middleware

* fix tests

* revert benchmark change

* Update core/src/server/helpers.rs

* Update ws-server/Cargo.toml

* add limit to batch responses as well

* pre-allocate string for batches

* small refactor
  • Loading branch information
niklasad1 authored Jul 6, 2022
1 parent d974914 commit 3ee635f
Show file tree
Hide file tree
Showing 27 changed files with 1,272 additions and 755 deletions.
2 changes: 1 addition & 1 deletion client/wasm-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jsonrpsee-client-transport = { path = "../transport", version = "0.14.0", featur
jsonrpsee-core = { path = "../../core", version = "0.14.0", features = ["async-wasm-client"] }

[dev-dependencies]
env_logger = "0.9"
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
jsonrpsee-test-utils = { path = "../../test-utils" }
tokio = { version = "1", features = ["macros"] }
serde_json = "1"
2 changes: 1 addition & 1 deletion client/ws-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jsonrpsee-client-transport = { path = "../transport", version = "0.14.0", featur
jsonrpsee-core = { path = "../../core", version = "0.14.0", features = ["async-client"] }

[dev-dependencies]
env_logger = "0.9"
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
jsonrpsee-test-utils = { path = "../../test-utils" }
tokio = { version = "1", features = ["macros"] }
serde_json = "1"
Expand Down
12 changes: 9 additions & 3 deletions client/ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ async fn notification_without_polling_doesnt_make_client_unuseable() {
client.subscribe_to_method("test").with_default_timeout().await.unwrap().unwrap();

// don't poll the notification stream for 2 seconds, should be full now.
std::thread::sleep(std::time::Duration::from_secs(2));
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

// Capacity is `num_sender` + `capacity`
for _ in 0..5 {
Expand Down Expand Up @@ -244,6 +244,11 @@ async fn batch_request_out_of_order_response() {

#[tokio::test]
async fn is_connected_works() {
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.expect("setting default subscriber failed");

let server = WebSocketTestServer::with_hardcoded_response(
"127.0.0.1:0".parse().unwrap(),
ok_response(JsonValue::String("foo".into()), Id::Num(99_u64)),
Expand All @@ -254,9 +259,11 @@ async fn is_connected_works() {
let uri = to_ws_uri_string(server.local_addr());
let client = WsClientBuilder::default().build(&uri).with_default_timeout().await.unwrap().unwrap();
assert!(client.is_connected());

client.request::<String>("say_hello", None).with_default_timeout().await.unwrap().unwrap_err();

// give the background thread some time to terminate.
std::thread::sleep(std::time::Duration::from_millis(100));
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert!(!client.is_connected())
}

Expand Down Expand Up @@ -295,7 +302,6 @@ fn assert_error_response(err: Error, exp: ErrorObjectOwned) {

#[tokio::test]
async fn redirections() {
let _ = env_logger::try_init();
let expected = "abc 123";
let server = WebSocketTestServer::with_hardcoded_response(
"127.0.0.1:0".parse().unwrap(),
Expand Down
2 changes: 2 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ futures-timer = { version = "3", optional = true }
globset = { version = "0.4", optional = true }
lazy_static = { version = "1", optional = true }
unicase = { version = "2.6.0", optional = true }
http = { version = "0.2.7", optional = true }

[features]
default = []
Expand All @@ -49,6 +50,7 @@ server = [
"tokio/sync",
"lazy_static",
"unicase",
"http",
]
client = ["futures-util/sink", "futures-channel/sink", "futures-channel/std"]
async-client = [
Expand Down
4 changes: 1 addition & 3 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,14 @@ pub mod error;
/// Traits
pub mod traits;

/// Middleware trait and implementation.
pub mod middleware;

cfg_http_helpers! {
pub mod http_helpers;
}

cfg_server! {
pub mod id_providers;
pub mod server;
pub mod middleware;
}

cfg_client! {
Expand Down
138 changes: 110 additions & 28 deletions core/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,67 +26,149 @@

//! Middleware for `jsonrpsee` servers.

/// Defines a middleware with callbacks during the RPC request life-cycle. The primary use case for
/// this is to collect timings for a larger metrics collection solution but the only constraints on
/// the associated type is that it be [`Send`] and [`Copy`], giving users some freedom to do what
/// they need to do.
use std::net::SocketAddr;

pub use http::HeaderMap as Headers;
pub use jsonrpsee_types::Params;

/// Defines a middleware specifically for HTTP requests with callbacks during the RPC request life-cycle.
/// The primary use case for this is to collect timings for a larger metrics collection solution.
///
/// See [`HttpServerBuilder::set_middleware`](../../jsonrpsee_http_server/struct.HttpServerBuilder.html#method.set_middleware) method
/// for examples.
pub trait HttpMiddleware: Send + Sync + Clone + 'static {
/// Intended to carry timestamp of a request, for example `std::time::Instant`. How the middleware
/// measures time, if at all, is entirely up to the implementation.
type Instant: std::fmt::Debug + Send + Sync + Copy;

/// Called when a new JSON-RPC request comes to the server.
fn on_request(&self, remote_addr: SocketAddr, headers: &Headers) -> Self::Instant;

/// Called on each JSON-RPC method call, batch requests will trigger `on_call` multiple times.
fn on_call(&self, method_name: &str, params: Params);

/// Called on each JSON-RPC method completion, batch requests will trigger `on_result` multiple times.
fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant);

/// Called once the JSON-RPC request is finished and response is sent to the output buffer.
fn on_response(&self, result: &str, _started_at: Self::Instant);
}

/// Defines a middleware specifically for WebSocket connections with callbacks during the RPC request life-cycle.
/// The primary use case for this is to collect timings for a larger metrics collection solution.
///
/// See the [`WsServerBuilder::set_middleware`](../../jsonrpsee_ws_server/struct.WsServerBuilder.html#method.set_middleware)
/// or the [`HttpServerBuilder::set_middleware`](../../jsonrpsee_http_server/struct.HttpServerBuilder.html#method.set_middleware) method
/// for examples.
pub trait Middleware: Send + Sync + Clone + 'static {
pub trait WsMiddleware: Send + Sync + Clone + 'static {
/// Intended to carry timestamp of a request, for example `std::time::Instant`. How the middleware
/// measures time, if at all, is entirely up to the implementation.
type Instant: Send + Copy;
type Instant: std::fmt::Debug + Send + Sync + Copy;

/// Called when a new client connects (WebSocket only)
fn on_connect(&self) {}
/// Called when a new client connects
fn on_connect(&self, remote_addr: SocketAddr, headers: &Headers);

/// Called when a new JSON-RPC comes to the server.
/// Called when a new JSON-RPC request comes to the server.
fn on_request(&self) -> Self::Instant;

/// Called on each JSON-RPC method call, batch requests will trigger `on_call` multiple times.
fn on_call(&self, _name: &str) {}
fn on_call(&self, method_name: &str, params: Params);

/// Called on each JSON-RPC method completion, batch requests will trigger `on_result` multiple times.
fn on_result(&self, _name: &str, _success: bool, _started_at: Self::Instant) {}
fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant);

/// Called once the JSON-RPC request is finished and response is sent to the output buffer.
fn on_response(&self, _started_at: Self::Instant) {}
fn on_response(&self, result: &str, started_at: Self::Instant);

/// Called when a client disconnects
fn on_disconnect(&self, remote_addr: std::net::SocketAddr);
}

impl HttpMiddleware for () {
type Instant = ();

fn on_request(&self, _: std::net::SocketAddr, _: &Headers) -> Self::Instant {}

fn on_call(&self, _: &str, _: Params) {}

fn on_result(&self, _: &str, _: bool, _: Self::Instant) {}

/// Called when a client disconnects (WebSocket only)
fn on_disconnect(&self) {}
fn on_response(&self, _: &str, _: Self::Instant) {}
}

impl Middleware for () {
impl WsMiddleware for () {
type Instant = ();

fn on_connect(&self, _: std::net::SocketAddr, _: &Headers) {}

fn on_request(&self) -> Self::Instant {}

fn on_call(&self, _: &str, _: Params) {}

fn on_result(&self, _: &str, _: bool, _: Self::Instant) {}

fn on_response(&self, _: &str, _: Self::Instant) {}

fn on_disconnect(&self, _: std::net::SocketAddr) {}
}

impl<A, B> Middleware for (A, B)
impl<A, B> WsMiddleware for (A, B)
where
A: Middleware,
B: Middleware,
A: WsMiddleware,
B: WsMiddleware,
{
type Instant = (A::Instant, B::Instant);

fn on_connect(&self, remote_addr: std::net::SocketAddr, headers: &Headers) {
(self.0.on_connect(remote_addr, headers), self.1.on_connect(remote_addr, headers));
}

fn on_request(&self) -> Self::Instant {
(self.0.on_request(), self.1.on_request())
}

fn on_call(&self, name: &str) {
self.0.on_call(name);
self.1.on_call(name);
fn on_call(&self, method_name: &str, params: Params) {
self.0.on_call(method_name, params.clone());
self.1.on_call(method_name, params);
}

fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant) {
self.0.on_result(method_name, success, started_at.0);
self.1.on_result(method_name, success, started_at.1);
}

fn on_response(&self, result: &str, started_at: Self::Instant) {
self.0.on_response(result, started_at.0);
self.1.on_response(result, started_at.1);
}

fn on_disconnect(&self, remote_addr: std::net::SocketAddr) {
(self.0.on_disconnect(remote_addr), self.1.on_disconnect(remote_addr));
}
}

impl<A, B> HttpMiddleware for (A, B)
where
A: HttpMiddleware,
B: HttpMiddleware,
{
type Instant = (A::Instant, B::Instant);

fn on_request(&self, remote_addr: std::net::SocketAddr, headers: &Headers) -> Self::Instant {
(self.0.on_request(remote_addr, headers), self.1.on_request(remote_addr, headers))
}

fn on_call(&self, method_name: &str, params: Params) {
self.0.on_call(method_name, params.clone());
self.1.on_call(method_name, params);
}

fn on_result(&self, name: &str, success: bool, started_at: Self::Instant) {
self.0.on_result(name, success, started_at.0);
self.1.on_result(name, success, started_at.1);
fn on_result(&self, method_name: &str, success: bool, started_at: Self::Instant) {
self.0.on_result(method_name, success, started_at.0);
self.1.on_result(method_name, success, started_at.1);
}

fn on_response(&self, started_at: Self::Instant) {
self.0.on_response(started_at.0);
self.1.on_response(started_at.1);
fn on_response(&self, result: &str, started_at: Self::Instant) {
self.0.on_response(result, started_at.0);
self.1.on_response(result, started_at.1);
}
}
Loading

0 comments on commit 3ee635f

Please sign in to comment.