Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(deps): migrate to jsonrpsee 0.22 #5894

Merged
merged 17 commits into from
Apr 5, 2024
Merged
212 changes: 158 additions & 54 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ reth-trie-parallel = { path = "crates/trie-parallel" }
# revm
revm = { version = "8.0.0", features = ["std", "secp256k1"], default-features = false }
revm-primitives = { version = "3.1.0", features = ["std"], default-features = false }
revm-inspectors = { git = "https://github.com/paradigmxyz/evm-inspectors", rev = "0ef5814" }
revm-inspectors = { git = "https://github.com/paradigmxyz/evm-inspectors", rev = "c34b770" }

# eth
alloy-chains = { version = "0.1", feature = ["serde", "rlp", "arbitrary"] }
Expand All @@ -263,12 +263,12 @@ alloy-dyn-abi = "0.7.0"
alloy-sol-types = "0.7.0"
alloy-rlp = "0.3.4"
alloy-trie = "0.3"
alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "8c9dd0a" }
alloy-rpc-types-trace = { git = "https://github.com/alloy-rs/alloy", rev = "8c9dd0a" }
alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/alloy", rev = "8c9dd0a" }
alloy-genesis = { git = "https://github.com/alloy-rs/alloy", rev = "8c9dd0a" }
alloy-node-bindings = { git = "https://github.com/alloy-rs/alloy", rev = "8c9dd0a" }
alloy-eips = { git = "https://github.com/alloy-rs/alloy", rev = "8c9dd0a" }
alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "17633df" }
alloy-rpc-types-trace = { git = "https://github.com/alloy-rs/alloy", rev = "17633df" }
alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/alloy", rev = "17633df" }
alloy-genesis = { git = "https://github.com/alloy-rs/alloy", rev = "17633df" }
alloy-node-bindings = { git = "https://github.com/alloy-rs/alloy", rev = "17633df" }
alloy-eips = { git = "https://github.com/alloy-rs/alloy", rev = "17633df" }

# TODO: Remove
ethers-core = { version = "2.0.14", default-features = false }
Expand Down Expand Up @@ -328,9 +328,9 @@ discv5 = { git = "https://github.com/sigp/discv5", rev = "04ac004" }
igd-next = "0.14.3"

# rpc
jsonrpsee = "0.20"
jsonrpsee-core = "0.20"
jsonrpsee-types = "0.20"
jsonrpsee = "0.22"
jsonrpsee-core = "0.22"
jsonrpsee-types = "0.22"

