diff --git a/Cargo.toml b/Cargo.toml index 91887436ad2..1bc05796c52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,12 +8,35 @@ edition = "2021" rust-version = "1.65" authors = ["Alloy Contributors"] license = "MIT OR Apache-2.0" -homepage = "https://github.com/ethers-rs/next" -repository = "https://github.com/ethers-rs/next" +homepage = "https://github.com/alloy-rs/next" +repository = "https://github.com/alloy-rs/next" exclude = ["benches/", "tests/"] [workspace.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] -[workspace.dependencies] \ No newline at end of file +[workspace.dependencies] + +alloy-json-rpc = { path = "crates/json-rpc" } +alloy-transports = { path = "crates/transports" } +alloy-networks = { path = "crates/networks" } + +alloy-primitives = { version = "0.2.0", features = ["serde"] } +alloy-rlp = "0.3.0" + +# futures +futures-channel = "0.3" +futures-util = "0.3" + +# serde +serde = { version = "1.0", default-features = false, features = ["alloc"] } +serde_json = { version = "1.0", default-features = false, features = ["alloc"] } + +# thiserror +thiserror = "1.0" + +# transports +url = "2.4.0" +pin-project = "1.1.2" +tower = { version = "0.4.13", features = ["util"] } diff --git a/README.md b/README.md index a8e5ccfaac9..099098b390c 100644 --- a/README.md +++ b/README.md @@ -1 +1,12 @@ # alloy-next + +### Layout + +- alloy-json-rpc + - Core data types for JSON-RPC 2.0 +- alloy-transports + - Transports and RPC call futures. +- alloy-networks + - Network abstraction for RPC types. Allows capturing different RPC param and response types on a per-network basis. +- alloy-provider + - Based on ethers::middleware::Middleware, but abstract over , and object-safe. diff --git a/crates/json-rpc/Cargo.toml b/crates/json-rpc/Cargo.toml new file mode 100644 index 00000000000..9999aefb305 --- /dev/null +++ b/crates/json-rpc/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "alloy-json-rpc" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +exclude.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +serde = { workspace = true, features = ["derive"] } +serde_json = { version = "1.0.103", features = ["raw_value"] } diff --git a/crates/json-rpc/src/common.rs b/crates/json-rpc/src/common.rs new file mode 100644 index 00000000000..b1892c9be36 --- /dev/null +++ b/crates/json-rpc/src/common.rs @@ -0,0 +1,43 @@ +use serde::{Deserialize, Serialize}; + +/// A JSON-RPC 2.0 ID object. This may be a number, string, or null. +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Hash)] +#[serde(untagged)] +pub enum Id { + Number(u64), + String(String), + None, +} + +impl Id { + /// Returns `true` if the ID is a number. + pub fn is_number(&self) -> bool { + matches!(self, Id::Number(_)) + } + + /// Returns `true` if the ID is a string. + pub fn is_string(&self) -> bool { + matches!(self, Id::String(_)) + } + + /// Returns `true` if the ID is `None`. + pub fn is_none(&self) -> bool { + matches!(self, Id::None) + } + + /// Returns the ID as a number, if it is one. + pub fn as_number(&self) -> Option { + match self { + Id::Number(n) => Some(*n), + _ => None, + } + } + + /// Returns the ID as a string, if it is one. + pub fn as_string(&self) -> Option<&str> { + match self { + Id::String(s) => Some(s), + _ => None, + } + } +} diff --git a/crates/json-rpc/src/lib.rs b/crates/json-rpc/src/lib.rs new file mode 100644 index 00000000000..4dd4d843dc8 --- /dev/null +++ b/crates/json-rpc/src/lib.rs @@ -0,0 +1,38 @@ +//! Alloy JSON-RPC data types. +//! +//! This crate provides data types for use with the JSON-RPC 2.0 protocol. It +//! does not provide any functionality for actually sending or receiving +//! JSON-RPC data. +//! +//! This crate is aimed at simplifying client implementations. It is not +//! well-suited to in-server applications. We do not support borrowing data from +//! deserializers, for example. This choice prevents complex lifetime +//! propagation in user code, at the expense of copying data + +use serde::{de::DeserializeOwned, Serialize}; + +mod request; +pub use request::JsonRpcRequest; + +mod response; +pub use response::{ErrorPayload, JsonRpcResponse, ResponsePayload}; + +mod common; +pub use common::Id; + +mod result; +pub use result::RpcResult; + +/// An object that can be used as a JSON-RPC parameter. +pub trait RpcParam: Serialize + Clone + Send + Sync + Unpin {} +impl RpcParam for T where T: Serialize + Clone + Send + Sync + Unpin {} + +/// An object that can be used as a JSON-RPC return value. +// Note: we add `'static` here to indicate that the Resp is wholly owned. It +// may not borrow. +pub trait RpcReturn: DeserializeOwned + Send + Sync + Unpin + 'static {} +impl RpcReturn for T where T: DeserializeOwned + Send + Sync + Unpin + 'static {} + +/// An object that can be used as a JSON-RPC parameter and return value. +pub trait RpcObject: RpcParam + RpcReturn {} +impl RpcObject for T where T: RpcParam + RpcReturn {} diff --git a/crates/json-rpc/src/request.rs b/crates/json-rpc/src/request.rs new file mode 100644 index 00000000000..edbaf9641b2 --- /dev/null +++ b/crates/json-rpc/src/request.rs @@ -0,0 +1,36 @@ +use crate::{common::Id, RpcParam}; + +use serde::{ser::SerializeMap, Deserialize, Serialize}; + +/// A JSON-RPC 2.0 request object. +/// +/// This is a generic type that can be used to represent any JSON-RPC request. +/// The `Params` type parameter is used to represent the parameters of the +/// request, and the `method` field is used to represent the method name. +/// +/// ### Note +/// +/// The value of `method` must be known at compile time. +#[derive(Debug, Deserialize, Clone)] +pub struct JsonRpcRequest { + pub method: &'static str, + pub params: Params, + pub id: Id, +} + +impl Serialize for JsonRpcRequest +where + Params: RpcParam, +{ + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut map = serializer.serialize_map(Some(4))?; + map.serialize_entry("method", self.method)?; + map.serialize_entry("params", &self.params)?; + map.serialize_entry("id", &self.id)?; + map.serialize_entry("jsonrpc", "2.0")?; + map.end() + } +} diff --git a/crates/json-rpc/src/response.rs b/crates/json-rpc/src/response.rs new file mode 100644 index 00000000000..dacd9f2a0eb --- /dev/null +++ b/crates/json-rpc/src/response.rs @@ -0,0 +1,226 @@ +use std::fmt; + +use serde::{ + de::{MapAccess, Visitor}, + Deserialize, Deserializer, Serialize, +}; +use serde_json::value::RawValue; + +use crate::common::Id; + +/// A JSONRPC-2.0 error object. +/// +/// This response indicates that the server received and handled the request, +/// but that there was an error in the processing of it. The error should be +/// included in the `message` field of the response. +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ErrorPayload { + pub code: i64, + pub message: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub data: Option>, +} + +/// A JSONRPC-2.0 response payload. +/// +/// This enum covers both the success and error cases of a JSONRPC-2.0 +/// response. +#[derive(Debug, Clone)] +pub enum ResponsePayload { + Success(Box), + Error(ErrorPayload), +} + +/// A JSONRPC-2.0 response object. +pub struct JsonRpcResponse { + pub id: Id, + pub payload: ResponsePayload, +} + +impl<'de> Deserialize<'de> for JsonRpcResponse { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + enum Field { + Result, + Error, + Id, + Unknown, + } + + impl<'de> Deserialize<'de> for Field { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct FieldVisitor; + + impl<'de> serde::de::Visitor<'de> for FieldVisitor { + type Value = Field; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("`result`, `error` and `id`") + } + + fn visit_str(self, value: &str) -> Result + where + E: serde::de::Error, + { + match value { + "result" => Ok(Field::Result), + "error" => Ok(Field::Error), + "id" => Ok(Field::Id), + _ => Ok(Field::Unknown), + } + } + } + deserializer.deserialize_identifier(FieldVisitor) + } + } + + struct JsonRpcResponseVisitor; + + impl<'de> Visitor<'de> for JsonRpcResponseVisitor { + type Value = JsonRpcResponse; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str( + "a JSON-RPC response object, consisting of either a result or an error", + ) + } + + fn visit_map(self, mut map: M) -> Result + where + M: MapAccess<'de>, + { + let mut result = None; + let mut error = None; + let mut id: Option = None; + + while let Some(key) = map.next_key()? { + match key { + Field::Result => { + if result.is_some() { + return Err(serde::de::Error::duplicate_field("result")); + } + result = Some(map.next_value()?); + } + Field::Error => { + if error.is_some() { + return Err(serde::de::Error::duplicate_field("error")); + } + error = Some(map.next_value()?); + } + Field::Id => { + if id.is_some() { + return Err(serde::de::Error::duplicate_field("id")); + } + id = Some(map.next_value()?); + } + Field::Unknown => { + let _: serde::de::IgnoredAny = map.next_value()?; // ignore + } + } + } + let id = id.unwrap_or(Id::None); + + match (result, error) { + (Some(result), None) => Ok(JsonRpcResponse { + id, + payload: ResponsePayload::Success(result), + }), + (None, Some(error)) => Ok(JsonRpcResponse { + id, + payload: ResponsePayload::Error(error), + }), + (None, None) => Err(serde::de::Error::missing_field("result or error")), + (Some(_), Some(_)) => Err(serde::de::Error::custom( + "result and error are mutually exclusive", + )), + } + } + } + + deserializer.deserialize_map(JsonRpcResponseVisitor) + } +} + +#[cfg(test)] +mod test { + #[test] + pub fn deser_success() { + let response = r#"{ + "jsonrpc": "2.0", + "result": "california", + "id": 1 + }"#; + let response: super::JsonRpcResponse = serde_json::from_str(response).unwrap(); + assert_eq!(response.id, super::Id::Number(1)); + assert!(matches!( + response.payload, + super::ResponsePayload::Success(_) + )); + } + + #[test] + pub fn deser_err() { + let response = r#"{ + "jsonrpc": "2.0", + "error": { + "code": -32600, + "message": "Invalid Request" + }, + "id": null + }"#; + let response: super::JsonRpcResponse = serde_json::from_str(response).unwrap(); + assert_eq!(response.id, super::Id::None); + assert!(matches!(response.payload, super::ResponsePayload::Error(_))); + } + + #[test] + pub fn deser_complex_success() { + let response = r#"{ + "result": { + "name": "california", + "population": 39250000, + "cities": [ + "los angeles", + "san francisco" + ] + } + }"#; + let response: super::JsonRpcResponse = serde_json::from_str(response).unwrap(); + assert_eq!(response.id, super::Id::None); + assert!(matches!( + response.payload, + super::ResponsePayload::Success(_) + )); + } +} + +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. diff --git a/crates/json-rpc/src/result.rs b/crates/json-rpc/src/result.rs new file mode 100644 index 00000000000..b7f1ba76b94 --- /dev/null +++ b/crates/json-rpc/src/result.rs @@ -0,0 +1,206 @@ +use std::fmt::Debug; + +use serde_json::value::RawValue; + +use crate::{response::ErrorPayload, JsonRpcResponse, ResponsePayload, RpcReturn}; + +/// The result of a JSON-RPC request. +/// +/// Either a success response, an error response, or a non-response error. The +/// non-response error is intended to be used for errors returned by a +/// transport, or serde errors. +/// +/// The three cases +#[must_use = "Results must be handled."] +#[derive(Debug)] +pub enum RpcResult { + /// Server returned a response. + Ok(T), + /// Server returned an error response. No communication or serialization + /// errors occurred. + ErrResp(ErrorPayload), + /// Some other error occurred. This could indicate a transport error, a + /// serde error, or anything else. + Err(E), +} + +impl RpcResult { + /// `true` if the result is an `Ok` value. + pub fn is_ok(&self) -> bool { + matches!(self, RpcResult::Ok(_)) + } + + /// `true` if the result is an `ErrResp` value. + pub fn is_err_resp(&self) -> bool { + matches!(self, RpcResult::ErrResp(_)) + } + + /// `true` if the result is an `Err` value. + pub fn is_err(&self) -> bool { + matches!(self, RpcResult::Err(_)) + } + + /// Unwrap the inner value if it is `Ok`, panic otherwise. + pub fn unwrap(self) -> T + where + E: Debug, + { + match self { + RpcResult::Ok(val) => val, + RpcResult::ErrResp(err) => panic!("Error response: {:?}", err), + RpcResult::Err(err) => panic!("Error: {:?}", err), + } + } + + /// Unwrap the inner value if it is `ErrResp`, panic otherwise. + pub fn unwrap_err_resp(self) -> ErrorPayload + where + T: Debug, + E: Debug, + { + match self { + RpcResult::Ok(val) => panic!("Ok: {:?}", val), + RpcResult::ErrResp(err) => err, + RpcResult::Err(err) => panic!("Error: {:?}", err), + } + } + + /// Unwrap the inner value if it is `Err`, panic otherwise. + pub fn unwrap_err(self) -> E + where + T: Debug, + E: Debug, + { + match self { + RpcResult::Ok(val) => panic!("Ok: {:?}", val), + RpcResult::ErrResp(err) => panic!("Error response: {:?}", err), + RpcResult::Err(err) => err, + } + } + + /// Apply `op` to the inner value if it is `Ok`. + pub fn map(self, op: F) -> RpcResult + where + F: FnOnce(T) -> U, + { + match self { + RpcResult::Ok(val) => RpcResult::Ok(op(val)), + RpcResult::ErrResp(err) => RpcResult::ErrResp(err), + RpcResult::Err(err) => RpcResult::Err(err), + } + } + + /// Calls `op` if the result is `Ok`, otherwise returns the `Err` or + /// `ErrResp` value of `self` + pub fn and_then(self, op: F) -> RpcResult + where + F: FnOnce(T) -> RpcResult, + { + match self { + RpcResult::Ok(val) => op(val), + RpcResult::ErrResp(err) => RpcResult::ErrResp(err), + RpcResult::Err(err) => RpcResult::Err(err), + } + } + + /// Apply `op` to the inner value if it is `Err`. + pub fn map_err(self, op: F) -> RpcResult + where + F: FnOnce(E) -> U, + { + match self { + RpcResult::Ok(val) => RpcResult::Ok(val), + RpcResult::ErrResp(err) => RpcResult::ErrResp(err), + RpcResult::Err(err) => RpcResult::Err(op(err)), + } + } + + /// Shortcut for `map_err(Into::into)`. Useful for converting between error + /// types. + pub fn convert_err(self) -> RpcResult + where + U: From, + { + self.map_err(Into::into) + } + + /// Drop the inner value if it is `Ok`, returning `()` instead. Used when + /// we only want success/failure status, and don't care about the response + /// value. + pub fn empty(self) -> RpcResult<(), E> { + self.map(|_| ()) + } + + /// Converts from `RpcResult` to `Option`. + pub fn ok(self) -> Option { + match self { + RpcResult::Ok(val) => Some(val), + _ => None, + } + } + + /// Converts from `RpcResult` to `Option`. + pub fn err_resp(self) -> Option { + match self { + RpcResult::ErrResp(err) => Some(err), + _ => None, + } + } + + /// Converts from `RpcResult` to `Option`. + pub fn err(self) -> Option { + match self { + RpcResult::Err(err) => Some(err), + _ => None, + } + } +} + +impl RpcResult, E> { + pub fn deser_ok(self) -> RpcResult + where + E: From, + { + match self { + RpcResult::Ok(val) => match serde_json::from_str(val.get()) { + Ok(val) => RpcResult::Ok(val), + Err(err) => RpcResult::Err(err.into()), + }, + Self::ErrResp(er) => RpcResult::ErrResp(er), + Self::Err(e) => RpcResult::Err(e), + } + } + + #[doc(hidden)] + pub fn deser_ok_or_else(self, f: F) -> RpcResult + where + F: FnOnce(serde_json::Error, &str) -> E, + { + match self { + RpcResult::Ok(val) => match serde_json::from_str(val.get()) { + Ok(val) => RpcResult::Ok(val), + Err(err) => RpcResult::Err(f(err, val.get())), + }, + Self::ErrResp(er) => RpcResult::ErrResp(er), + Self::Err(e) => RpcResult::Err(e), + } + } +} + +impl From for RpcResult, E> { + fn from(value: JsonRpcResponse) -> Self { + match value.payload { + ResponsePayload::Success(res) => Self::Ok(res), + ResponsePayload::Error(e) => Self::ErrResp(e), + } + } +} + +impl From> for RpcResult, E> { + fn from(value: Result) -> Self { + match value { + Ok(res) => res.into(), + Err(err) => Self::Err(err), + } + } +} diff --git a/crates/networks/Cargo.toml b/crates/networks/Cargo.toml new file mode 100644 index 00000000000..70982c2425a --- /dev/null +++ b/crates/networks/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "alloy-networks" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +exclude.workspace = true +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +alloy-json-rpc.workspace = true +alloy-primitives.workspace = true +alloy-rlp.workspace = true +alloy-transports.workspace = true +pin-project = "1.1.2" +tower = "0.4.13" diff --git a/crates/networks/src/lib.rs b/crates/networks/src/lib.rs new file mode 100644 index 00000000000..ee40b45cf8c --- /dev/null +++ b/crates/networks/src/lib.rs @@ -0,0 +1,33 @@ +use alloy_json_rpc::RpcObject; + +/// Captures type info for network-specific RPC requests/responses. +pub trait Network: Sized + Send + Sync + 'static { + #[doc(hidden)] + /// Asserts that this trait can only be implemented on a ZST. + const __ASSERT_ZST: () = { + assert!(std::mem::size_of::() == 0, "Network must be a ZST"); + }; + + /// The JSON body of a transaction request. + type TransactionRequest: Transaction; + + /// The JSON body of a transaction receipt. + type Receipt: Receipt; + + /// The JSON body of a transaction response. + type TransactionResponse: Transaction; +} + +/// Captures getters and setters common across transactions and +/// transaction-like objects across all networks. +pub trait Transaction: + alloy_rlp::Encodable + alloy_rlp::Decodable + RpcObject + Clone + Sized + 'static +{ + fn set_gas(&mut self, gas: alloy_primitives::U256); +} + +/// Captures getters and setters common across EIP-1559 transactions across all networks +pub trait Eip1559Transaction: Transaction {} + +/// Captures getters and setters common across receipts across all networks +pub trait Receipt: RpcObject + 'static {} diff --git a/crates/provider/Cargo.toml b/crates/provider/Cargo.toml new file mode 100644 index 00000000000..914a30389e4 --- /dev/null +++ b/crates/provider/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "alloy-middleware" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +exclude.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +alloy-json-rpc.workspace = true +alloy-networks.workspace = true +alloy-primitives.workspace = true +alloy-transports.workspace = true +futures-util = "0.3.28" diff --git a/crates/provider/src/builder.rs b/crates/provider/src/builder.rs new file mode 100644 index 00000000000..ecb368e4985 --- /dev/null +++ b/crates/provider/src/builder.rs @@ -0,0 +1,112 @@ +use std::marker::PhantomData; + +use alloy_networks::Network; +use alloy_transports::Transport; + +use crate::Provider; + +/// A layering abstraction in the vein of [`tower::Layer`] +/// +/// [`tower::Layer`]: https://docs.rs/tower/latest/tower/trait.Layer.html +pub trait ProviderLayer, N: Network, T: Transport> { + type Provider: Provider; + + fn layer(&self, inner: P) -> Self::Provider; +} + +pub struct Stack { + inner: Inner, + outer: Outer, + _pd: std::marker::PhantomData T>, +} + +impl Stack { + /// Create a new `Stack`. + pub fn new(inner: Inner, outer: Outer) -> Self { + Stack { + inner, + outer, + _pd: std::marker::PhantomData, + } + } +} + +impl ProviderLayer for Stack +where + T: Transport, + N: Network, + P: Provider, + Inner: ProviderLayer, + Outer: ProviderLayer, +{ + type Provider = Outer::Provider; + + fn layer(&self, provider: P) -> Self::Provider { + let inner = self.inner.layer(provider); + + self.outer.layer(inner) + } +} + +/// A builder for constructing a [`Provider`] from various layers. +pub struct ProviderBuilder { + layer: L, + + transport: PhantomData, + network: PhantomData, +} + +impl ProviderBuilder { + pub fn layer(self, layer: Inner) -> ProviderBuilder> { + ProviderBuilder { + layer: Stack::new(layer, self.layer), + transport: PhantomData, + network: PhantomData, + } + } + + /// Change the network. + pub fn network(self) -> ProviderBuilder { + ProviderBuilder { + layer: self.layer, + transport: self.transport, + network: PhantomData, + } + } + + pub fn provider

