From eb8c3ebf4c2e85aa5ec2dff3d1306ff1ea82afed Mon Sep 17 00:00:00 2001 From: James Date: Wed, 15 Nov 2023 09:16:22 -0800 Subject: [PATCH 1/4] refactor: RpcError and RpcResult and TransportError and TransportResult --- crates/json-rpc/Cargo.toml | 1 + crates/json-rpc/src/common.rs | 12 + crates/json-rpc/src/error.rs | 67 +++++ crates/json-rpc/src/lib.rs | 15 +- crates/json-rpc/src/response/error.rs | 12 + crates/json-rpc/src/result.rs | 349 ++++---------------------- crates/providers/Cargo.toml | 2 - crates/providers/src/lib.rs | 27 +- crates/providers/src/provider.rs | 84 +++---- crates/pubsub/src/frontend.rs | 18 +- crates/pubsub/src/service.rs | 33 ++- crates/rpc-client/Cargo.toml | 1 + crates/rpc-client/src/batch.rs | 36 ++- crates/rpc-client/src/call.rs | 19 +- crates/rpc-client/tests/it/http.rs | 2 +- crates/transport-http/src/hyper.rs | 9 +- crates/transport-http/src/reqwest.rs | 6 +- crates/transport-ws/src/native.rs | 6 +- crates/transport/src/error.rs | 67 +++-- crates/transport/src/lib.rs | 23 +- crates/transport/src/utils.rs | 23 +- 21 files changed, 308 insertions(+), 504 deletions(-) create mode 100644 crates/json-rpc/src/error.rs diff --git a/crates/json-rpc/Cargo.toml b/crates/json-rpc/Cargo.toml index 936bf7aef53..65837e5b76d 100644 --- a/crates/json-rpc/Cargo.toml +++ b/crates/json-rpc/Cargo.toml @@ -15,3 +15,4 @@ exclude.workspace = true alloy-primitives.workspace = true serde.workspace = true serde_json = { workspace = true, features = ["raw_value"] } +thiserror.workspace = true diff --git a/crates/json-rpc/src/common.rs b/crates/json-rpc/src/common.rs index 4aca16fa6b5..d2a0f47b7b6 100644 --- a/crates/json-rpc/src/common.rs +++ b/crates/json-rpc/src/common.rs @@ -1,3 +1,5 @@ +use std::fmt::Display; + use serde::{de::Visitor, Deserialize, Serialize}; /// A JSON-RPC 2.0 ID object. This may be a number, a string, or null. @@ -31,6 +33,16 @@ pub enum Id { None, } +impl Display for Id { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Id::Number(n) => write!(f, "{}", n), + Id::String(s) => write!(f, "{}", s), + Id::None => write!(f, "null"), + } + } +} + impl Serialize for Id { fn serialize(&self, serializer: S) -> Result { match self { diff --git a/crates/json-rpc/src/error.rs b/crates/json-rpc/src/error.rs new file mode 100644 index 00000000000..8b00e15ba3e --- /dev/null +++ b/crates/json-rpc/src/error.rs @@ -0,0 +1,67 @@ +use serde_json::value::RawValue; + +use crate::{ErrorPayload, RpcReturn}; + +/// An RPC error. +#[derive(thiserror::Error, Debug)] +pub enum RpcError> { + /// Server returned an error response. + #[error("Server returned an error response: {0}")] + ErrorResp(ErrorPayload), + + /// JSON serialization error. + #[error("Serialization error: {0}")] + SerError( + /// The underlying serde_json error. + // To avoid accidentally confusing ser and deser errors, we do not use + // the `#[from]` tag. + #[source] + serde_json::Error, + ), + /// JSON deserialization error. + #[error("Deserialization error: {err}")] + DeserError { + /// The underlying serde_json error. + // To avoid accidentally confusing ser and deser errors, we do not use + // the `#[from]` tag. + #[source] + err: serde_json::Error, + /// For deser errors, the text that failed to deserialize. + text: String, + }, + + /// Transport error. + /// + /// This variant is used when the error occurs during communication. + #[error("Error during transport: {0}")] + Transport( + /// The underlying transport error. + #[from] + E, + ), +} + +impl RpcError +where + ErrResp: RpcReturn, +{ + /// Instantiate a new `TransportError` from an error response. + pub const fn err_resp(err: ErrorPayload) -> Self { + Self::ErrorResp(err) + } +} + +impl RpcError { + /// Instantiate a new `TransportError` from a [`serde_json::Error`]. This + /// should be called when the error occurs during serialization. + pub const fn ser_err(err: serde_json::Error) -> Self { + Self::SerError(err) + } + + /// Instantiate a new `TransportError` from a [`serde_json::Error`] and the + /// text. This should be called when the error occurs during + /// deserialization. + pub fn deser_err(err: serde_json::Error, text: impl AsRef) -> Self { + Self::DeserError { err, text: text.as_ref().to_owned() } + } +} diff --git a/crates/json-rpc/src/lib.rs b/crates/json-rpc/src/lib.rs index 746a2c6f470..54985486012 100644 --- a/crates/json-rpc/src/lib.rs +++ b/crates/json-rpc/src/lib.rs @@ -55,7 +55,11 @@ #![deny(unused_must_use, rust_2018_idioms)] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -use serde::{de::DeserializeOwned, Serialize}; +mod common; +pub use common::Id; + +mod error; +pub use error::RpcError; mod notification; pub use notification::{EthNotification, PubSubItem}; @@ -72,11 +76,12 @@ pub use response::{ ResponsePayload, }; -mod common; -pub use common::Id; - mod result; -pub use result::{BorrowedRpcResult, RpcResult}; +pub use result::{ + transform_response, transform_result, try_deserialize_success, BorrowedRpcResult, RpcResult, +}; + +use serde::{de::DeserializeOwned, Serialize}; /// An object that can be used as a JSON-RPC parameter. /// diff --git a/crates/json-rpc/src/response/error.rs b/crates/json-rpc/src/response/error.rs index d71097892a7..11632f4df6e 100644 --- a/crates/json-rpc/src/response/error.rs +++ b/crates/json-rpc/src/response/error.rs @@ -20,6 +20,18 @@ pub struct ErrorPayload> { pub data: Option, } +impl std::fmt::Display for ErrorPayload { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "ErrorPayload code {}, message: \"{}\", contains payload: {}", + self.code, + self.message, + self.data.is_some() + ) + } +} + /// A [`ErrorPayload`] that has been partially deserialized, borrowing its /// contents from the deserializer. This is used primarily for intermediate /// deserialization. Most users will not require it. diff --git a/crates/json-rpc/src/result.rs b/crates/json-rpc/src/result.rs index 8ee5498335a..93ba6b8dad4 100644 --- a/crates/json-rpc/src/result.rs +++ b/crates/json-rpc/src/result.rs @@ -1,7 +1,8 @@ -use crate::{ErrorPayload, Response, ResponsePayload, RpcReturn}; -use serde::Deserialize; +use std::borrow::Borrow; + +use crate::{Response, ResponsePayload, RpcError, RpcReturn}; + use serde_json::value::RawValue; -use std::{borrow::Borrow, fmt::Debug}; /// The result of a JSON-RPC request. /// @@ -9,316 +10,60 @@ use std::{borrow::Borrow, fmt::Debug}; /// non-response error is intended to be used for errors returned by a /// transport, or serde errors. /// -/// The three cases are -/// - Success: The server returned a succesful response. -/// - Failure: The server returned an error response. -/// - Err: Some client-side or communication error occurred. -#[must_use = "Results must be handled."] -#[derive(Debug)] -pub enum RpcResult { - /// Server returned a succesful response. - Success(T), - /// Server returned an error response. No communication or serialization - /// errors occurred. - Failure(ErrorPayload), - /// Some other error occurred. This could indicate a transport error, a - /// serialization error, or anything else. - Err(E), -} - -/// A [`RpcResult`] that has been partially deserialized, borrowing its -/// contents from the deserializer. This is used primarily for intermediate -/// deserialization. Most users will not require it. -/// -/// See the [top-level docs] for more info. -/// -/// [top-level docs]: crate -pub type BorrowedRpcResult<'a, E> = RpcResult<&'a RawValue, &'a RawValue, E>; - -impl<'a, E> BorrowedRpcResult<'a, E> { - /// Convert this borrowed RpcResult into an owned RpcResult by copying - /// the data from the deserializer (if necessary). - pub fn into_owned(self) -> RpcResult, Box, E> { - match self { - RpcResult::Success(val) => RpcResult::Success(val.to_owned()), - RpcResult::Failure(err) => RpcResult::Failure(err.into_owned()), - RpcResult::Err(err) => RpcResult::Err(err), - } - } -} - -impl RpcResult { - /// `true` if the result is an `Ok` value. - pub const fn is_success(&self) -> bool { - matches!(self, RpcResult::Success(_)) - } - - /// `true` if the result is an `Failure` value. - pub const fn is_failure(&self) -> bool { - matches!(self, RpcResult::Failure(_)) - } - - /// `true` if the result is an `Err` value. - pub const fn is_err(&self) -> bool { - matches!(self, RpcResult::Err(_)) - } - - /// Unwrap the inner value if it is `Ok`, panic otherwise. - pub fn unwrap(self) -> T - where - ErrData: Debug, - E: Debug, - { - match self { - RpcResult::Success(val) => val, - RpcResult::Failure(err) => panic!("Error response: {:?}", err), - RpcResult::Err(err) => panic!("Error: {:?}", err), - } - } - - /// Unwrap the inner value if it is `Failure`, panic otherwise. - pub fn unwrap_failure(self) -> ErrorPayload - where - T: Debug, - E: Debug, - { - match self { - RpcResult::Success(val) => panic!("Ok: {:?}", val), - RpcResult::Failure(err) => err, - RpcResult::Err(err) => panic!("Error: {:?}", err), - } - } - - /// Unwrap the inner value if it is `Err`, panic otherwise. - pub fn unwrap_err(self) -> E - where - T: Debug, - ErrData: Debug, - E: Debug, - { - match self { - RpcResult::Success(val) => panic!("Ok: {:?}", val), - RpcResult::Failure(err) => panic!("Error response: {:?}", err), - RpcResult::Err(err) => err, - } - } - - /// Apply `op` to the inner value if it is `Ok`. - pub fn map(self, op: F) -> RpcResult - where - F: FnOnce(T) -> U, - { - match self { - RpcResult::Success(val) => RpcResult::Success(op(val)), - RpcResult::Failure(err) => RpcResult::Failure(err), - RpcResult::Err(err) => RpcResult::Err(err), - } - } - - /// Calls `op` if the result is `Ok`, otherwise returns the `Err` or - /// `Failure` value of `self` - pub fn and_then(self, op: F) -> RpcResult - where - F: FnOnce(T) -> RpcResult, - { - match self { - RpcResult::Success(val) => op(val), - RpcResult::Failure(err) => RpcResult::Failure(err), - RpcResult::Err(err) => RpcResult::Err(err), - } - } - - /// Apply `op` to the inner value if it is `Err`. - pub fn map_err(self, op: F) -> RpcResult - where - F: FnOnce(E) -> U, - { - match self { - RpcResult::Success(val) => RpcResult::Success(val), - RpcResult::Failure(err) => RpcResult::Failure(err), - RpcResult::Err(err) => RpcResult::Err(op(err)), - } - } - - /// Shortcut for `map_err(Into::into)`. Useful for converting between error - /// types. - pub fn convert_err(self) -> RpcResult - where - U: From, - { - self.map_err(Into::into) - } - - /// Drop the inner value if it is `Ok`, returning `()` instead. Used when - /// we only want success/failure status, and don't care about the response - /// value. - pub fn empty(self) -> RpcResult<(), ErrData, E> { - self.map(|_| ()) - } - - /// Converts from `RpcResult` to `Option`. - #[allow(clippy::missing_const_for_fn)] // erroneous lint - pub fn success(self) -> Option { - match self { - RpcResult::Success(val) => Some(val), - _ => None, - } - } - - /// Converts from `RpcResult` to `Option`. - #[allow(clippy::missing_const_for_fn)] // erroneous lint - pub fn failure(self) -> Option> { - match self { - RpcResult::Failure(err) => Some(err), - _ => None, - } - } - - /// Converts from `RpcResult` to `Option`. - #[allow(clippy::missing_const_for_fn)] // erroneous lint - pub fn err(self) -> Option { - match self { - RpcResult::Err(err) => Some(err), - _ => None, +/// The common cases are: +/// - `Ok(T)` - The server returned a succesful response. +/// - `Err(RpcError::ErrorResponse(ErrResp))` - The server returned an error response. +/// - `Err(RpcError::SerError(E))` - A serialization error occurred. +/// - `Err(RpcError::DeserError { err: E, text: String })` - A deserialization error occurred. +/// - `Err(RpcError::TransportError(E))` - Some client-side or communication error occurred. +pub type RpcResult> = Result>; + +/// A partially deserialized [`RpcResult`], borrowing from the deserializer. +pub type BorrowedRpcResult<'a, E> = RpcResult<&'a RawValue, E, &'a RawValue>; + +/// Transform a transport response into an [`RpcResult`], discarding the [`Id`]. +pub fn transform_response( + response: Response, +) -> Result> +where + ErrResp: RpcReturn, +{ + match response { + Response { payload: ResponsePayload::Failure(err_resp), .. } => { + Err(RpcError::err_resp(err_resp)) } + Response { payload: ResponsePayload::Success(result), .. } => Ok(result), } } -impl RpcResult +/// Transform a transport outcome into an [`RpcResult`], discarding the [`Id`]. +/// +/// [`Id`]: crate::Id +pub fn transform_result( + response: Result, E>, +) -> Result> where - B: Borrow, + ErrResp: RpcReturn, { - /// Deserialize a response, if it is `Success`. - /// - /// # Returns - /// - `None` if the response is not `Success`. - /// - `Some(Ok(Resp))` if the response is `Success` and the `result` field can be deserialized. - /// - `Some(Err(err))` if the response is `Success` and the `result` field can't be - /// deserialized. - pub fn try_success_as<'a, Resp: Deserialize<'a>>(&'a self) -> Option> { - match self { - Self::Success(val) => Some(serde_json::from_str(val.borrow().get())), - _ => None, - } - } - - /// Deserialize the inner value, if it is `Ok`. Pass through other values. - pub fn deserialize_success(self) -> Result, Self> { - match self { - RpcResult::Success(ref ok) => match serde_json::from_str(ok.borrow().get()) { - Ok(val) => Ok(RpcResult::Success(val)), - Err(_) => Err(self), - }, - RpcResult::Failure(err) => Ok(RpcResult::Failure(err)), - RpcResult::Err(err) => Ok(RpcResult::Err(err)), - } - } - - /// Deserialize the inner value, if it is `Ok`. Pass through other values. - /// Transform deser errors with `F`. - #[doc(hidden)] - pub fn try_deserialize_success_or_else(self, f: F) -> RpcResult - where - T: RpcReturn, - F: FnOnce(serde_json::Error, &str) -> E, - { - match self { - RpcResult::Success(val) => { - let text = val.borrow().get(); - match serde_json::from_str(text) { - Ok(val) => RpcResult::Success(val), - Err(e) => RpcResult::Err(f(e, text)), - } - } - RpcResult::Failure(f) => RpcResult::Failure(f), - RpcResult::Err(e) => RpcResult::Err(e), - } + match response { + Ok(resp) => transform_response(resp), + Err(e) => Err(RpcError::Transport(e)), } } -impl RpcResult +/// Attempt to deserialize the `Ok(_)` variant of an [`RpcResult`]. +pub fn try_deserialize_success<'a, J, T, E, ErrResp>( + result: RpcResult, +) -> RpcResult where - B: Borrow, + J: Borrow + 'a, + T: RpcReturn, + ErrResp: RpcReturn, { - /// Deserialize a response, if it is `Failure`. - /// - /// # Returns - /// - `None` if the response is not `Failure` - /// - `Some(Ok(ErrorPayload))` if the response is `Failure` and the `data` field can be - /// deserialized. - /// - `Some(Err(err))` if the response is `Failure` and the `data` field can't be deserialized. - pub fn try_failure_as<'a, ErrData: Deserialize<'a>>( - &'a self, - ) -> Option> { - match self { - RpcResult::Failure(err) => err.try_data_as::(), - _ => None, - } - } + let json = result?; + let text = json.borrow().get(); - /// Deserialize the inner value, if it is `Failure`. Pass through other - /// values. - pub fn deserialize_failure(self) -> Result, Self> { - match self { - RpcResult::Success(ok) => Ok(RpcResult::Success(ok)), - RpcResult::Failure(err_resp) => { - err_resp.deser_data::().map(RpcResult::Failure).map_err(RpcResult::Failure) - } - RpcResult::Err(err) => Ok(RpcResult::Err(err)), - } - } + let val = serde_json::from_str::(text).map_err(|err| RpcError::deser_err(err, text))?; - /// Deserialize the inner value, if it is `Failure`. Pass through other - /// values. Transform deser errors with `F`. - #[doc(hidden)] - pub fn try_deserialize_failure_or_else( - self, - f: F, - ) -> Result, E> - where - ErrData: RpcReturn, - F: FnOnce(serde_json::Error, &str) -> E, - { - match self { - RpcResult::Success(ok) => Ok(RpcResult::Success(ok)), - RpcResult::Failure(err_resp) => match err_resp.try_data_as::() { - None => Ok(RpcResult::Failure(ErrorPayload { - code: err_resp.code, - message: err_resp.message, - data: None, - })), - Some(Ok(data)) => Ok(RpcResult::Failure(ErrorPayload { - code: err_resp.code, - message: err_resp.message, - data: Some(data), - })), - Some(Err(e)) => { - let text = err_resp.data.as_ref().map(|d| d.borrow().get()).unwrap_or(""); - Err(f(e, text)) - } - }, - - RpcResult::Err(err) => Ok(RpcResult::Err(err)), - } - } -} - -impl From> for RpcResult { - fn from(value: Response) -> Self { - match value.payload { - ResponsePayload::Success(res) => Self::Success(res), - ResponsePayload::Failure(e) => Self::Failure(e), - } - } -} - -impl From, E>> - for RpcResult -{ - fn from(value: Result, E>) -> Self { - match value { - Ok(res) => res.into(), - Err(err) => Self::Err(err), - } - } + Ok(val) } diff --git a/crates/providers/Cargo.toml b/crates/providers/Cargo.toml index ac4926b84f7..1fed9b3f1fb 100644 --- a/crates/providers/Cargo.toml +++ b/crates/providers/Cargo.toml @@ -12,7 +12,6 @@ repository.workspace = true exclude.workspace = true [dependencies] -alloy-json-rpc.workspace = true alloy-networks.workspace = true alloy-primitives.workspace = true alloy-rpc-client.workspace = true @@ -23,7 +22,6 @@ alloy-transport.workspace = true async-trait.workspace = true reqwest.workspace = true serde.workspace = true -serde_json = { workspace = true, features = ["raw_value"] } thiserror.workspace = true [dev-dependencies] diff --git a/crates/providers/src/lib.rs b/crates/providers/src/lib.rs index 1baec4e3993..dbd39f52343 100644 --- a/crates/providers/src/lib.rs +++ b/crates/providers/src/lib.rs @@ -16,12 +16,10 @@ #![deny(unused_must_use, rust_2018_idioms)] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -use alloy_json_rpc::RpcResult; use alloy_networks::{Network, Transaction}; use alloy_primitives::Address; use alloy_rpc_client::RpcClient; -use alloy_transport::{BoxTransport, Transport, TransportError}; -use serde_json::value::RawValue; +use alloy_transport::{BoxTransport, Transport, TransportResult}; use std::{borrow::Cow, marker::PhantomData}; mod builder; @@ -93,7 +91,7 @@ pub trait Provider: Send + Sync { async fn estimate_gas( &self, tx: &N::TransactionRequest, - ) -> RpcResult, TransportError> { + ) -> TransportResult { self.inner().estimate_gas(tx).await } @@ -104,24 +102,18 @@ pub trait Provider: Send + Sync { async fn get_transaction_count( &self, address: Address, - ) -> RpcResult, TransportError> { + ) -> TransportResult { self.inner().get_transaction_count(address).await } /// Send a transaction to the network. /// /// The transaction type is defined by the network. - async fn send_transaction( - &self, - tx: &N::TransactionRequest, - ) -> RpcResult, TransportError> { + async fn send_transaction(&self, tx: &N::TransactionRequest) -> TransportResult { self.inner().send_transaction(tx).await } - async fn populate_gas( - &self, - tx: &mut N::TransactionRequest, - ) -> RpcResult<(), Box, TransportError> { + async fn populate_gas(&self, tx: &mut N::TransactionRequest) -> TransportResult<()> { let gas = self.estimate_gas(&*tx).await; gas.map(|gas| tx.set_gas(gas)) @@ -142,14 +134,14 @@ impl Provider for NetworkRpcClient async fn estimate_gas( &self, tx: &::TransactionRequest, - ) -> RpcResult, TransportError> { + ) -> TransportResult { self.prepare("eth_estimateGas", Cow::Borrowed(tx)).await } async fn get_transaction_count( &self, address: Address, - ) -> RpcResult, TransportError> { + ) -> TransportResult { self.prepare( "eth_getTransactionCount", Cow::<(Address, String)>::Owned((address, "latest".to_string())), @@ -157,10 +149,7 @@ impl Provider for NetworkRpcClient .await } - async fn send_transaction( - &self, - tx: &N::TransactionRequest, - ) -> RpcResult, TransportError> { + async fn send_transaction(&self, tx: &N::TransactionRequest) -> TransportResult { self.prepare("eth_sendTransaction", Cow::Borrowed(tx)).await } } diff --git a/crates/providers/src/provider.rs b/crates/providers/src/provider.rs index b373441755c..1ded15b0e92 100644 --- a/crates/providers/src/provider.rs +++ b/crates/providers/src/provider.rs @@ -7,11 +7,10 @@ use alloy_rpc_types::{ Block, BlockId, BlockNumberOrTag, FeeHistory, Filter, Log, RpcBlockHash, SyncStatus, Transaction, TransactionReceipt, TransactionRequest, }; -use alloy_transport::{BoxTransport, RpcResult, Transport, TransportError}; +use alloy_transport::{BoxTransport, Transport, TransportErrorKind, TransportResult}; use alloy_transport_http::Http; use reqwest::Client; use serde::{Deserialize, Serialize}; -use serde_json::value::RawValue; use std::borrow::Cow; use thiserror::Error; @@ -41,7 +40,7 @@ impl Provider { pub async fn get_transaction_count( &self, address: Address, - ) -> RpcResult, TransportError> + ) -> TransportResult where Self: Sync, { @@ -54,7 +53,7 @@ impl Provider { } /// Gets the last block number available. - pub async fn get_block_number(&self) -> RpcResult, TransportError> + pub async fn get_block_number(&self) -> TransportResult where Self: Sync, { @@ -62,11 +61,7 @@ impl Provider { } /// Gets the balance of the account at the specified tag, which defaults to latest. - pub async fn get_balance( - &self, - address: Address, - tag: Option, - ) -> RpcResult, TransportError> + pub async fn get_balance(&self, address: Address, tag: Option) -> TransportResult where Self: Sync, { @@ -86,7 +81,7 @@ impl Provider { &self, hash: BlockHash, full: bool, - ) -> RpcResult, Box, TransportError> + ) -> TransportResult> where Self: Sync, { @@ -100,7 +95,7 @@ impl Provider { &self, number: B, full: bool, - ) -> RpcResult, Box, TransportError> + ) -> TransportResult> where Self: Sync, { @@ -113,7 +108,7 @@ impl Provider { } /// Gets the chain ID. - pub async fn get_chain_id(&self) -> RpcResult, TransportError> + pub async fn get_chain_id(&self) -> TransportResult where Self: Sync, { @@ -124,7 +119,7 @@ impl Provider { &self, address: Address, tag: B, - ) -> RpcResult, TransportError> + ) -> TransportResult where Self: Sync, { @@ -134,10 +129,7 @@ impl Provider { } /// Gets a [Transaction] by its [TxHash]. - pub async fn get_transaction_by_hash( - &self, - hash: TxHash, - ) -> RpcResult, TransportError> + pub async fn get_transaction_by_hash(&self, hash: TxHash) -> TransportResult where Self: Sync, { @@ -152,10 +144,7 @@ impl Provider { } /// Retrieves a [`Vec`] with the given [Filter]. - pub async fn get_logs( - &self, - filter: Filter, - ) -> RpcResult, Box, TransportError> + pub async fn get_logs(&self, filter: Filter) -> TransportResult> where Self: Sync, { @@ -164,7 +153,7 @@ impl Provider { /// Gets the accounts in the remote node. This is usually empty unless you're using a local /// node. - pub async fn get_accounts(&self) -> RpcResult, Box, TransportError> + pub async fn get_accounts(&self) -> TransportResult> where Self: Sync, { @@ -172,7 +161,7 @@ impl Provider { } /// Gets the current gas price. - pub async fn get_gas_price(&self) -> RpcResult, TransportError> + pub async fn get_gas_price(&self) -> TransportResult where Self: Sync, { @@ -183,7 +172,7 @@ impl Provider { pub async fn get_transaction_receipt( &self, hash: TxHash, - ) -> RpcResult, Box, TransportError> + ) -> TransportResult> where Self: Sync, { @@ -197,7 +186,7 @@ impl Provider { block_count: U256, last_block: B, reward_percentiles: &[f64], - ) -> RpcResult, TransportError> + ) -> TransportResult where Self: Sync, { @@ -217,7 +206,7 @@ impl Provider { pub async fn get_block_receipts( &self, block: BlockNumberOrTag, - ) -> RpcResult, Box, TransportError> + ) -> TransportResult> where Self: Sync, { @@ -229,7 +218,7 @@ impl Provider { &self, tag: B, idx: U64, - ) -> RpcResult, Box, TransportError> + ) -> TransportResult> where Self: Sync, { @@ -255,7 +244,7 @@ impl Provider { } /// Gets syncing info. - pub async fn syncing(&self) -> RpcResult, TransportError> + pub async fn syncing(&self) -> TransportResult where Self: Sync, { @@ -267,7 +256,7 @@ impl Provider { &self, tx: TransactionRequest, block: Option, - ) -> RpcResult, TransportError> + ) -> TransportResult where Self: Sync, { @@ -287,7 +276,7 @@ impl Provider { &self, tx: TransactionRequest, block: Option, - ) -> RpcResult, TransportError> + ) -> TransportResult where Self: Sync, { @@ -301,10 +290,7 @@ impl Provider { } /// Sends an already-signed transaction. - pub async fn send_raw_transaction( - &self, - tx: Bytes, - ) -> RpcResult, TransportError> + pub async fn send_raw_transaction(&self, tx: Bytes) -> TransportResult where Self: Sync, { @@ -317,23 +303,22 @@ impl Provider { pub async fn estimate_eip1559_fees( &self, estimator: Option, - ) -> RpcResult<(U256, U256), Box, TransportError> + ) -> TransportResult<(U256, U256)> where Self: Sync, { let base_fee_per_gas = match self.get_block_by_number(BlockNumberOrTag::Latest, false).await { - RpcResult::Success(Some(block)) => match block.header.base_fee_per_gas { + Ok(Some(block)) => match block.header.base_fee_per_gas { Some(base_fee_per_gas) => base_fee_per_gas, - None => { - return RpcResult::Err(TransportError::Custom("EIP-1559 not activated".into())) - } + None => return Err(TransportErrorKind::custom_str("EIP-1559 not activated")), }, - RpcResult::Success(None) => { - return RpcResult::Err(TransportError::Custom("Latest block not found".into())) + + Ok(None) => { + return Err(TransportErrorKind::custom_str("Latest block not found".into())) } - RpcResult::Err(err) => return RpcResult::Err(err), - RpcResult::Failure(err) => return RpcResult::Failure(err), + + Err(err) => return Err(err), }; let fee_history = match self @@ -344,9 +329,8 @@ impl Provider { ) .await { - RpcResult::Success(fee_history) => fee_history, - RpcResult::Err(err) => return RpcResult::Err(err), - RpcResult::Failure(err) => return RpcResult::Failure(err), + Ok(fee_history) => fee_history, + Err(err) => return Err(err), }; // use the provided fee estimator function, or fallback to the default implementation. @@ -359,15 +343,11 @@ impl Provider { ) }; - RpcResult::Success((max_fee_per_gas, max_priority_fee_per_gas)) + Ok((max_fee_per_gas, max_priority_fee_per_gas)) } #[cfg(feature = "anvil")] - pub async fn set_code( - &self, - address: Address, - code: &'static str, - ) -> RpcResult<(), Box, TransportError> + pub async fn set_code(&self, address: Address, code: &'static str) -> TransportResult<()> where Self: Sync, { diff --git a/crates/pubsub/src/frontend.rs b/crates/pubsub/src/frontend.rs index 6d0bf7a1875..c0e9d0b9a0d 100644 --- a/crates/pubsub/src/frontend.rs +++ b/crates/pubsub/src/frontend.rs @@ -1,7 +1,7 @@ use crate::{ix::PubSubInstruction, managers::InFlight}; use alloy_json_rpc::{RequestPacket, Response, ResponsePacket, SerializedRequest}; use alloy_primitives::U256; -use alloy_transport::{TransportError, TransportFut}; +use alloy_transport::{TransportError, TransportErrorKind, TransportFut}; use futures::future::try_join_all; use serde_json::value::RawValue; use std::{future::Future, pin::Pin}; @@ -28,15 +28,17 @@ impl PubSubFrontend { id: U256, ) -> Result>, TransportError> { let (tx, rx) = oneshot::channel(); - self.tx.send(PubSubInstruction::GetSub(id, tx)).map_err(|_| TransportError::BackendGone)?; - rx.await.map_err(|_| TransportError::BackendGone) + self.tx + .send(PubSubInstruction::GetSub(id, tx)) + .map_err(|_| TransportErrorKind::backend_gone())?; + rx.await.map_err(|_| TransportErrorKind::backend_gone()) } /// Unsubscribe from a subscription. pub async fn unsubscribe(&self, id: U256) -> Result<(), TransportError> { self.tx .send(PubSubInstruction::Unsubscribe(id)) - .map_err(|_| TransportError::BackendGone)?; + .map_err(|_| TransportErrorKind::backend_gone())?; Ok(()) } @@ -50,8 +52,8 @@ impl PubSubFrontend { let tx = self.tx.clone(); Box::pin(async move { - tx.send(ix).map_err(|_| TransportError::BackendGone)?; - rx.await.map_err(|_| TransportError::BackendGone)? + tx.send(ix).map_err(|_| TransportErrorKind::backend_gone())?; + rx.await.map_err(|_| TransportErrorKind::backend_gone())? }) } @@ -87,7 +89,7 @@ impl tower::Service for PubSubFrontend { _cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { if self.tx.is_closed() { - return std::task::Poll::Ready(Err(TransportError::BackendGone)); + return std::task::Poll::Ready(Err(TransportErrorKind::backend_gone())); } std::task::Poll::Ready(Ok(())) } @@ -109,7 +111,7 @@ impl tower::Service for &PubSubFrontend { _cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { if self.tx.is_closed() { - return std::task::Poll::Ready(Err(TransportError::BackendGone)); + return std::task::Poll::Ready(Err(TransportErrorKind::backend_gone())); } std::task::Poll::Ready(Ok(())) } diff --git a/crates/pubsub/src/service.rs b/crates/pubsub/src/service.rs index e1eba120236..e90629b8f98 100644 --- a/crates/pubsub/src/service.rs +++ b/crates/pubsub/src/service.rs @@ -4,11 +4,12 @@ use crate::{ managers::{InFlight, RequestManager, SubscriptionManager}, PubSubConnect, PubSubFrontend, }; + use alloy_json_rpc::{Id, PubSubItem, Request, RequestMeta, Response, ResponsePayload}; use alloy_primitives::U256; use alloy_transport::{ utils::{to_json_raw_value, Spawnable}, - TransportError, + TransportError, TransportErrorKind, TransportResult, }; use serde_json::value::RawValue; use tokio::sync::{broadcast, mpsc, oneshot}; @@ -62,10 +63,10 @@ where /// Reconnect the backend, re-issue pending requests, and re-start active /// subscriptions. - async fn reconnect(&mut self) -> Result<(), TransportError> { + async fn reconnect(&mut self) -> TransportResult<()> { tracing::info!("Reconnecting pubsub service backend."); - let mut old_handle = self.get_new_backend().await.map_err(TransportError::custom)?; + let mut old_handle = self.get_new_backend().await?; tracing::debug!("Draining old backend to_handle"); @@ -102,12 +103,12 @@ where } /// Dispatch a request to the socket. - fn dispatch_request(&mut self, brv: Box) -> Result<(), TransportError> { - self.handle.to_socket.send(brv).map(drop).map_err(|_| TransportError::BackendGone) + fn dispatch_request(&mut self, brv: Box) -> TransportResult<()> { + self.handle.to_socket.send(brv).map(drop).map_err(|_| TransportErrorKind::backend_gone()) } /// Service a request. - fn service_request(&mut self, in_flight: InFlight) -> Result<(), TransportError> { + fn service_request(&mut self, in_flight: InFlight) -> TransportResult<()> { let brv = in_flight.request(); self.dispatch_request(brv.serialized().to_owned())?; @@ -126,7 +127,7 @@ where &mut self, local_id: U256, tx: oneshot::Sender>>, - ) -> Result<(), TransportError> { + ) -> TransportResult<()> { let local_id = local_id.into(); if let Some(rx) = self.subs.get_rx(local_id) { @@ -137,7 +138,7 @@ where } /// Service an unsubscribe instruction. - fn service_unsubscribe(&mut self, local_id: U256) -> Result<(), TransportError> { + fn service_unsubscribe(&mut self, local_id: U256) -> TransportResult<()> { let local_id = local_id.into(); let req = Request { meta: RequestMeta { id: Id::None, method: "eth_unsubscribe" }, @@ -151,7 +152,7 @@ where } /// Service an instruction - fn service_ix(&mut self, ix: PubSubInstruction) -> Result<(), TransportError> { + fn service_ix(&mut self, ix: PubSubInstruction) -> TransportResult<()> { tracing::trace!(?ix, "servicing instruction"); match ix { PubSubInstruction::Request(in_flight) => self.service_request(in_flight), @@ -161,7 +162,7 @@ where } /// Handle an item from the backend. - fn handle_item(&mut self, item: PubSubItem) -> Result<(), TransportError> { + fn handle_item(&mut self, item: PubSubItem) -> TransportResult<()> { match item { PubSubItem::Response(resp) => match self.in_flights.handle_response(resp) { Some((server_id, in_flight)) => self.handle_sub_response(in_flight, server_id), @@ -175,11 +176,7 @@ where } /// Rewrite the subscription id and insert into the subscriptions manager - fn handle_sub_response( - &mut self, - in_flight: InFlight, - server_id: U256, - ) -> Result<(), TransportError> { + fn handle_sub_response(&mut self, in_flight: InFlight, server_id: U256) -> TransportResult<()> { let request = in_flight.request; let id = request.id().clone(); @@ -201,7 +198,7 @@ where /// Spawn the service. pub(crate) fn spawn(mut self) { let fut = async move { - let result: Result<(), TransportError> = loop { + let result: TransportResult<()> = loop { // We bias the loop so that we always handle new messages before // reconnecting, and always reconnect before dispatching new // requests. @@ -214,14 +211,14 @@ where break Err(e) } } else if let Err(e) = self.reconnect().await { - break Err(TransportError::Custom(Box::new(e))) + break Err(e) } } _ = &mut self.handle.error => { tracing::error!("Pubsub service backend error."); if let Err(e) = self.reconnect().await { - break Err(TransportError::Custom(Box::new(e))) + break Err(e) } } diff --git a/crates/rpc-client/Cargo.toml b/crates/rpc-client/Cargo.toml index cbac62f8111..95ca6b129e7 100644 --- a/crates/rpc-client/Cargo.toml +++ b/crates/rpc-client/Cargo.toml @@ -35,6 +35,7 @@ alloy-primitives.workspace = true alloy-transport-ws.workspace = true test-log = { version = "0.2.13", default-features = false, features = ["trace"] } tracing-subscriber = { version = "0.3.17", features = ["std", "env-filter"] } +tokio.workspace = true [features] default = ["reqwest"] diff --git a/crates/rpc-client/src/batch.rs b/crates/rpc-client/src/batch.rs index b539bfe2f36..9d9d3edf097 100644 --- a/crates/rpc-client/src/batch.rs +++ b/crates/rpc-client/src/batch.rs @@ -1,8 +1,10 @@ use crate::RpcClient; + use alloy_json_rpc::{ - Id, Request, RequestPacket, ResponsePacket, RpcParam, RpcResult, RpcReturn, SerializedRequest, + transform_response, try_deserialize_success, Id, Request, RequestPacket, ResponsePacket, + RpcParam, RpcReturn, SerializedRequest, }; -use alloy_transport::{Transport, TransportError}; +use alloy_transport::{Transport, TransportError, TransportErrorKind, TransportResult}; use futures::channel::oneshot; use serde_json::value::RawValue; use std::{ @@ -14,7 +16,7 @@ use std::{ task::{self, ready, Poll}, }; -pub(crate) type Channel = oneshot::Sender, Box, TransportError>>; +pub(crate) type Channel = oneshot::Sender>>; pub(crate) type ChannelMap = HashMap; /// A batch JSON-RPC request, used to bundle requests into a single transport @@ -36,16 +38,12 @@ pub struct BatchRequest<'a, T> { #[must_use = "A Waiter does nothing unless the corresponding BatchRequest is sent via `send_batch` and `.await`, AND the Waiter is awaited."] #[derive(Debug)] pub struct Waiter { - rx: oneshot::Receiver, Box, TransportError>>, + rx: oneshot::Receiver>>, _resp: PhantomData Resp>, } -impl From, Box, TransportError>>> - for Waiter -{ - fn from( - rx: oneshot::Receiver, Box, TransportError>>, - ) -> Self { +impl From>>> for Waiter { + fn from(rx: oneshot::Receiver>>) -> Self { Self { rx, _resp: PhantomData } } } @@ -54,15 +52,15 @@ impl std::future::Future for Waiter where Resp: RpcReturn, { - type Output = RpcResult, TransportError>; + type Output = TransportResult; fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { let resp = ready!(Pin::new(&mut self.rx).poll(cx)); Poll::Ready(match resp { - Ok(resp) => resp - .try_deserialize_success_or_else(|err, text| TransportError::deser_err(err, text)), - Err(e) => RpcResult::Err(TransportError::Custom(Box::new(e))), + Ok(resp) => try_deserialize_success(resp), + + Err(e) => Err(TransportErrorKind::custom(e)), }) } } @@ -100,7 +98,7 @@ impl<'a, T> BatchRequest<'a, T> { fn push_raw( &mut self, request: SerializedRequest, - ) -> oneshot::Receiver, Box, TransportError>> { + ) -> oneshot::Receiver>> { let (tx, rx) = oneshot::channel(); self.channels.insert(request.id().clone(), tx); self.requests.push(request); @@ -207,13 +205,13 @@ where match responses { ResponsePacket::Single(single) => { if let Some(tx) = channels.remove(&single.id) { - let _ = tx.send(RpcResult::from(single)); + let _ = tx.send(transform_response(single)); } } ResponsePacket::Batch(responses) => { for response in responses.into_iter() { if let Some(tx) = channels.remove(&response.id) { - let _ = tx.send(RpcResult::from(response)); + let _ = tx.send(transform_response(response)); } } } @@ -221,8 +219,8 @@ where // Any channels remaining in the map are missing responses. To avoid // hanging futures, we send an error. - channels.drain().for_each(|(_, tx)| { - let _ = tx.send(RpcResult::Err(TransportError::MissingBatchResponse)); + channels.drain().for_each(|(id, tx)| { + let _ = tx.send(Err(TransportErrorKind::missing_batch_response(id))); }); self.set(BatchFuture::Complete); diff --git a/crates/rpc-client/src/call.rs b/crates/rpc-client/src/call.rs index 6de6b7cbede..925e61d363d 100644 --- a/crates/rpc-client/src/call.rs +++ b/crates/rpc-client/src/call.rs @@ -1,5 +1,8 @@ -use alloy_json_rpc::{Request, RequestPacket, ResponsePacket, RpcParam, RpcResult, RpcReturn}; -use alloy_transport::{RpcFut, Transport, TransportError}; +use alloy_json_rpc::{ + transform_response, try_deserialize_success, Request, RequestPacket, ResponsePacket, RpcParam, + RpcResult, RpcReturn, +}; +use alloy_transport::{RpcFut, Transport, TransportError, TransportResult}; use core::panic; use serde_json::value::RawValue; use std::{ @@ -90,7 +93,7 @@ where }; match task::ready!(fut.poll(cx)) { - Ok(ResponsePacket::Single(res)) => Ready(RpcResult::from(res)), + Ok(ResponsePacket::Single(res)) => Ready(transform_response(res)), Err(e) => Ready(RpcResult::Err(e)), _ => panic!("received batch response from single request"), } @@ -102,7 +105,7 @@ where Conn: Transport + Clone, Params: RpcParam, { - type Output = RpcResult, Box, TransportError>; + type Output = TransportResult>; #[instrument(skip(self, cx))] fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { @@ -190,16 +193,14 @@ where Params: RpcParam, Resp: RpcReturn, { - type Output = RpcResult, TransportError>; + type Output = TransportResult; fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { tracing::trace!(?self.state, "Polling RpcCall"); let this = self.project(); - let resp = task::ready!(this.state.poll(cx)); + let result = task::ready!(this.state.poll(cx)); - Ready( - resp.try_deserialize_success_or_else(|err, text| TransportError::deser_err(err, text)), - ) + Ready(try_deserialize_success(result)) } } diff --git a/crates/rpc-client/tests/it/http.rs b/crates/rpc-client/tests/it/http.rs index b8b8e6b2299..ff1b433878b 100644 --- a/crates/rpc-client/tests/it/http.rs +++ b/crates/rpc-client/tests/it/http.rs @@ -2,7 +2,7 @@ use alloy_primitives::U64; use alloy_rpc_client::{ClientBuilder, RpcCall}; use std::borrow::Cow; -// #[tokio::test] +#[tokio::test] async fn it_makes_a_request() { let infura = std::env::var("HTTP_PROVIDER_URL").unwrap(); diff --git a/crates/transport-http/src/hyper.rs b/crates/transport-http/src/hyper.rs index cd59684b403..742a212cc32 100644 --- a/crates/transport-http/src/hyper.rs +++ b/crates/transport-http/src/hyper.rs @@ -1,6 +1,6 @@ use crate::Http; use alloy_json_rpc::{RequestPacket, ResponsePacket}; -use alloy_transport::{TransportError, TransportFut}; +use alloy_transport::{TransportError, TransportErrorKind, TransportFut}; use hyper::client::{connect::Connect, Client}; use std::task; use tower::Service; @@ -23,11 +23,12 @@ where .body(hyper::Body::from(ser.get().to_owned())) .expect("request parts are valid"); - let resp = this.client.request(req).await.map_err(TransportError::custom)?; + let resp = this.client.request(req).await.map_err(TransportErrorKind::custom)?; // unpack json from the response body - let body = - hyper::body::to_bytes(resp.into_body()).await.map_err(TransportError::custom)?; + let body = hyper::body::to_bytes(resp.into_body()) + .await + .map_err(TransportErrorKind::custom)?; // Deser a Box from the body. If deser fails, return the // body as a string in the error. If the body is not UTF8, this will diff --git a/crates/transport-http/src/reqwest.rs b/crates/transport-http/src/reqwest.rs index 3c33b08c445..a5fbc3b484d 100644 --- a/crates/transport-http/src/reqwest.rs +++ b/crates/transport-http/src/reqwest.rs @@ -1,6 +1,6 @@ use crate::Http; use alloy_json_rpc::{RequestPacket, ResponsePacket}; -use alloy_transport::{TransportError, TransportFut}; +use alloy_transport::{TransportError, TransportErrorKind, TransportFut}; use std::task; use tower::Service; @@ -15,8 +15,8 @@ impl Http { .json(&req) .send() .await - .map_err(TransportError::custom)?; - let json = resp.text().await.map_err(TransportError::custom)?; + .map_err(TransportErrorKind::custom)?; + let json = resp.text().await.map_err(TransportErrorKind::custom)?; serde_json::from_str(&json).map_err(|err| TransportError::deser_err(err, &json)) }) diff --git a/crates/transport-ws/src/native.rs b/crates/transport-ws/src/native.rs index 8b8a7b5858d..6ec9fd16952 100644 --- a/crates/transport-ws/src/native.rs +++ b/crates/transport-ws/src/native.rs @@ -1,6 +1,6 @@ use crate::WsBackend; use alloy_pubsub::PubSubConnect; -use alloy_transport::{utils::Spawnable, Authorization, Pbf, TransportError}; +use alloy_transport::{utils::Spawnable, Authorization, Pbf, TransportError, TransportErrorKind}; use futures::{SinkExt, StreamExt}; use serde_json::value::RawValue; use std::time::Duration; @@ -46,9 +46,9 @@ impl PubSubConnect for WsConnect { let request = self.clone().into_client_request(); Box::pin(async move { - let req = request.map_err(TransportError::custom)?; + let req = request.map_err(TransportErrorKind::custom)?; let (socket, _) = - tokio_tungstenite::connect_async(req).await.map_err(TransportError::custom)?; + tokio_tungstenite::connect_async(req).await.map_err(TransportErrorKind::custom)?; let (handle, interface) = alloy_pubsub::ConnectionHandle::new(); let backend = WsBackend { socket, interface }; diff --git a/crates/transport/src/error.rs b/crates/transport/src/error.rs index e2740f8965f..9a1351eb84b 100644 --- a/crates/transport/src/error.rs +++ b/crates/transport/src/error.rs @@ -1,59 +1,54 @@ +use alloy_json_rpc::{Id, RpcError, RpcResult}; +use serde_json::value::RawValue; use std::{error::Error as StdError, fmt::Debug}; use thiserror::Error; +/// A transport error is an [`RpcError`] containing a [`TransportErrorKind`]. +pub type TransportError = RpcError; + +/// A transport result is a [`Result`] containing a [`TransportError`]. +pub type TransportResult> = RpcResult; + /// Transport error. /// /// All transport errors are wrapped in this enum. #[derive(Error, Debug)] -pub enum TransportError { - /// SerdeJson (de)ser - #[error("{err}")] - SerdeJson { - /// The underlying serde_json error. - #[source] - err: serde_json::Error, - /// For deser errors, the text that failed to deserialize. - text: Option, - }, - - /// Missing batch response - #[error("Missing response in batch request")] - MissingBatchResponse, +pub enum TransportErrorKind { + /// Missing batch response. + /// + /// This error is returned when a batch request is sent and the response + /// does not contain a response for a request. For convenience the ID is + /// specified. + #[error("Missing response for request with ID {0}.")] + MissingBatchResponse(Id), /// PubSub backend connection task has stopped. #[error("PubSub backend connection task has stopped.")] BackendGone, /// Custom error - #[error(transparent)] - Custom(Box), + #[error("{0}")] + Custom(#[source] Box), } -impl TransportError { - /// Instantiate a new `TransportError` from a [`serde_json::Error`]. This - /// should be called when the error occurs during serialization. - pub const fn ser_err(err: serde_json::Error) -> Self { - Self::SerdeJson { err, text: None } +impl TransportErrorKind { + /// Instantiate a new `TransportError` from a custom error. + pub fn custom_str(err: &str) -> TransportError { + RpcError::Transport(Self::Custom(err.into())) } - /// Instantiate a new `TransportError` from a [`serde_json::Error`] and the - /// text. This should be called when the error occurs during - /// deserialization. - pub fn deser_err(err: serde_json::Error, text: impl AsRef) -> Self { - Self::from((err, text)) + /// Instantiate a new `TransportError` from a custom error. + pub fn custom(err: impl StdError + Send + Sync + 'static) -> TransportError { + RpcError::Transport(Self::Custom(Box::new(err))) } - /// Instantiate a new `TransportError` from a custom error. - pub fn custom(err: impl StdError + Send + Sync + 'static) -> Self { - Self::Custom(Box::new(err)) + /// Instantiate a new `TransportError` from a missing ID. + pub const fn missing_batch_response(id: Id) -> TransportError { + RpcError::Transport(Self::MissingBatchResponse(id)) } -} -impl From<(serde_json::Error, T)> for TransportError -where - T: AsRef, -{ - fn from((err, text): (serde_json::Error, T)) -> Self { - Self::SerdeJson { err, text: Some(text.as_ref().to_string()) } + /// Instantiate a new `TransportError::BackendGone`. + pub const fn backend_gone() -> TransportError { + RpcError::Transport(Self::BackendGone) } } diff --git a/crates/transport/src/lib.rs b/crates/transport/src/lib.rs index 9c1c72fb05c..6efacab107a 100644 --- a/crates/transport/src/lib.rs +++ b/crates/transport/src/lib.rs @@ -25,7 +25,9 @@ mod common; pub use common::Authorization; mod error; -pub use error::TransportError; +#[doc(hidden)] +pub use error::TransportErrorKind; +pub use error::{TransportError, TransportResult}; mod r#trait; pub use r#trait::Transport; @@ -39,10 +41,9 @@ pub use type_aliases::*; #[cfg(not(target_arch = "wasm32"))] mod type_aliases { - use alloy_json_rpc::{ResponsePacket, RpcResult}; - use serde_json::value::RawValue; + use alloy_json_rpc::ResponsePacket; - use crate::TransportError; + use crate::{TransportError, TransportResult}; /// Pin-boxed future. pub type Pbf<'a, T, E> = @@ -53,17 +54,15 @@ mod type_aliases { std::pin::Pin> + Send + 'a>>; /// Future for RPC-level requests. - pub type RpcFut<'a, T, E = TransportError> = std::pin::Pin< - Box, E>> + Send + 'a>, - >; + pub type RpcFut<'a, T> = + std::pin::Pin> + Send + 'a>>; } #[cfg(target_arch = "wasm32")] mod type_aliases { - use alloy_json_rpc::{ResponsePacket, RpcResult}; - use serde_json::value::RawValue; + use alloy_json_rpc::ResponsePacket; - use crate::TransportError; + use crate::{TransportError, TransportResult}; /// Pin-boxed future. pub type Pbf<'a, T, E> = @@ -74,6 +73,6 @@ mod type_aliases { std::pin::Pin> + 'a>>; /// Future for RPC-level requests. - pub type RpcFut<'a, T, E = TransportError> = - std::pin::Pin, E>> + 'a>>; + pub type RpcFut<'a, T> = + std::pin::Pin> + Send + 'a>>; } diff --git a/crates/transport/src/utils.rs b/crates/transport/src/utils.rs index 06d3bd5a2ec..a667406c910 100644 --- a/crates/transport/src/utils.rs +++ b/crates/transport/src/utils.rs @@ -1,9 +1,20 @@ -use crate::error::TransportError; use serde::Serialize; use serde_json::{self, value::RawValue}; use std::future::Future; use url::Url; +use crate::{TransportError, TransportResult}; + +/// Convert to a `Box` from a `Serialize` type, mapping the error +/// to a `TransportError`. +pub fn to_json_raw_value(s: &S) -> TransportResult> +where + S: Serialize, +{ + RawValue::from_string(serde_json::to_string(s).map_err(TransportError::ser_err)?) + .map_err(TransportError::ser_err) +} + /// Guess whether the URL is local, based on the hostname. /// /// The ouput of this function is best-efforts, and should be checked if @@ -20,16 +31,6 @@ pub fn guess_local_url(s: impl AsRef) -> bool { _guess_local_url(s.as_ref()) } -/// Convert to a `Box` from a `Serialize` type, mapping the error -/// to a `TransportError`. -pub fn to_json_raw_value(s: &S) -> Result, TransportError> -where - S: Serialize, -{ - RawValue::from_string(serde_json::to_string(s).map_err(TransportError::ser_err)?) - .map_err(TransportError::ser_err) -} - #[doc(hidden)] pub trait Spawnable { /// Spawn the future as a task. From b6b98b1bf7febfc18b02daa4e03a13a3aa319b00 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 15 Nov 2023 09:21:02 -0800 Subject: [PATCH 2/4] fixes: docs and clippy and wasm --- crates/json-rpc/src/lib.rs | 7 +++---- crates/json-rpc/src/result.rs | 5 ++++- crates/providers/src/provider.rs | 4 +--- crates/rpc-client/Cargo.toml | 1 - crates/rpc-client/src/batch.rs | 6 +++--- crates/rpc-client/src/call.rs | 4 ++-- crates/rpc-client/tests/it/http.rs | 2 +- crates/transport-ws/src/wasm.rs | 9 ++++++--- crates/transport/src/lib.rs | 2 +- 9 files changed, 21 insertions(+), 19 deletions(-) diff --git a/crates/json-rpc/src/lib.rs b/crates/json-rpc/src/lib.rs index 54985486012..a0b3558e8fa 100644 --- a/crates/json-rpc/src/lib.rs +++ b/crates/json-rpc/src/lib.rs @@ -35,9 +35,8 @@ //! //! In general, partially deserialized responses can be further deserialized. //! E.g. an [`BorrowedRpcResult`] may have success responses deserialized -//! with [`RpcResult::deserialize_success::`], which will transform it to an -//! [`RpcResult`]. Or the caller may use [`RpcResult::try_success_as::`] -//! to attempt to deserialize without transforming the [`RpcResult`]. +//! with [`crate::try_deserialize_ok::`], which will transform it to an +//! [`RpcResult`]. #![doc( html_logo_url = "https://raw.githubusercontent.com/alloy-rs/core/main/assets/alloy.jpg", @@ -78,7 +77,7 @@ pub use response::{ mod result; pub use result::{ - transform_response, transform_result, try_deserialize_success, BorrowedRpcResult, RpcResult, + transform_response, transform_result, try_deserialize_ok, BorrowedRpcResult, RpcResult, }; use serde::{de::DeserializeOwned, Serialize}; diff --git a/crates/json-rpc/src/result.rs b/crates/json-rpc/src/result.rs index 93ba6b8dad4..4ef5b45dac5 100644 --- a/crates/json-rpc/src/result.rs +++ b/crates/json-rpc/src/result.rs @@ -22,6 +22,9 @@ pub type RpcResult> = Result = RpcResult<&'a RawValue, E, &'a RawValue>; /// Transform a transport response into an [`RpcResult`], discarding the [`Id`]. +/// +/// [`Id`]: crate::Id +#[allow(clippy::missing_const_for_fn)] // false positive pub fn transform_response( response: Response, ) -> Result> @@ -52,7 +55,7 @@ where } /// Attempt to deserialize the `Ok(_)` variant of an [`RpcResult`]. -pub fn try_deserialize_success<'a, J, T, E, ErrResp>( +pub fn try_deserialize_ok<'a, J, T, E, ErrResp>( result: RpcResult, ) -> RpcResult where diff --git a/crates/providers/src/provider.rs b/crates/providers/src/provider.rs index 1ded15b0e92..aac3c9a01e5 100644 --- a/crates/providers/src/provider.rs +++ b/crates/providers/src/provider.rs @@ -314,9 +314,7 @@ impl Provider { None => return Err(TransportErrorKind::custom_str("EIP-1559 not activated")), }, - Ok(None) => { - return Err(TransportErrorKind::custom_str("Latest block not found".into())) - } + Ok(None) => return Err(TransportErrorKind::custom_str("Latest block not found")), Err(err) => return Err(err), }; diff --git a/crates/rpc-client/Cargo.toml b/crates/rpc-client/Cargo.toml index 95ca6b129e7..cbac62f8111 100644 --- a/crates/rpc-client/Cargo.toml +++ b/crates/rpc-client/Cargo.toml @@ -35,7 +35,6 @@ alloy-primitives.workspace = true alloy-transport-ws.workspace = true test-log = { version = "0.2.13", default-features = false, features = ["trace"] } tracing-subscriber = { version = "0.3.17", features = ["std", "env-filter"] } -tokio.workspace = true [features] default = ["reqwest"] diff --git a/crates/rpc-client/src/batch.rs b/crates/rpc-client/src/batch.rs index 9d9d3edf097..b154c81bfa2 100644 --- a/crates/rpc-client/src/batch.rs +++ b/crates/rpc-client/src/batch.rs @@ -1,8 +1,8 @@ use crate::RpcClient; use alloy_json_rpc::{ - transform_response, try_deserialize_success, Id, Request, RequestPacket, ResponsePacket, - RpcParam, RpcReturn, SerializedRequest, + transform_response, try_deserialize_ok, Id, Request, RequestPacket, ResponsePacket, RpcParam, + RpcReturn, SerializedRequest, }; use alloy_transport::{Transport, TransportError, TransportErrorKind, TransportResult}; use futures::channel::oneshot; @@ -58,7 +58,7 @@ where let resp = ready!(Pin::new(&mut self.rx).poll(cx)); Poll::Ready(match resp { - Ok(resp) => try_deserialize_success(resp), + Ok(resp) => try_deserialize_ok(resp), Err(e) => Err(TransportErrorKind::custom(e)), }) diff --git a/crates/rpc-client/src/call.rs b/crates/rpc-client/src/call.rs index 925e61d363d..79f08800716 100644 --- a/crates/rpc-client/src/call.rs +++ b/crates/rpc-client/src/call.rs @@ -1,5 +1,5 @@ use alloy_json_rpc::{ - transform_response, try_deserialize_success, Request, RequestPacket, ResponsePacket, RpcParam, + transform_response, try_deserialize_ok, Request, RequestPacket, ResponsePacket, RpcParam, RpcResult, RpcReturn, }; use alloy_transport::{RpcFut, Transport, TransportError, TransportResult}; @@ -201,6 +201,6 @@ where let result = task::ready!(this.state.poll(cx)); - Ready(try_deserialize_success(result)) + Ready(try_deserialize_ok(result)) } } diff --git a/crates/rpc-client/tests/it/http.rs b/crates/rpc-client/tests/it/http.rs index ff1b433878b..b8b8e6b2299 100644 --- a/crates/rpc-client/tests/it/http.rs +++ b/crates/rpc-client/tests/it/http.rs @@ -2,7 +2,7 @@ use alloy_primitives::U64; use alloy_rpc_client::{ClientBuilder, RpcCall}; use std::borrow::Cow; -#[tokio::test] +// #[tokio::test] async fn it_makes_a_request() { let infura = std::env::var("HTTP_PROVIDER_URL").unwrap(); diff --git a/crates/transport-ws/src/wasm.rs b/crates/transport-ws/src/wasm.rs index 9869d6d79e1..b7ca79ad1ed 100644 --- a/crates/transport-ws/src/wasm.rs +++ b/crates/transport-ws/src/wasm.rs @@ -1,6 +1,6 @@ use super::WsBackend; use alloy_pubsub::PubSubConnect; -use alloy_transport::{utils::Spawnable, Pbf, TransportError}; +use alloy_transport::{utils::Spawnable, Pbf, TransportError, TransportErrorKind}; use futures::{ sink::SinkExt, stream::{Fuse, StreamExt}, @@ -22,8 +22,11 @@ impl PubSubConnect for WsConnect { fn connect<'a: 'b, 'b>(&'a self) -> Pbf<'b, alloy_pubsub::ConnectionHandle, TransportError> { Box::pin(async move { - let socket = - WsMeta::connect(&self.url, None).await.map_err(TransportError::custom)?.1.fuse(); + let socket = WsMeta::connect(&self.url, None) + .await + .map_err(TransportErrorKind::custom)? + .1 + .fuse(); let (handle, interface) = alloy_pubsub::ConnectionHandle::new(); let backend = WsBackend { socket, interface }; diff --git a/crates/transport/src/lib.rs b/crates/transport/src/lib.rs index 6efacab107a..89e9211961b 100644 --- a/crates/transport/src/lib.rs +++ b/crates/transport/src/lib.rs @@ -74,5 +74,5 @@ mod type_aliases { /// Future for RPC-level requests. pub type RpcFut<'a, T> = - std::pin::Pin> + Send + 'a>>; + std::pin::Pin> + 'a>>; } From c129252dced5c8f46a84c6b0ff06dab33e3d2c25 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 15 Nov 2023 09:23:21 -0800 Subject: [PATCH 3/4] nit: propagate errresp through transporterror --- crates/transport/src/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/transport/src/error.rs b/crates/transport/src/error.rs index 9a1351eb84b..b7ecb329705 100644 --- a/crates/transport/src/error.rs +++ b/crates/transport/src/error.rs @@ -4,7 +4,7 @@ use std::{error::Error as StdError, fmt::Debug}; use thiserror::Error; /// A transport error is an [`RpcError`] containing a [`TransportErrorKind`]. -pub type TransportError = RpcError; +pub type TransportError> = RpcError; /// A transport result is a [`Result`] containing a [`TransportError`]. pub type TransportResult> = RpcResult; From 1c8e15fe57453bd42277870e1c708feb5fa3452d Mon Sep 17 00:00:00 2001 From: James Date: Wed, 15 Nov 2023 09:40:54 -0800 Subject: [PATCH 4/4] docs: more for json-rpc --- crates/json-rpc/src/error.rs | 28 ++++++++++++++++++++++++++++ crates/json-rpc/src/lib.rs | 28 ++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+) diff --git a/crates/json-rpc/src/error.rs b/crates/json-rpc/src/error.rs index 8b00e15ba3e..f72122b668d 100644 --- a/crates/json-rpc/src/error.rs +++ b/crates/json-rpc/src/error.rs @@ -64,4 +64,32 @@ impl RpcError { pub fn deser_err(err: serde_json::Error, text: impl AsRef) -> Self { Self::DeserError { err, text: text.as_ref().to_owned() } } + + /// Check if the error is a serialization error. + pub const fn is_ser_error(&self) -> bool { + matches!(self, Self::SerError(_)) + } + + /// Check if the error is a deserialization error. + pub const fn is_deser_error(&self) -> bool { + matches!(self, Self::DeserError { .. }) + } + + /// Check if the error is a transport error. + pub const fn is_transport_error(&self) -> bool { + matches!(self, Self::Transport(_)) + } + + /// Check if the error is an error response. + pub const fn is_error_resp(&self) -> bool { + matches!(self, Self::ErrorResp(_)) + } + + /// Fallible conversion to an error response. + pub const fn as_error_resp(&self) -> Option<&ErrorPayload> { + match self { + Self::ErrorResp(err) => Some(err), + _ => None, + } + } } diff --git a/crates/json-rpc/src/lib.rs b/crates/json-rpc/src/lib.rs index a0b3558e8fa..21eb3255c98 100644 --- a/crates/json-rpc/src/lib.rs +++ b/crates/json-rpc/src/lib.rs @@ -11,6 +11,34 @@ //! //! [`alloy-transports`]: https://docs.rs/alloy-transports/latest/alloy-transports //! +//! ## Usage +//! +//! This crate models the JSON-RPC 2.0 protocol data-types. It is intended to +//! be used to build JSON-RPC clients or servers. Most users will not need to +//! import this crate. +//! +//! This crate provides the following low-level data types: +//! +//! - [`Request`] - A JSON-RPC request. +//! - [`Response`] - A JSON-RPC response. +//! - [`ErrorPayload`] - A JSON-RPC error response payload, including code and message. +//! - [`ResponsePayload`] - The payload of a JSON-RPC response, either a success payload, or an +//! [`ErrorPayload`]. +//! +//! For client-side Rust ergonomics, we want to map responses to [`Result`]s. +//! To that end, we provide the following types: +//! +//! - [`RpcError`] - An error that can occur during JSON-RPC communication. This type aggregates +//! errors that are common to all transports, such as (de)serialization, error responses, and +//! includes a generic transport error. +//! - [`RpcResult`] - A result modeling an Rpc outcome as `Result>`. +//! +//! We recommend that transport implementors use [`RpcResult`] as the return +//! type for their transport methods, parameterized by their transport error +//! type. This will allow them to return either a successful response or an +//! error. +//! //! ## Note On (De)Serialization //! //! [`Request`], [`Response`], and similar types are generic over the