# crypto
secp256k1 = { version = "0.27.0", default-features = false, features = [
Expand Down
4 changes: 2 additions & 2 deletions crates/node-core/src/args/rpc_server_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use reth_rpc_builder::{
auth::{AuthServerConfig, AuthServerHandle},
constants,
error::RpcError,
EthConfig, IpcServerBuilder, RethRpcModule, RpcModuleConfig, RpcModuleSelection,
EthConfig, Identity, IpcServerBuilder, RethRpcModule, RpcModuleConfig, RpcModuleSelection,
RpcServerConfig, RpcServerHandle, ServerBuilder, TransportRpcModuleConfig,
};
use reth_rpc_engine_api::EngineApi;
Expand Down Expand Up @@ -414,7 +414,7 @@ impl RethRpcConfig for RpcServerArgs {
config
}

fn http_ws_server_builder(&self) -> ServerBuilder {
fn http_ws_server_builder(&self) -> ServerBuilder<Identity, Identity> {
ServerBuilder::new()
.max_connections(self.rpc_max_connections.get())
.max_request_body_size(self.rpc_max_request_size_bytes())
Expand Down
6 changes: 3 additions & 3 deletions crates/node-core/src/cli/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use reth_rpc::{
JwtError, JwtSecret,
};
use reth_rpc_builder::{
auth::AuthServerConfig, error::RpcError, EthConfig, IpcServerBuilder, RpcServerConfig,
ServerBuilder, TransportRpcModuleConfig,
auth::AuthServerConfig, error::RpcError, EthConfig, Identity, IpcServerBuilder,
RpcServerConfig, ServerBuilder, TransportRpcModuleConfig,
};
use reth_transaction_pool::PoolConfig;
use std::{borrow::Cow, path::PathBuf, time::Duration};
Expand Down Expand Up @@ -46,7 +46,7 @@ pub trait RethRpcConfig {
fn transport_rpc_module_config(&self) -> TransportRpcModuleConfig;

/// Returns the default server builder for http/ws
fn http_ws_server_builder(&self) -> ServerBuilder;
fn http_ws_server_builder(&self) -> ServerBuilder<Identity, Identity>;

/// Returns the default ipc server builder
fn ipc_server_builder(&self) -> IpcServerBuilder;
Expand Down
129 changes: 38 additions & 91 deletions crates/rpc/ipc/src/server/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,50 @@

use futures::{stream::FuturesOrdered, StreamExt};
use jsonrpsee::{
batch_response_error,
core::{
server::helpers::{prepare_error, BatchResponseBuilder, MethodResponse},
tracing::{rx_log_from_json, tx_log_from_str},
server::helpers::prepare_error,
tracing::server::{rx_log_from_json, tx_log_from_str},
JsonRawValue,
},
helpers::{batch_response_error, MethodResponseResult},
server::{
logger,
logger::{Logger, TransportProtocol},
IdProvider,
},
server::IdProvider,
types::{
error::{reject_too_many_subscriptions, ErrorCode},
ErrorObject, Id, InvalidRequest, Notification, Params, Request,
},
BoundedSubscriptions, CallOrSubscription, MethodCallback, MethodSink, Methods,
SubscriptionState,
BatchResponseBuilder, BoundedSubscriptions, CallOrSubscription, MethodCallback, MethodResponse,
MethodSink, Methods, ResponsePayload, SubscriptionState,
};
use std::sync::Arc;
use std::{sync::Arc, time::Instant};
use tokio::sync::OwnedSemaphorePermit;
use tokio_util::either::Either;
use tracing::instrument;

type Notif<'a> = Notification<'a, Option<&'a JsonRawValue>>;

#[derive(Debug, Clone)]
pub(crate) struct Batch<'a, L: Logger> {
pub(crate) struct Batch<'a> {
data: Vec<u8>,
call: CallData<'a, L>,
call: CallData<'a>,
}

#[derive(Debug, Clone)]
pub(crate) struct CallData<'a, L: Logger> {
pub(crate) struct CallData<'a> {
conn_id: usize,
logger: &'a L,
methods: &'a Methods,
id_provider: &'a dyn IdProvider,
sink: &'a MethodSink,
max_response_body_size: u32,
max_log_length: u32,
request_start: L::Instant,
request_start: Instant,
bounded_subscriptions: BoundedSubscriptions,
}

// Batch responses must be sent back as a single message so we read the results from each
// request in the batch and read the results off of a new channel, `rx_batch`, and then send the
// complete batch response back to the client over `tx`.
#[instrument(name = "batch", skip(b), level = "TRACE")]
pub(crate) async fn process_batch_request<L>(b: Batch<'_, L>) -> Option<String>
where
L: Logger,
{
pub(crate) async fn process_batch_request(b: Batch<'_>) -> Option<String> {
let Batch { data, call } = b;

if let Ok(batch) = serde_json::from_slice::<Vec<&JsonRawValue>>(&data) {
Expand Down Expand Up @@ -88,23 +80,24 @@ where

while let Some(response) = pending_calls.next().await {
if let Err(too_large) = batch_response.append(&response) {
return Some(too_large)
return Some(too_large.to_result())
}
}

if got_notif && batch_response.is_empty() {
None
} else {
Some(batch_response.finish())
let batch_resp = batch_response.finish();
Some(MethodResponse::from_batch(batch_resp).to_result())
}
} else {
Some(batch_response_error(Id::Null, ErrorObject::from(ErrorCode::ParseError)))
}
}

pub(crate) async fn process_single_request<L: Logger>(
pub(crate) async fn process_single_request(
data: Vec<u8>,
call: CallData<'_, L>,
call: CallData<'_>,
) -> Option<CallOrSubscription> {
if let Ok(req) = serde_json::from_slice::<Request<'_>>(&data) {
Some(execute_call_with_tracing(req, call).await)
Expand All @@ -117,84 +110,57 @@ pub(crate) async fn process_single_request<L: Logger>(
}

#[instrument(name = "method_call", fields(method = req.method.as_ref()), skip(call, req), level = "TRACE")]
pub(crate) async fn execute_call_with_tracing<'a, L: Logger>(
pub(crate) async fn execute_call_with_tracing<'a>(
req: Request<'a>,
call: CallData<'_, L>,
call: CallData<'_>,
) -> CallOrSubscription {
execute_call(req, call).await
}

pub(crate) async fn execute_call<L: Logger>(
req: Request<'_>,
call: CallData<'_, L>,
) -> CallOrSubscription {
pub(crate) async fn execute_call(req: Request<'_>, call: CallData<'_>) -> CallOrSubscription {
let CallData {
methods,
max_response_body_size,
max_log_length,
conn_id,
id_provider,
sink,
logger,
request_start,
bounded_subscriptions,
} = call;

rx_log_from_json(&req, call.max_log_length);

let params = Params::new(req.params.map(|params| params.get()));
let params = Params::new(req.params.as_ref().map(|params| params.get()));
let name = &req.method;
let id = req.id;

let response = match methods.method_with_name(name) {
None => {
logger.on_call(
name,
params.clone(),
logger::MethodKind::Unknown,
TransportProtocol::Http,
);
let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound));
CallOrSubscription::Call(response)
}
Some((name, method)) => match method {
Some((_name, method)) => match method {
MethodCallback::Sync(callback) => {
logger.on_call(
name,
params.clone(),
logger::MethodKind::MethodCall,
TransportProtocol::Http,
);
let response = (callback)(id, params, max_response_body_size as usize);
CallOrSubscription::Call(response)
}
MethodCallback::Async(callback) => {
logger.on_call(
name,
params.clone(),
logger::MethodKind::MethodCall,
TransportProtocol::Http,
);
let id = id.into_owned();
let params = params.into_owned();
let response =
(callback)(id, params, conn_id, max_response_body_size as usize).await;
CallOrSubscription::Call(response)
}
MethodCallback::AsyncWithDetails(_callback) => {
unimplemented!()
}
MethodCallback::Subscription(callback) => {
if let Some(p) = bounded_subscriptions.acquire() {
let conn_state =
SubscriptionState { conn_id, id_provider, subscription_permit: p };
match callback(id, params, sink.clone(), conn_state).await {
Ok(r) => CallOrSubscription::Subscription(r),
Err(id) => {
let response = MethodResponse::error(
id,
ErrorObject::from(ErrorCode::InternalError),
);
CallOrSubscription::Call(response)
}
}
let response = callback(id, params, sink.clone(), conn_state).await;
CallOrSubscription::Subscription(response)
} else {
let response = MethodResponse::error(
id,
Expand All @@ -204,13 +170,6 @@ pub(crate) async fn execute_call<L: Logger>(
}
}
MethodCallback::Unsubscription(callback) => {
logger.on_call(
name,
params.clone(),
logger::MethodKind::Unsubscription,
TransportProtocol::WebSocket,
);

// Don't adhere to any resource or subscription limits; always let unsubscribing
// happen!
let result = callback(id, params, conn_id, max_response_body_size as usize);
Expand All @@ -219,48 +178,38 @@ pub(crate) async fn execute_call<L: Logger>(
},
};

tx_log_from_str(&response.as_response().result, max_log_length);
logger.on_result(
name,
response.as_response().success_or_error,
request_start,
TransportProtocol::Http,
);
tx_log_from_str(response.as_response().as_result(), max_log_length);
let _ = request_start;
response
}

#[instrument(name = "notification", fields(method = notif.method.as_ref()), skip(notif, max_log_length), level = "TRACE")]
fn execute_notification(notif: Notif<'_>, max_log_length: u32) -> MethodResponse {
rx_log_from_json(&notif, max_log_length);
fn execute_notification(notif: &Notif<'_>, max_log_length: u32) -> MethodResponse {
rx_log_from_json(notif, max_log_length);
let response =
MethodResponse { result: String::new(), success_or_error: MethodResponseResult::Success };
tx_log_from_str(&response.result, max_log_length);
MethodResponse::response(Id::Null, ResponsePayload::success(String::new()), usize::MAX);
tx_log_from_str(response.as_result(), max_log_length);
response
}

#[allow(dead_code)]
pub(crate) struct HandleRequest<L: Logger> {
pub(crate) struct HandleRequest {
pub(crate) methods: Methods,
pub(crate) max_request_body_size: u32,
pub(crate) max_response_body_size: u32,
pub(crate) max_log_length: u32,
pub(crate) batch_requests_supported: bool,
pub(crate) logger: L,
pub(crate) conn: Arc<OwnedSemaphorePermit>,
pub(crate) bounded_subscriptions: BoundedSubscriptions,
pub(crate) method_sink: MethodSink,
pub(crate) id_provider: Arc<dyn IdProvider>,
}

pub(crate) async fn handle_request<L: Logger>(
request: String,
input: HandleRequest<L>,
) -> Option<String> {
pub(crate) async fn handle_request(request: String, input: HandleRequest) -> Option<String> {
let HandleRequest {
methods,
max_response_body_size,
max_log_length,
logger,
conn,
bounded_subscriptions,
method_sink,
Expand All @@ -282,24 +231,22 @@ pub(crate) async fn handle_request<L: Logger>(
})
.unwrap_or(Kind::Single);

let request_start = logger.on_request(TransportProtocol::Http);

let call = CallData {
conn_id: 0,
logger: &logger,
methods: &methods,
id_provider: &*id_provider,
sink: &method_sink,
max_response_body_size,
max_log_length,
request_start,
request_start: Instant::now(),
bounded_subscriptions,
};

// Single request or notification
let res = if matches!(request_kind, Kind::Single) {
let response = process_single_request(request.into_bytes(), call).await;
match response {
Some(CallOrSubscription::Call(response)) => Some(response.result),
Some(CallOrSubscription::Call(response)) => Some(response.to_result()),
Some(CallOrSubscription::Subscription(_)) => {
// subscription responses are sent directly over the sink, return a response here
// would lead to duplicate responses for the subscription response
Expand Down
Loading
Loading