(self, provider: P) -> L::Provider + where + L: ProviderLayer, + P: Provider, + T: Transport, + N: Network, + { + self.layer.layer(provider) + } +} + +// Copyright (c) 2019 Tower Contributors + +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: + +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. diff --git a/crates/provider/src/lib.rs b/crates/provider/src/lib.rs new file mode 100644 index 00000000000..3d8bb9e02b6 --- /dev/null +++ b/crates/provider/src/lib.rs @@ -0,0 +1,119 @@ +mod builder; +pub use builder::{ProviderBuilder, ProviderLayer, Stack}; + +use alloy_json_rpc::RpcResult; +use alloy_networks::{Network, Transaction}; +use alloy_primitives::Address; +use alloy_transports::{BoxTransport, RpcClient, Transport, TransportError}; + +use std::{borrow::Cow, future::Future, pin::Pin}; + +pub type MwareFut<'a, T, E> = Pin> + Send + 'a>>; + +/// Middleware is parameterized with a network and a transport. The default +/// transport is type-erased, but you can do `Middleware`. +pub trait Provider: Send + Sync { + fn client(&self) -> &RpcClient; + + /// Return a reference to the inner Middleware. + /// + /// Middleware are object safe now :) + fn inner(&self) -> &dyn Provider; + + fn estimate_gas<'s: 'fut, 'a: 'fut, 'fut>( + &'s self, + tx: &'a N::TransactionRequest, + ) -> MwareFut<'fut, alloy_primitives::U256, TransportError> + where + Self: Sync + 'fut, + { + self.inner().estimate_gas(tx) + } + + /// Get the transaction count for an address. Used for finding the + /// appropriate nonce. + /// + /// TODO: block number/hash/tag + fn get_transaction_count<'s: 'fut, 'a: 'fut, 'fut>( + &'s self, + address: Address, + ) -> MwareFut<'fut, alloy_primitives::U256, TransportError> + where + Self: Sync + 'fut, + { + self.inner().get_transaction_count(address) + } + + /// Send a transaction to the network. + /// + /// The transaction type is defined by the network. + fn send_transaction<'s: 'fut, 'a: 'fut, 'fut>( + &'s self, + tx: &'a N::TransactionRequest, + ) -> MwareFut<'fut, N::Receipt, TransportError> { + self.inner().send_transaction(tx) + } + + fn populate_gas<'s: 'fut, 'a: 'fut, 'fut>( + &'s self, + tx: &'a mut N::TransactionRequest, + ) -> MwareFut<'fut, (), TransportError> + where + Self: Sync, + { + Box::pin(async move { + let gas = self.estimate_gas(&*tx).await; + + gas.map(|gas| tx.set_gas(gas)) + }) + } +} + +impl Provider for RpcClient { + fn client(&self) -> &RpcClient { + self + } + + fn inner(&self) -> &dyn Provider { + panic!("called inner on ") + } + + fn estimate_gas<'s: 'fut, 'a: 'fut, 'fut>( + &'s self, + tx: &'a ::TransactionRequest, + ) -> MwareFut<'fut, alloy_primitives::U256, TransportError> { + self.prepare("eth_estimateGas", Cow::Borrowed(tx)).boxed() + } + + fn get_transaction_count<'s: 'fut, 'a: 'fut, 'fut>( + &'s self, + address: Address, + ) -> MwareFut<'fut, alloy_primitives::U256, TransportError> + where + Self: Sync + 'fut, + { + self.prepare( + "eth_getTransactionCount", + Cow::<(Address, &'static str)>::Owned((address, "latest")), + ) + .boxed() + } + + fn send_transaction<'s: 'fut, 'a: 'fut, 'fut>( + &'s self, + tx: &'a N::TransactionRequest, + ) -> MwareFut<'fut, N::Receipt, TransportError> { + self.prepare("eth_sendTransaction", Cow::Borrowed(tx)) + .boxed() + } +} + +#[cfg(test)] +mod test { + use crate::Provider; + use alloy_networks::Network; + + fn __compile_check() -> Box> { + unimplemented!() + } +} diff --git a/crates/transports/Cargo.toml b/crates/transports/Cargo.toml index f89342995da..106abf63b65 100644 --- a/crates/transports/Cargo.toml +++ b/crates/transports/Cargo.toml @@ -1,8 +1,33 @@ [package] name = "alloy-transports" -version = "0.1.0" -edition = "2021" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +exclude.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +alloy-json-rpc.workspace = true + +base64 = "0.21.0" + +futures-channel.workspace = true +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true, features = ["raw_value"] } +thiserror.workspace = true + +url.workspace = true +tower.workspace = true +pin-project.workspace = true + +# feature deps +reqwest = { version = "0.11.18", features = ["serde_json", "json"], optional = true } + +[features] +default = ["reqwest"] +reqwest = ["dep:reqwest"] diff --git a/crates/transports/README.md b/crates/transports/README.md new file mode 100644 index 00000000000..4adfcba471a --- /dev/null +++ b/crates/transports/README.md @@ -0,0 +1 @@ +## Ethers-transports diff --git a/crates/transports/src/batch.rs b/crates/transports/src/batch.rs new file mode 100644 index 00000000000..27201c3556e --- /dev/null +++ b/crates/transports/src/batch.rs @@ -0,0 +1,272 @@ +use std::{ + borrow::Cow, + collections::HashMap, + future::{Future, IntoFuture}, + marker::PhantomData, + pin::Pin, + task::{self, ready, Poll}, +}; + +use futures_channel::oneshot; +use serde_json::value::RawValue; + +use crate::{error::TransportError, transports::Transport, utils::to_json_raw_value, RpcClient}; +use alloy_json_rpc::{Id, JsonRpcRequest, JsonRpcResponse, RpcParam, RpcResult, RpcReturn}; + +type Channel = oneshot::Sender, TransportError>>; +type ChannelMap = HashMap; + +#[must_use = "A BatchRequest does nothing unless sent via `send_batch` and `.await`"] +/// A batch JSON-RPC request, used to bundle requests into a single transport +/// call. +#[derive(Debug)] +pub struct BatchRequest<'a, T> { + transport: &'a RpcClient, + + requests: Vec>, + + channels: ChannelMap, +} + +/// Awaits a single response for a request that has been included in a batch. +pub struct Waiter { + rx: oneshot::Receiver, TransportError>>, + _resp: PhantomData, +} + +impl From, TransportError>>> for Waiter { + fn from(rx: oneshot::Receiver, TransportError>>) -> Self { + Self { + rx, + _resp: PhantomData, + } + } +} + +impl std::future::Future for Waiter +where + Resp: RpcReturn, +{ + type Output = RpcResult; + + fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let resp = ready!(Pin::new(&mut self.rx).poll(cx)); + + Poll::Ready(match resp { + Ok(resp) => resp.deser_ok_or_else(|e, text| TransportError::deser_err(e, text)), + Err(e) => RpcResult::Err(TransportError::Custom(Box::new(e))), + }) + } +} + +#[pin_project::pin_project(project = CallStateProj)] +pub enum BatchFuture +where + Conn: Transport + Clone, + Conn::Future: Send, +{ + Prepared { + transport: Conn, + requests: Vec>, + channels: ChannelMap, + }, + SerError(Option), + AwaitingResponse { + channels: ChannelMap, + #[pin] + fut: Conn::Future, + }, + Complete, +} + +impl<'a, T> BatchRequest<'a, T> { + pub fn new(transport: &'a RpcClient) -> Self { + Self { + transport, + requests: Vec::with_capacity(10), + channels: HashMap::with_capacity(10), + } + } + + fn push_raw( + &mut self, + id: Id, + request: Box, + ) -> oneshot::Receiver, TransportError>> { + let (tx, rx) = oneshot::channel(); + self.channels.insert(id, tx); + self.requests.push(request); + rx + } + + fn push( + &mut self, + request: JsonRpcRequest, + ) -> Result, TransportError> { + to_json_raw_value(&request).map(|rv| self.push_raw(request.id, rv).into()) + } +} + +impl<'a, Conn> BatchRequest<'a, Conn> +where + Conn: Transport + Clone, +{ + #[must_use = "Waiters do nothing unless polled. A Waiter will never resolve unless its batch is sent."] + /// Add a call to the batch. + /// + /// ### Errors + /// + /// If the request cannot be serialized, this will return an error. + pub fn add_call( + &mut self, + method: &'static str, + params: &Params, + ) -> Result, TransportError> { + let request = self.transport.make_request(method, Cow::Borrowed(params)); + self.push(request) + } + + /// Send the batch future via its connection. + pub fn send_batch(self) -> BatchFuture { + BatchFuture::Prepared { + transport: self.transport.transport.clone(), + requests: self.requests, + channels: self.channels, + } + } +} + +impl<'a, T> IntoFuture for BatchRequest<'a, T> +where + T: Transport + Clone, +{ + type Output = as Future>::Output; + type IntoFuture = BatchFuture; + + fn into_future(self) -> Self::IntoFuture { + self.send_batch() + } +} + +impl BatchFuture +where + T: Transport + Clone, +{ + fn poll_prepared( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll<::Output> { + let CallStateProj::Prepared { + transport, + requests, + channels, + } = self.as_mut().project() + else { + unreachable!("Called poll_prepared in incorrect state") + }; + + if let Err(e) = task::ready!(transport.poll_ready(cx)) { + self.set(BatchFuture::Complete); + return Poll::Ready(Err(e)); + } + + // We only have mut refs, and we want ownership, so we just replace + // with 0-capacity collections. + let channels = std::mem::replace(channels, HashMap::with_capacity(0)); + let req = std::mem::replace(requests, Vec::with_capacity(0)); + + let req = match to_json_raw_value(&req) { + Ok(req) => req, + Err(e) => { + self.set(BatchFuture::Complete); + return Poll::Ready(Err(e)); + } + }; + + let fut = transport.call(req); + self.set(BatchFuture::AwaitingResponse { channels, fut }); + cx.waker().wake_by_ref(); + Poll::Pending + } + + fn poll_awaiting_response( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll<::Output> { + let CallStateProj::AwaitingResponse { channels, fut } = self.as_mut().project() else { + unreachable!("Called poll_awaiting_response in incorrect state") + }; + + // Has the service responded yet? + let responses = match ready!(fut.poll(cx)) { + Ok(responses) => responses, + Err(e) => { + self.set(BatchFuture::Complete); + return Poll::Ready(Err(e)); + } + }; + + let responses: Vec = match serde_json::from_str(responses.get()) { + Ok(responses) => responses, + Err(err) => { + self.set(BatchFuture::Complete); + return Poll::Ready(Err(TransportError::deser_err(err, responses.get()))); + } + }; + + // Send the responses via the channels by removing the channels from + // the map. + for response in responses { + if let Some(tx) = channels.remove(&response.id) { + let _ = tx.send(RpcResult::from(response)); + } + } + + // Any channels remaining in the map are missing responses. To avoid + // hanging futures, we send an error. + channels.drain().for_each(|(_, tx)| { + let _ = tx.send(RpcResult::Err(TransportError::MissingBatchResponse)); + }); + + self.set(BatchFuture::Complete); + Poll::Ready(Ok(())) + } + + fn poll_ser_error( + mut self: Pin<&mut Self>, + _cx: &mut task::Context<'_>, + ) -> Poll<::Output> { + let e = if let CallStateProj::SerError(e) = self.as_mut().project() { + e.take().expect("No error. This is a bug.") + } else { + unreachable!("Called poll_ser_error in incorrect state") + }; + + self.set(BatchFuture::Complete); + Poll::Ready(Err(e)) + } +} + +impl Future for BatchFuture +where + T: Transport + Clone, + T::Future: Send, +{ + type Output = Result<(), TransportError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + if matches!(*self.as_mut(), BatchFuture::Prepared { .. }) { + return self.poll_prepared(cx); + } + + if matches!(*self.as_mut(), BatchFuture::AwaitingResponse { .. }) { + return self.poll_awaiting_response(cx); + } + + if matches!(*self.as_mut(), BatchFuture::SerError(_)) { + return self.poll_ser_error(cx); + } + + panic!("Called poll on CallState in invalid state") + } +} diff --git a/crates/transports/src/call.rs b/crates/transports/src/call.rs new file mode 100644 index 00000000000..00061a988c8 --- /dev/null +++ b/crates/transports/src/call.rs @@ -0,0 +1,197 @@ +use crate::{ + error::TransportError, + transports::{JsonRpcLayer, JsonRpcService, Transport}, +}; + +use alloy_json_rpc::{JsonRpcRequest, RpcParam, RpcResult, RpcReturn}; +use core::panic; +use serde_json::value::RawValue; +use std::{future::Future, marker::PhantomData, pin::Pin, task}; +use tower::{Layer, Service}; + +/// The states of the [`RpcCall`] future. +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[pin_project::pin_project(project = CallStateProj)] +enum CallState +where + Conn: Transport + Clone, + Conn::Future: Send, + Params: RpcParam, +{ + Prepared { + request: Option>, + connection: JsonRpcService, + }, + AwaitingResponse { + #[pin] + fut: as Service>>::Future, + }, + Complete, +} + +impl CallState +where + Conn: Transport + Clone, + Conn::Future: Send, + Params: RpcParam, +{ + fn poll_prepared( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll<::Output> { + let fut = { + let CallStateProj::Prepared { + connection, + request, + } = self.as_mut().project() + else { + unreachable!("Called poll_prepared in incorrect state") + }; + + if let Err(e) = task::ready!(Service::>::poll_ready( + connection, cx + )) { + self.set(CallState::Complete); + return task::Poll::Ready(RpcResult::Err(e)); + } + let request = request.take().expect("No request. This is a bug."); + connection.call(request) + }; + + self.set(CallState::AwaitingResponse { fut }); + cx.waker().wake_by_ref(); + + task::Poll::Pending + } + + fn poll_awaiting( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll<::Output> { + let CallStateProj::AwaitingResponse { fut } = self.as_mut().project() else { + unreachable!("Called poll_awaiting in incorrect state") + }; + + let res = task::ready!(fut.poll(cx)); + + task::Poll::Ready(RpcResult::from(res)) + } +} + +impl Future for CallState +where + Conn: Transport + Clone, + Conn::Future: Send, + Params: RpcParam, +{ + type Output = RpcResult, TransportError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { + if matches!(*self.as_mut(), CallState::Prepared { .. }) { + return self.poll_prepared(cx); + } + + if matches!(*self.as_mut(), CallState::AwaitingResponse { .. }) { + return self.poll_awaiting(cx); + } + + panic!("Polled in bad state"); + } +} + +/// A prepared, but unsent, RPC call. +/// +/// This is a future that will send the request when polled. It contains a +/// [`JsonRpcRequest`], a [`Transport`], and knowledge of its expected response +/// type. Upon awaiting, it will send the request and wait for the response. It +/// will then deserialize the response into the expected type. +/// +/// Errors are captured in the [`RpcResult`] type. Rpc Calls will result in +/// either a successful response of the `Resp` type, an error response, or a +/// transport error. +/// +/// ### Note: +/// +/// Serializing the request is done lazily. The request is not serialized until +/// the future is polled. This differs from the behavior of +/// [`crate::BatchRequest`], which serializes greedily. This is because the +/// batch request must immediately erase the `Param` type to allow batching of +/// requests with different `Param` types, while the `RpcCall` may do so lazily. +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[pin_project::pin_project] +pub struct RpcCall +where + Conn: Transport + Clone, + Conn::Future: Send, + Params: RpcParam, +{ + #[pin] + state: CallState, + _pd: PhantomData Resp>, +} + +impl RpcCall +where + Conn: Transport + Clone, + Conn::Future: Send, + Params: RpcParam, +{ + #[doc(hidden)] + pub fn new(req: JsonRpcRequest, connection: Conn) -> Self { + Self { + state: CallState::Prepared { + request: Some(req), + connection: JsonRpcLayer.layer(connection), + }, + _pd: PhantomData, + } + } + + /// Get a mutable reference to the params of the request. + /// + /// This is useful for modifying the params after the request has been + /// prepared. + pub fn params(&mut self) -> &mut Params { + if let CallState::Prepared { request, .. } = &mut self.state { + &mut request + .as_mut() + .expect("No params in prepared. This is a bug") + .params + } else { + panic!("Cannot get params after request has been sent"); + } + } +} + +impl<'a, Conn, Params, Resp> RpcCall +where + Conn: Transport + Clone, + Conn::Future: Send, + Params: RpcParam + 'a, + Resp: RpcReturn, +{ + /// Convert this future into a boxed, pinned future, erasing its type. + pub fn boxed( + self, + ) -> Pin> + Send + 'a>> { + Box::pin(self) + } +} + +impl Future for RpcCall +where + Conn: Transport + Clone, + Conn::Future: Send, + Params: RpcParam, + Resp: RpcReturn, +{ + type Output = RpcResult; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { + let this = self.project(); + + let resp = task::ready!(this.state.poll(cx)); + + task::Poll::Ready(resp.deser_ok_or_else(|e, text| TransportError::deser_err(e, text))) + } +} diff --git a/crates/transports/src/client.rs b/crates/transports/src/client.rs new file mode 100644 index 00000000000..1b93c0fa7e1 --- /dev/null +++ b/crates/transports/src/client.rs @@ -0,0 +1,198 @@ +use alloy_json_rpc::{Id, JsonRpcRequest, RpcParam, RpcReturn}; +use reqwest::Url; +use serde_json::value::RawValue; +use tower::{layer::util::Stack, Layer, ServiceBuilder}; + +use std::{ + borrow::Cow, + sync::atomic::{AtomicU64, Ordering}, +}; + +use crate::{BatchRequest, BoxTransport, Http, RpcCall, Transport}; + +/// A JSON-RPC client. +/// +/// This struct manages a [`Transport`] and a request ID counter. It is used to +/// build [`RpcCall`] and [`BatchRequest`] objects. The client delegates +/// transport access to the calls. +/// +/// ### Note +/// +/// IDs are allocated sequentially, starting at 0. IDs are reserved via +/// [`RpcClient::next_id`]. Note that allocated IDs may not be used. There is +/// no guarantee that a prepared [`RpcCall`] will be sent, or that a sent call +/// will receive a response. +#[derive(Debug)] +pub struct RpcClient { + /// The underlying transport. + pub(crate) transport: T, + /// `true` if the transport is local. + pub(crate) is_local: bool, + /// The next request ID to use. + pub(crate) id: AtomicU64, +} + +impl RpcClient { + /// Create a new [`RpcClient`] with the given transport. + pub fn new(t: T, is_local: bool) -> Self { + Self { + transport: t, + is_local, + id: AtomicU64::new(0), + } + } + + /// `true` if the client believes the transport is local. + /// + /// This can be used to optimize remote API usage, or to change program + /// behavior on local endpoints. When the client is instantiated by parsing + /// a URL or other external input, this value is set on a best-efforts + /// basis and may be incorrect. + #[inline] + pub fn is_local(&self) -> bool { + self.is_local + } + + /// Set the `is_local` flag. + pub fn set_local(&mut self, is_local: bool) { + self.is_local = is_local; + } + + /// Reserve a request ID value. This is used to generate request IDs. + #[inline] + fn increment_id(&self) -> u64 { + self.id.fetch_add(1, Ordering::Relaxed) + } + + /// Reserve a request ID. + #[inline] + pub fn next_id(&self) -> Id { + Id::Number(self.increment_id()) + } +} + +impl RpcClient +where + T: Transport + Clone, + T::Future: Send, +{ + /// Create a new [`BatchRequest`] builder. + #[inline] + pub fn new_batch(&self) -> BatchRequest { + BatchRequest::new(self) + } + + /// Build a `JsonRpcRequest` with the given method and params. + /// + /// This function reserves an ID for the request, however the request + /// is not sent. To send a request, use [`RpcClient::prepare`] and await + /// the returned [`RpcCall`]. + pub fn make_request<'a, Params: RpcParam>( + &self, + method: &'static str, + params: Cow<'a, Params>, + ) -> JsonRpcRequest> { + JsonRpcRequest { + method, + params, + id: self.next_id(), + } + } + + /// Prepare an [`RpcCall`]. + /// + /// This function reserves an ID for the request, however the request + /// is not sent. To send a request, await the returned [`RpcCall`]. + /// + /// ### Note: + /// + /// Serialization is done lazily. It will not be performed until the call + /// is awaited. This means that if a serializer error occurs, it will not + /// be caught until the call is awaited. + pub fn prepare<'a, Params: RpcParam, Resp: RpcReturn>( + &self, + method: &'static str, + params: Cow<'a, Params>, + ) -> RpcCall, Resp> { + let request = self.make_request(method, params); + RpcCall::new(request, self.transport.clone()) + } + + /// Type erase the service in the transport, allowing it to be used in a + /// generic context. + /// + /// ## Note: + /// + /// This is for abstracting over `RpcClient` for multiple `T` by + /// erasing each type. E.g. if you have `RpcClient` and + /// `RpcClient` you can put both into a `Vec>`. + #[inline] + pub fn boxed(self) -> RpcClient { + RpcClient { + transport: self.transport.boxed(), + is_local: self.is_local, + id: self.id, + } + } +} + +/// A builder for the transport [`RpcClient`]. +/// +/// This is a wrapper around [`tower::ServiceBuilder`]. It allows you to +/// configure middleware layers that will be applied to the transport, and has +/// some shortcuts for common layers and transports. +pub struct ClientBuilder { + builder: ServiceBuilder, +} + +impl ClientBuilder { + /// Add a middleware layer to the stack. + /// + /// This is a wrapper around [`tower::ServiceBuilder::layer`]. Layers that + /// are added first will be called with the request first. + pub fn layer(self, layer: M) -> ClientBuilder> { + ClientBuilder { + builder: self.builder.layer(layer), + } + } + + /// Create a new [`RpcClient`] with the given transport and the configured + /// layers. + pub fn transport(self, transport: T, is_local: bool) -> RpcClient + where + L: Layer, + T: Transport, + L::Service: Transport, + >>::Future: Send, + { + RpcClient::new(self.builder.service(transport), is_local) + } + + /// Create a new [`RpcClient`] with an HTTP transport connecting to the + /// given URL and the configured layers. + pub fn http(self, url: Url) -> RpcClient + where + L: Layer>, + L::Service: Transport, + >>::Future: Send, + { + let transport = Http::new(url); + let is_local = transport.is_local(); + + self.transport(transport, is_local) + } +} + +#[cfg(test)] +mod test { + use crate::transports::Http; + + use super::RpcClient; + + #[test] + fn basic_instantiation() { + let h: RpcClient> = "http://localhost:8545".parse().unwrap(); + + assert!(h.is_local()); + } +} diff --git a/crates/transports/src/common.rs b/crates/transports/src/common.rs new file mode 100644 index 00000000000..a3d97ee3eda --- /dev/null +++ b/crates/transports/src/common.rs @@ -0,0 +1,37 @@ +use base64::{engine::general_purpose, Engine}; +use std::fmt; + +/// Basic or bearer authentication in http or websocket transport +/// +/// Use to inject username and password or an auth token into requests +#[derive(Clone, Debug)] +pub enum Authorization { + /// HTTP Basic Auth + Basic(String), + /// Bearer Auth + Bearer(String), +} + +impl Authorization { + /// Instantiate a new basic auth. + pub fn basic(username: impl AsRef, password: impl AsRef) -> Self { + let username = username.as_ref(); + let password = password.as_ref(); + let auth_secret = general_purpose::STANDARD.encode(format!("{username}:{password}")); + Self::Basic(auth_secret) + } + + /// Instantiate a new bearer auth. + pub fn bearer(token: impl Into) -> Self { + Self::Bearer(token.into()) + } +} + +impl fmt::Display for Authorization { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Authorization::Basic(auth_secret) => write!(f, "Basic {auth_secret}"), + Authorization::Bearer(token) => write!(f, "Bearer {token}"), + } + } +} diff --git a/crates/transports/src/error.rs b/crates/transports/src/error.rs new file mode 100644 index 00000000000..2465d0090f1 --- /dev/null +++ b/crates/transports/src/error.rs @@ -0,0 +1,60 @@ +use std::{error::Error as StdError, fmt::Debug}; +use thiserror::Error; + +/// Transport error. +/// +/// All transport errors are wrapped in this enum. +#[derive(Error, Debug)] +pub enum TransportError { + /// SerdeJson (de)ser + #[error("{err}")] + SerdeJson { + #[source] + err: serde_json::Error, + text: Option, + }, + + /// Missing batch response + #[error("Missing response in batch request")] + MissingBatchResponse, + + #[error(transparent)] + Custom(Box), + + /// Http transport + #[error(transparent)] + #[cfg(feature = "reqwest")] + Reqwest(#[from] reqwest::Error), +} + +impl TransportError { + /// Instantiate a new `TransportError` from a [`serde_json::Error`]. This + /// should be called when the error occurs during serialization. + pub fn ser_err(err: serde_json::Error) -> Self { + Self::SerdeJson { err, text: None } + } + + /// Instantiate a new `TransportError` from a [`serde_json::Error`] and the + /// text. This should be called when the error occurs during + /// deserialization. + pub fn deser_err(err: serde_json::Error, text: impl AsRef) -> Self { + Self::from((err, text)) + } + + /// Instantiate a new `TransportError` from a custom error. + pub fn custom(err: impl StdError + Send + Sync + 'static) -> Self { + Self::Custom(Box::new(err)) + } +} + +impl From<(serde_json::Error, T)> for TransportError +where + T: AsRef, +{ + fn from((err, text): (serde_json::Error, T)) -> Self { + Self::SerdeJson { + err, + text: Some(text.as_ref().to_string()), + } + } +} diff --git a/crates/transports/src/lib.rs b/crates/transports/src/lib.rs index 7d12d9af819..b489f06999a 100644 --- a/crates/transports/src/lib.rs +++ b/crates/transports/src/lib.rs @@ -1,14 +1,22 @@ -pub fn add(left: usize, right: usize) -> usize { - left + right -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); - } -} +mod call; +pub use call::RpcCall; + +mod common; +pub use common::Authorization; + +/// [`RpcClient`] and [`ClientBuilder`]. +pub mod client; +pub use client::{ClientBuilder, RpcClient}; + +mod error; +pub use error::TransportError; + +mod batch; +pub use batch::BatchRequest; + +mod transports; +pub use transports::{BoxTransport, Http, Transport}; + +pub use alloy_json_rpc::RpcResult; + +pub(crate) mod utils; diff --git a/crates/transports/src/transports/http/hyper.rs b/crates/transports/src/transports/http/hyper.rs new file mode 100644 index 00000000000..46330df3648 --- /dev/null +++ b/crates/transports/src/transports/http/hyper.rs @@ -0,0 +1 @@ +//! TODO diff --git a/crates/transports/src/transports/http/mod.rs b/crates/transports/src/transports/http/mod.rs new file mode 100644 index 00000000000..278ffb0366d --- /dev/null +++ b/crates/transports/src/transports/http/mod.rs @@ -0,0 +1,75 @@ +mod hyper; + +#[cfg(feature = "reqwest")] +mod reqwest; + +use crate::client::RpcClient; + +use std::{str::FromStr, sync::atomic::AtomicU64}; +use url::Url; + +/// An Http transport. +/// +/// The user must provide an internal http client that implements +/// `Service>`, and a URL to which to connect. +/// +/// Currently supported clients are: +#[cfg_attr(feature = "reqwest", doc = " - [`::reqwest::Client`]")] +#[derive(Debug, Clone)] +pub struct Http { + client: T, + url: Url, +} + +impl Http { + /// Create a new [`Http`] transport. + pub fn new(url: Url) -> Self + where + T: Default, + { + Self { + client: Default::default(), + url, + } + } + + /// Create a new [`Http`] transport with a custom client. + pub fn with_client(client: T, url: Url) -> Self { + Self { client, url } + } + + /// True if the connection has no hostname, or the hostname is `localhost` + /// or `127.0.0.1`. + pub fn is_local(&self) -> bool { + self.url + .host_str() + .map_or(true, |host| host == "localhost" || host == "127.0.0.1") + } +} + +impl RpcClient> +where + T: Default, +{ + /// Create a new [`RpcClient`] from a URL. + pub fn new_http(url: Url) -> Self { + let transport = Http::new(url); + let is_local = transport.is_local(); + Self { + transport, + is_local, + id: AtomicU64::new(0), + } + } +} + +impl FromStr for RpcClient> +where + T: Default, +{ + type Err = ::Err; + + fn from_str(s: &str) -> Result { + s.parse().map(Self::new_http) + } +} diff --git a/crates/transports/src/transports/http/reqwest.rs b/crates/transports/src/transports/http/reqwest.rs new file mode 100644 index 00000000000..137a4273a92 --- /dev/null +++ b/crates/transports/src/transports/http/reqwest.rs @@ -0,0 +1,32 @@ +use serde_json::value::RawValue; +use std::{future::Future, pin::Pin, task}; +use tower::Service; + +use crate::{Http, TransportError}; + +impl Service> for Http { + type Response = Box; + type Error = TransportError; + type Future = + Pin> + Send + 'static>>; + + #[inline] + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> task::Poll> { + self.client.poll_ready(cx).map_err(Into::into) + } + + #[inline] + fn call(&mut self, req: Box) -> Self::Future { + let replacement = self.client.clone(); + let client = std::mem::replace(&mut self.client, replacement); + + let url = self.url.clone(); + + Box::pin(async move { + let resp = client.post(url).json(&req).send().await?; + let json = resp.text().await?; + + RawValue::from_string(json).map_err(|err| TransportError::deser_err(err, "")) + }) + } +} diff --git a/crates/transports/src/transports/json_service.rs b/crates/transports/src/transports/json_service.rs new file mode 100644 index 00000000000..4bae1855ef7 --- /dev/null +++ b/crates/transports/src/transports/json_service.rs @@ -0,0 +1,138 @@ +use crate::{utils::to_json_raw_value, Transport, TransportError}; + +use alloy_json_rpc::{JsonRpcRequest, JsonRpcResponse, RpcParam}; +use serde::de::DeserializeOwned; +use serde_json::value::RawValue; +use std::{future::Future, pin::Pin, task}; +use tower::Service; + +/// A service layer that transforms [`JsonRpcRequest`] into [`JsonRpcResponse`] +/// by wrapping an inner service that implements [`Transport`]. +#[derive(Debug, Clone)] +pub(crate) struct JsonRpcService { + pub(crate) inner: S, +} + +/// Layer for [`JsonRpcService`] +#[derive(Debug, Copy, Clone)] +pub(crate) struct JsonRpcLayer; + +impl tower::Layer for JsonRpcLayer { + type Service = JsonRpcService; + + fn layer(&self, inner: S) -> Self::Service { + JsonRpcService { inner } + } +} + +impl Service> for JsonRpcService +where + S: Transport + Clone, + Param: RpcParam, +{ + type Response = JsonRpcResponse; + + type Error = TransportError; + + type Future = JsonRpcFuture; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> task::Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, req: JsonRpcRequest) -> Self::Future { + let replacement = self.inner.clone(); + let mut client = std::mem::replace(&mut self.inner, replacement); + + match to_json_raw_value(&req) { + Ok(raw) => JsonRpcFuture { + state: States::Pending { + fut: client.call(raw), + }, + _resp: std::marker::PhantomData, + }, + Err(e) => JsonRpcFuture { + state: States::Errored(Some(e)), + _resp: std::marker::PhantomData, + }, + } + } +} + +/// States for [`JsonRpcFuture`] +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[pin_project::pin_project(project = StatesProj)] +enum States { + Errored(Option), + Pending { + #[pin] + fut: F, + }, + Complete, +} + +impl States +where + F: Future, TransportError>>, +{ + pub fn poll_errored(mut self: Pin<&mut Self>) -> task::Poll<::Output> { + let e = if let StatesProj::Errored(e) = self.as_mut().project() { + e.take().expect("No error. This is a bug.") + } else { + unreachable!("Called poll_ser_error in incorrect state") + }; + + self.set(States::Complete); + task::Poll::Ready(Err(e)) + } + + pub fn poll_pending( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll<::Output> { + let StatesProj::Pending { fut } = self.as_mut().project() else { + unreachable!("Called poll_pending in incorrect state") + }; + + fut.poll(cx) + } +} + +impl Future for States +where + F: Future, TransportError>>, +{ + type Output = F::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { + match self.as_mut().project() { + StatesProj::Errored(_) => self.poll_errored(), + StatesProj::Pending { .. } => self.poll_pending(cx), + StatesProj::Complete => panic!("polled after completion"), + } + } +} + +/// Wrapper future to do JSON ser and deser +#[pin_project::pin_project] +pub struct JsonRpcFuture { + #[pin] + state: States, + _resp: std::marker::PhantomData Resp>, +} + +impl Future for JsonRpcFuture +where + F: Future, TransportError>>, + Resp: DeserializeOwned, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll { + let resp = task::ready!(self.project().state.poll(cx)); + + task::Poll::Ready(resp.and_then(|raw| { + serde_json::from_str(raw.get()).map_err(|err| TransportError::deser_err(err, raw.get())) + })) + } +} diff --git a/crates/transports/src/transports/mod.rs b/crates/transports/src/transports/mod.rs new file mode 100644 index 00000000000..a92f9fa2601 --- /dev/null +++ b/crates/transports/src/transports/mod.rs @@ -0,0 +1,121 @@ +mod http; +pub use http::Http; + +mod json_service; +pub(crate) use json_service::{JsonRpcLayer, JsonRpcService}; + +use serde_json::value::RawValue; +use std::{future::Future, pin::Pin}; +use tower::Service; + +use crate::TransportError; + +/// A marker trait for transports. +/// +/// This trait is blanket implemented for all appropriate types. To implement +/// this trait, you must implement the [`tower::Service`] trait with the +/// appropriate associated types. +pub trait Transport: + private::Sealed + + Service< + Box, + Response = Box, + Error = TransportError, + Future = Pin, TransportError>> + Send>>, + > + Send + + Sync + + 'static +{ + fn boxed(self) -> BoxTransport + where + Self: Sized + Clone + Send + Sync + 'static, + { + BoxTransport { + inner: Box::new(self), + } + } +} + +impl Transport for T where + T: private::Sealed + + Service< + Box, + Response = Box, + Error = TransportError, + Future = Pin, TransportError>> + Send>>, + > + Send + + Sync + + 'static +{ +} + +pub struct BoxTransport { + inner: Box, +} + +impl Clone for BoxTransport { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone_box(), + } + } +} + +trait CloneTransport: Transport { + fn clone_box(&self) -> Box; +} + +impl CloneTransport for T +where + T: Transport + Clone + Send + Sync, +{ + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } +} + +impl Service> for BoxTransport { + type Response = Box; + + type Error = TransportError; + + type Future = Pin, TransportError>> + Send>>; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Box) -> Self::Future { + self.inner.call(req) + } +} + +/// checks trait + send + sync + 'static +fn __compile_check() { + fn inner() { + todo!() + } + inner::(); +} + +mod private { + use super::*; + + pub trait Sealed {} + impl Sealed for T where + T: Service< + Box, + Response = Box, + Error = TransportError, + Future = Pin< + Box, TransportError>> + Send>, + >, + > + Send + + Sync + + 'static + { + } +} diff --git a/crates/transports/src/utils.rs b/crates/transports/src/utils.rs new file mode 100644 index 00000000000..5b6ffb7d238 --- /dev/null +++ b/crates/transports/src/utils.rs @@ -0,0 +1,14 @@ +use serde::Serialize; +use serde_json::{self, value::RawValue}; + +use crate::error::TransportError; + +/// Convert to a `Box` from a `Serialize` type, mapping the error +/// to a `TransportError`. +pub(crate) fn to_json_raw_value(s: &S) -> Result, TransportError> +where + S: Serialize, +{ + RawValue::from_string(serde_json::to_string(s).map_err(TransportError::ser_err)?) + .map_err(TransportError::ser_err) +} diff --git a/src/lib.rs b/src/lib.rs deleted file mode 100644 index 7d12d9af819..00000000000 --- a/src/lib.rs +++ /dev/null @@ -1,14 +0,0 @@ -pub fn add(left: usize, right: usize) -> usize { - left + right -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); - } -}