diff --git a/Cargo.lock b/Cargo.lock index 808058c3f31e..69f9cb25c24e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -123,7 +123,7 @@ dependencies = [ [[package]] name = "alloy-json-rpc" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-primitives", "serde", @@ -134,7 +134,7 @@ dependencies = [ [[package]] name = "alloy-networks" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -169,7 +169,7 @@ dependencies = [ [[package]] name = "alloy-providers" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-networks", "alloy-primitives", @@ -187,7 +187,7 @@ dependencies = [ [[package]] name = "alloy-pubsub" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-json-rpc", "alloy-primitives", @@ -226,7 +226,7 @@ dependencies = [ [[package]] name = "alloy-rpc-client" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = 
"git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-json-rpc", "alloy-transport", @@ -243,7 +243,7 @@ dependencies = [ [[package]] name = "alloy-rpc-types" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-primitives", "alloy-rlp", @@ -300,7 +300,7 @@ dependencies = [ [[package]] name = "alloy-transport" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-json-rpc", "base64 0.21.5", @@ -316,7 +316,7 @@ dependencies = [ [[package]] name = "alloy-transport-http" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-json-rpc", "alloy-transport", @@ -329,7 +329,7 @@ dependencies = [ [[package]] name = "alloy-transport-ipc" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-json-rpc", "alloy-pubsub", @@ -346,7 +346,7 @@ dependencies = [ [[package]] name = "alloy-transport-ws" version = "0.1.0" -source = 
"git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#0ccb86904ef2fc1e8d51790ff3ab4062e1e28c45" +source = "git+https://github.com/alloy-rs/alloy/?branch=onbjerg/alloy-temp-provider-trait#1d825ecdbaa54877813aa372b641181e922d2d86" dependencies = [ "alloy-pubsub", "alloy-transport", @@ -3192,6 +3192,7 @@ dependencies = [ "alloy-primitives", "alloy-providers", "alloy-pubsub", + "alloy-rpc-client", "alloy-rpc-types", "alloy-sol-types", "alloy-transport", diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 2c6443703a7c..21f970ccb06f 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -22,6 +22,7 @@ alloy-dyn-abi = { workspace = true, features = ["arbitrary", "eip712"] } alloy-json-abi.workspace = true alloy-primitives = { workspace = true, features = ["serde", "getrandom", "arbitrary", "rlp"] } alloy-rpc-types.workspace = true +alloy-rpc-client.workspace = true alloy-providers.workspace = true alloy-transport.workspace = true alloy-transport-http.workspace = true diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index a75ffcbb75a0..ef8eb88528c0 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -8,7 +8,6 @@ extern crate self as foundry_common; extern crate tracing; pub mod abi; -pub mod alloy_runtime_transport; pub mod calc; pub mod clap_helpers; pub mod compile; diff --git a/crates/common/src/provider/alloy.rs b/crates/common/src/provider/alloy.rs index a2c29869ab16..bb52778145e9 100644 --- a/crates/common/src/provider/alloy.rs +++ b/crates/common/src/provider/alloy.rs @@ -1,10 +1,11 @@ //! 
Commonly used helpers to construct `Provider`s use crate::{ - alloy_runtime_transport::RuntimeTransportBuilder, ALCHEMY_FREE_TIER_CUPS, REQUEST_TIMEOUT, + provider::runtime_transport::RuntimeTransportBuilder, ALCHEMY_FREE_TIER_CUPS, REQUEST_TIMEOUT, }; use alloy_primitives::U256; use alloy_providers::provider::{Provider, TempProvider}; +use alloy_rpc_client::ClientBuilder; use alloy_transport::BoxTransport; use ethers_middleware::gas_oracle::{GasCategory, GasOracle, Polygon}; use eyre::{Result, WrapErr}; @@ -17,6 +18,8 @@ use std::{ }; use url::ParseError; +use super::tower::RetryBackoffLayer; + /// Helper type alias for a retry provider pub type RetryProvider = Provider; @@ -217,24 +220,31 @@ impl ProviderBuilder { let ProviderBuilder { url, chain: _, - max_retry: _, - timeout_retry: _, - initial_backoff: _, + max_retry, + timeout_retry, + initial_backoff, timeout, - compute_units_per_second: _, + compute_units_per_second, jwt, headers, } = self; let url = url?; - // todo: port alchemy compute units logic? 
- // todo: provider polling interval - let transport_builder = RuntimeTransportBuilder::new(url.clone()) + let retry_layer = RetryBackoffLayer::new( + max_retry, + timeout_retry, + initial_backoff, + compute_units_per_second, + ); + let transport = RuntimeTransportBuilder::new(url.clone()) .with_timeout(timeout) .with_headers(headers) - .with_jwt(jwt); + .with_jwt(jwt) + .build(); + let client = ClientBuilder::default().layer(retry_layer).transport(transport, false); - Ok(Provider::new(transport_builder.build().boxed())) + // todo: provider polling interval + Ok(Provider::new_with_client(client.boxed())) } } diff --git a/crates/common/src/provider/mod.rs b/crates/common/src/provider/mod.rs index e02e7f7aef39..cbd9ecbd00ef 100644 --- a/crates/common/src/provider/mod.rs +++ b/crates/common/src/provider/mod.rs @@ -2,3 +2,6 @@ pub mod alloy; pub mod ethers; +pub mod retry; +pub mod runtime_transport; +pub mod tower; diff --git a/crates/common/src/provider/retry.rs b/crates/common/src/provider/retry.rs new file mode 100644 index 000000000000..59bd2c2e80a2 --- /dev/null +++ b/crates/common/src/provider/retry.rs @@ -0,0 +1,95 @@ +//! An utility trait for retrying requests based on the error type. See [TransportError]. +use alloy_json_rpc::ErrorPayload; +use alloy_transport::TransportError; +use serde::Deserialize; + +/// [RetryPolicy] defines logic for which [JsonRpcClient::Error] instances should +/// the client retry the request and try to recover from. +pub trait RetryPolicy: Send + Sync + std::fmt::Debug { + /// Whether to retry the request based on the given `error` + fn should_retry(&self, error: &TransportError) -> bool; + + /// Providers may include the `backoff` in the error response directly + fn backoff_hint(&self, error: &TransportError) -> Option; +} + +/// Implements [RetryPolicy] that will retry requests that errored with +/// status code 429 i.e. 
TOO_MANY_REQUESTS +/// +/// Infura often fails with a `"header not found"` rpc error which is apparently linked to load +/// balancing, which are retried as well. +#[derive(Clone, Debug, Default)] +pub struct RateLimitRetryPolicy; + +impl RetryPolicy for RateLimitRetryPolicy { + fn backoff_hint(&self, error: &TransportError) -> Option { + if let TransportError::ErrorResp(resp) = error { + println!("resp: {:?}", resp); + let data = resp.try_data_as::(); + if let Some(Ok(data)) = data { + // if daily rate limit exceeded, infura returns the requested backoff in the error + // response + let backoff_seconds = &data["rate"]["backoff_seconds"]; + // infura rate limit error + if let Some(seconds) = backoff_seconds.as_u64() { + return Some(std::time::Duration::from_secs(seconds)) + } + if let Some(seconds) = backoff_seconds.as_f64() { + return Some(std::time::Duration::from_secs(seconds as u64 + 1)) + } + } + } + None + } + + fn should_retry(&self, error: &TransportError) -> bool { + match error { + TransportError::Transport(_) => true, + // The transport could not serialize the error itself. The request was malformed from + // the start. + TransportError::SerError(_) => false, + TransportError::DeserError { text, .. } => { + // some providers send invalid JSON RPC in the error case (no `id:u64`), but the + // text should be a `JsonRpcError` + #[derive(Deserialize)] + struct Resp { + error: ErrorPayload, + } + + if let Ok(resp) = serde_json::from_str::(text) { + return should_retry_json_rpc_error(&resp.error) + } + false + } + TransportError::ErrorResp(err) => should_retry_json_rpc_error(err), + } + } +} + +/// Analyzes the [ErrorPayload] and decides if the request should be retried based on the +/// error code or the message. +fn should_retry_json_rpc_error(error: &ErrorPayload) -> bool { + let ErrorPayload { code, message, .. 
} = error; + // alchemy throws it this way + if *code == 429 { + return true + } + + // This is an infura error code for `exceeded project rate limit` + if *code == -32005 { + return true + } + + // alternative alchemy error for specific IPs + if *code == -32016 && message.contains("rate limit") { + return true + } + + match message.as_str() { + // this is commonly thrown by infura and is apparently a load balancer issue, see also + "header not found" => true, + // also thrown by infura if out of budget for the day and ratelimited + "daily request count exceeded, request rate limited" => true, + _ => false, + } +} diff --git a/crates/common/src/alloy_runtime_transport.rs b/crates/common/src/provider/runtime_transport.rs similarity index 92% rename from crates/common/src/alloy_runtime_transport.rs rename to crates/common/src/provider/runtime_transport.rs index 22844ccc4a01..67f83998d44c 100644 --- a/crates/common/src/alloy_runtime_transport.rs +++ b/crates/common/src/provider/runtime_transport.rs @@ -1,5 +1,6 @@ //! Runtime transport that connects on first request, which can take either of an HTTP, -//! WebSocket, or IPC transport. +//! WebSocket, or IPC transport and supports retries based on CUPS logic. +use crate::REQUEST_TIMEOUT; use alloy_json_rpc::{RequestPacket, ResponsePacket}; use alloy_pubsub::{PubSubConnect, PubSubFrontend}; use alloy_transport::{ @@ -64,6 +65,7 @@ pub enum RuntimeTransportError { /// A runtime transport is a custom [alloy_transport::Transport] that only connects when the *first* /// request is made. When the first request is made, it will connect to the runtime using either an /// HTTP WebSocket, or IPC transport depending on the URL used. +/// It also supports retries for rate-limiting and timeout-related errors. #[derive(Clone, Debug, Error)] pub struct RuntimeTransport { /// The inner actual transport used. @@ -79,6 +81,7 @@ pub struct RuntimeTransport { } /// A builder for [RuntimeTransport]. 
+#[derive(Debug)] pub struct RuntimeTransportBuilder { url: Url, headers: Vec, @@ -89,7 +92,7 @@ pub struct RuntimeTransportBuilder { impl RuntimeTransportBuilder { /// Create a new builder with the given URL. pub fn new(url: Url) -> Self { - Self { url, headers: vec![], jwt: None, timeout: std::time::Duration::from_secs(30) } + Self { url, headers: vec![], jwt: None, timeout: REQUEST_TIMEOUT } } /// Set the URL for the transport. @@ -130,16 +133,6 @@ impl ::core::fmt::Display for RuntimeTransport { } impl RuntimeTransport { - /// Create a new [RuntimeTransport]. - pub fn new( - url: Url, - headers: Vec, - jwt: Option, - timeout: std::time::Duration, - ) -> Self { - Self { inner: Arc::new(RwLock::new(None)), url, headers, jwt, timeout } - } - /// Connects the underlying transport, depending on the URL scheme. pub async fn connect(&self) -> Result { match self.url.scheme() { @@ -210,9 +203,11 @@ impl RuntimeTransport { /// Sends a request using the underlying transport. /// If this is the first request, it will connect to the appropiate transport depending on the - /// URL scheme. For sending the actual request, this action is delegated down to the - /// underlying transport through Tower's call. See tower's [tower::Service] trait for more - /// information. + /// URL scheme. When sending the request, retries will be automatically handled depending + /// on the parameters set on the [RuntimeTransport]. + /// For sending the actual request, this action is delegated down to the + /// underlying transport through Tower's [tower::Service::call]. See tower's [tower::Service] + /// trait for more information. 
pub fn request(&self, req: RequestPacket) -> TransportFut<'static> { let this = self.clone(); Box::pin(async move { @@ -226,10 +221,11 @@ impl RuntimeTransport { let inner_mut = inner.as_mut().expect("We should have an inner transport."); match inner_mut { - InnerTransport::Http(http) => http.call(req).await, - InnerTransport::Ws(ws) => ws.call(req).await, - InnerTransport::Ipc(ipc) => ipc.call(req).await, + InnerTransport::Http(http) => http.call(req), + InnerTransport::Ws(ws) => ws.call(req), + InnerTransport::Ipc(ipc) => ipc.call(req), } + .await }) } diff --git a/crates/common/src/provider/tower.rs b/crates/common/src/provider/tower.rs new file mode 100644 index 000000000000..5e08b836c991 --- /dev/null +++ b/crates/common/src/provider/tower.rs @@ -0,0 +1,192 @@ +//! Alloy-related tower middleware for retrying rate-limited requests +//! and applying backoff. +use std::{ + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, + task::{Context, Poll}, +}; + +use alloy_json_rpc::{RequestPacket, ResponsePacket}; +use alloy_transport::{TransportError, TransportErrorKind, TransportFut}; +use tower::Service; + +use super::{ + retry::{RateLimitRetryPolicy, RetryPolicy}, + runtime_transport::RuntimeTransport, +}; + +/// An Alloy Tower Layer that is responsible for retrying requests based on the +/// error type. See [TransportError]. 
+#[derive(Debug, Clone)] +pub struct RetryBackoffLayer { + /// The maximum number of retries for rate limit errors + max_rate_limit_retries: u32, + /// The maximum number of retries for timeout errors + max_timeout_retries: u32, + /// The initial backoff in milliseconds + initial_backoff: u64, + /// The number of compute units per second for this provider + compute_units_per_second: u64, +} + +impl RetryBackoffLayer { + /// Creates a new [RetryBackoffLayer] with the given parameters + pub fn new( + max_rate_limit_retries: u32, + max_timeout_retries: u32, + initial_backoff: u64, + compute_units_per_second: u64, + ) -> Self { + Self { + max_rate_limit_retries, + max_timeout_retries, + initial_backoff, + compute_units_per_second, + } + } +} + +impl tower::layer::Layer for RetryBackoffLayer { + type Service = RetryBackoffService; + + fn layer(&self, inner: S) -> Self::Service { + RetryBackoffService { + inner, + policy: RateLimitRetryPolicy, + max_rate_limit_retries: self.max_rate_limit_retries, + max_timeout_retries: self.max_timeout_retries, + initial_backoff: self.initial_backoff, + compute_units_per_second: self.compute_units_per_second, + requests_enqueued: Arc::new(AtomicU32::new(0)), + } + } +} + +/// An Alloy Tower Service that is responsible for retrying requests based on the +/// error type. See [TransportError] and [RetryBackoffLayer].
+#[derive(Debug, Clone)] +pub struct RetryBackoffService { + /// The inner service + inner: S, + /// The retry policy + policy: RateLimitRetryPolicy, + /// The maximum number of retries for rate limit errors + max_rate_limit_retries: u32, + /// The maximum number of retries for timeout errors + max_timeout_retries: u32, + /// The initial backoff in milliseconds + initial_backoff: u64, + /// The number of compute units per second for this service + compute_units_per_second: u64, + /// The number of requests currently enqueued + requests_enqueued: Arc, +} + +// impl tower service +impl Service for RetryBackoffService { + type Response = ResponsePacket; + type Error = TransportError; + type Future = TransportFut<'static>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // Our middleware doesn't care about backpressure, so it's ready as long + // as the inner service is ready. + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: RequestPacket) -> Self::Future { + let mut this = self.clone(); + Box::pin(async move { + let ahead_in_queue = this.requests_enqueued.fetch_add(1, Ordering::SeqCst) as u64; + let mut rate_limit_retry_number: u32 = 0; + let mut timeout_retries: u32 = 0; + loop { + let err; + let fut = this.inner.call(request.clone()).await; + + match fut { + Ok(res) => { + this.requests_enqueued.fetch_sub(1, Ordering::SeqCst); + return Ok(res) + } + Err(e) => err = e, + } + + let err = TransportError::from(err); + let should_retry = this.policy.should_retry(&err); + if should_retry { + rate_limit_retry_number += 1; + if rate_limit_retry_number > this.max_rate_limit_retries { + return Err(TransportErrorKind::custom_str("Max retries exceeded")) + } + + let current_queued_reqs = this.requests_enqueued.load(Ordering::SeqCst) as u64; + + // try to extract the requested backoff from the error or compute the next + // backoff based on retry count + let mut next_backoff = this + .policy + .backoff_hint(&err) + .unwrap_or_else(|| 
std::time::Duration::from_millis(this.initial_backoff)); + + // requests are usually weighted and can vary from 10 CU to several 100 CU, + // cheaper requests are more common some example alchemy + // weights: + // - `eth_getStorageAt`: 17 + // - `eth_getBlockByNumber`: 16 + // - `eth_newFilter`: 20 + // + // (coming from forking mode) assuming here that storage request will be the + // driver for Rate limits we choose `17` as the average cost + // of any request + const AVG_COST: u64 = 17u64; + let seconds_to_wait_for_compute_budget = compute_unit_offset_in_secs( + AVG_COST, + this.compute_units_per_second, + current_queued_reqs, + ahead_in_queue, + ); + next_backoff += + std::time::Duration::from_secs(seconds_to_wait_for_compute_budget); + + tokio::time::sleep(next_backoff).await; + } else { + if timeout_retries < this.max_timeout_retries { + timeout_retries += 1; + continue; + } + + this.requests_enqueued.fetch_sub(1, Ordering::SeqCst); + return Err(TransportErrorKind::custom_str("Max retries exceeded")) + } + } + }) + } +} + +/// Calculates an offset in seconds by taking into account the number of currently queued requests, +/// number of requests that were ahead in the queue when the request was first issued, the average +/// cost a weighted request (heuristic), and the number of available compute units per seconds. +/// +/// Returns the number of seconds (the unit the remote endpoint measures compute budget) a request +/// is supposed to wait to not get rate limited. The budget per second is +/// `compute_units_per_second`, assuming an average cost of `avg_cost` this allows (in theory) +/// `compute_units_per_second / avg_cost` requests per seconds without getting rate limited. 
+/// By taking into account the number of concurrent requests and the position in the queue when the +/// request was first issued, to determine the number of seconds a request is supposed to wait, if +/// at all +fn compute_unit_offset_in_secs( + avg_cost: u64, + compute_units_per_second: u64, + current_queued_requests: u64, + ahead_in_queue: u64, +) -> u64 { + let request_capacity_per_second = compute_units_per_second.saturating_div(avg_cost); + if current_queued_requests > request_capacity_per_second { + current_queued_requests.min(ahead_in_queue).saturating_div(request_capacity_per_second) + } else { + 0 + } +}