feat(transport): retry layer (alloy-rs#849)
yash-atreya authored and ben186 committed Jul 27, 2024
1 parent 0ea42db commit c9dfc80
Showing 6 changed files with 350 additions and 4 deletions.
47 changes: 47 additions & 0 deletions crates/json-rpc/src/response/error.rs
@@ -20,6 +20,53 @@ pub struct ErrorPayload<ErrData = Box<RawValue>> {
pub data: Option<ErrData>,
}

impl<E> ErrorPayload<E> {
/// Analyzes the [ErrorPayload] and decides if the request should be retried based on the
/// error code or the message.
pub fn is_retry_err(&self) -> bool {
// alchemy throws it this way
if self.code == 429 {
return true;
}

// This is an infura error code for `exceeded project rate limit`
if self.code == -32005 {
return true;
}

// alternative alchemy error for specific IPs
if self.code == -32016 && self.message.contains("rate limit") {
return true;
}

// quick node error `"credits limited to 6000/sec"`
// <https://github.com/foundry-rs/foundry/pull/6712#issuecomment-1951441240>
if self.code == -32012 && self.message.contains("credits") {
return true;
}

// quick node rate limit error: `100/second request limit reached - reduce calls per second
// or upgrade your account at quicknode.com` <https://github.com/foundry-rs/foundry/issues/4894>
if self.code == -32007 && self.message.contains("request limit reached") {
return true;
}

match self.message.as_str() {
// this is commonly thrown by infura and is apparently a load balancer issue, see also <https://github.com/MetaMask/metamask-extension/issues/7234>
"header not found" => true,
// also thrown by infura if out of budget for the day and ratelimited
"daily request count exceeded, request rate limited" => true,
msg => {
msg.contains("rate limit")
|| msg.contains("rate exceeded")
|| msg.contains("too many requests")
|| msg.contains("credits limited")
|| msg.contains("request limit")
}
}
}
}

impl<ErrData> fmt::Display for ErrorPayload<ErrData> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "error code {}: {}", self.code, self.message)
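For context, a minimal sketch of how `is_retry_err` classifies a provider error. The JSON body below is a hypothetical Infura-style rate-limit payload, not part of this commit, and it assumes `serde_json` is available alongside the crate's derived `Deserialize` impl:

use alloy_json_rpc::ErrorPayload;

fn main() {
    // Hypothetical Infura-style rate-limit error body; -32005 is the code handled above.
    let raw = r#"{"code":-32005,"message":"daily request count exceeded, request rate limited"}"#;
    let payload: ErrorPayload = serde_json::from_str(raw).expect("valid error payload");
    assert!(payload.is_retry_err());
}
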
6 changes: 3 additions & 3 deletions crates/transport/Cargo.toml
@@ -29,12 +29,12 @@ serde.workspace = true
thiserror.workspace = true
tower.workspace = true
url.workspace = true
tracing.workspace = true
tokio = { workspace = true, features = ["rt", "time"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = { version = "0.4", optional = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { workspace = true, features = ["rt"] }

[features]

wasm-bindgen = ["dep:wasm-bindgen-futures"]
65 changes: 64 additions & 1 deletion crates/transport/src/error.rs
@@ -1,4 +1,5 @@
use alloy_json_rpc::{Id, RpcError, RpcResult};
use alloy_json_rpc::{ErrorPayload, Id, RpcError, RpcResult};
use serde::Deserialize;
use serde_json::value::RawValue;
use std::{error::Error as StdError, fmt::Debug};
use thiserror::Error;
@@ -110,3 +111,65 @@ impl HttpError {
false
}
}

/// Extension trait to implement methods for [`RpcError<TransportErrorKind, E>`].
pub(crate) trait RpcErrorExt {
/// Analyzes whether to retry the request depending on the error.
fn is_retryable(&self) -> bool;

/// Fetches the backoff hint from the error message if present
fn backoff_hint(&self) -> Option<std::time::Duration>;
}

impl RpcErrorExt for RpcError<TransportErrorKind> {
fn is_retryable(&self) -> bool {
match self {
// There was a transport-level error. This is either a non-retryable error,
// or a server error that should be retried.
Self::Transport(err) => err.is_retry_err(),
// The transport could not serialize the error itself. The request was malformed from
// the start.
Self::SerError(_) => false,
Self::DeserError { text, .. } => {
if let Ok(resp) = serde_json::from_str::<ErrorPayload>(text) {
return resp.is_retry_err();
}

// some providers send invalid JSON RPC in the error case (no `id:u64`), but the
// text should be a `JsonRpcError`
#[derive(Deserialize)]
struct Resp {
error: ErrorPayload,
}

if let Ok(resp) = serde_json::from_str::<Resp>(text) {
return resp.error.is_retry_err();
}

false
}
Self::ErrorResp(err) => err.is_retry_err(),
Self::NullResp => true,
_ => false,
}
}

fn backoff_hint(&self) -> Option<std::time::Duration> {
if let Self::ErrorResp(resp) = self {
let data = resp.try_data_as::<serde_json::Value>();
if let Some(Ok(data)) = data {
// if daily rate limit exceeded, infura returns the requested backoff in the error
// response
let backoff_seconds = &data["rate"]["backoff_seconds"];
// infura rate limit error
if let Some(seconds) = backoff_seconds.as_u64() {
return Some(std::time::Duration::from_secs(seconds));
}
if let Some(seconds) = backoff_seconds.as_f64() {
return Some(std::time::Duration::from_secs(seconds as u64 + 1));
}
}
}
None
}
}
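As a sketch of the data `backoff_hint` looks for: a hypothetical Infura-style rate-limit body whose `data.rate.backoff_seconds` field would be picked up by the logic above. The field names follow Infura's documented error shape and are not part of this commit:

use alloy_json_rpc::ErrorPayload;
use std::time::Duration;

fn hypothetical_hint() -> Option<Duration> {
    let raw = r#"{"code":-32005,"message":"daily request count exceeded, request rate limited",
        "data":{"rate":{"allowed_rps":1,"backoff_seconds":30,"current_rps":13.3}}}"#;
    let payload: ErrorPayload = serde_json::from_str(raw).ok()?;
    // Mirror the extraction that `backoff_hint` performs on `RpcError::ErrorResp`.
    let data = payload.try_data_as::<serde_json::Value>()?.ok()?;
    data["rate"]["backoff_seconds"].as_u64().map(Duration::from_secs)
}
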
6 changes: 6 additions & 0 deletions crates/transport/src/layers/mod.rs
@@ -0,0 +1,6 @@
//! Module for housing transport layers.

mod retry;

/// RetryBackoffLayer
pub use retry::{RateLimitRetryPolicy, RetryBackoffLayer, RetryBackoffService, RetryPolicy};
228 changes: 228 additions & 0 deletions crates/transport/src/layers/retry.rs
@@ -0,0 +1,228 @@
use crate::{
error::{RpcErrorExt, TransportError, TransportErrorKind},
TransportFut,
};
use alloy_json_rpc::{RequestPacket, ResponsePacket};
use std::{
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
task::{Context, Poll},
time::Duration,
};
use tower::{Layer, Service};
use tracing::trace;

/// A Transport Layer that is responsible for retrying requests based on the
/// error type. See [`TransportError`].
///
/// [`TransportError`]: crate::error::TransportError
#[derive(Debug, Clone)]
pub struct RetryBackoffLayer {
/// The maximum number of retries for rate limit errors
max_rate_limit_retries: u32,
/// The initial backoff in milliseconds
initial_backoff: u64,
/// The number of compute units per second for this provider
compute_units_per_second: u64,
}

impl RetryBackoffLayer {
/// Creates a new retry layer with the given parameters.
pub const fn new(
max_rate_limit_retries: u32,
initial_backoff: u64,
compute_units_per_second: u64,
) -> Self {
Self { max_rate_limit_retries, initial_backoff, compute_units_per_second }
}
}

/// [RateLimitRetryPolicy] implements [RetryPolicy] to determine whether to retry depending on the
/// error.
#[derive(Debug, Copy, Clone, Default)]
#[non_exhaustive]
pub struct RateLimitRetryPolicy;

/// [RetryPolicy] defines the logic for deciding which [TransportError] instances the client
/// should retry and attempt to recover from.
pub trait RetryPolicy: Send + Sync + std::fmt::Debug {
/// Whether to retry the request based on the given `error`
fn should_retry(&self, error: &TransportError) -> bool;

/// Providers may include the `backoff` in the error response directly
fn backoff_hint(&self, error: &TransportError) -> Option<std::time::Duration>;
}

impl RetryPolicy for RateLimitRetryPolicy {
fn should_retry(&self, error: &TransportError) -> bool {
error.is_retryable()
}

/// Provides a backoff hint if the error response contains it
fn backoff_hint(&self, error: &TransportError) -> Option<std::time::Duration> {
error.backoff_hint()
}
}
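
// Illustration (not part of this commit): a hypothetical custom [RetryPolicy] that also
// retries plain "backend unavailable" messages. Note that in this commit the service is
// hard-wired to [RateLimitRetryPolicy]; this is only a sketch of the trait contract.
#[derive(Debug, Clone, Copy)]
struct AggressiveRetryPolicy;

impl RetryPolicy for AggressiveRetryPolicy {
    fn should_retry(&self, error: &TransportError) -> bool {
        error.is_retryable() || error.to_string().contains("backend unavailable")
    }

    fn backoff_hint(&self, error: &TransportError) -> Option<std::time::Duration> {
        error.backoff_hint()
    }
}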

impl<S> Layer<S> for RetryBackoffLayer {
type Service = RetryBackoffService<S>;

fn layer(&self, inner: S) -> Self::Service {
RetryBackoffService {
inner,
policy: RateLimitRetryPolicy,
max_rate_limit_retries: self.max_rate_limit_retries,
initial_backoff: self.initial_backoff,
compute_units_per_second: self.compute_units_per_second,
requests_enqueued: Arc::new(AtomicU32::new(0)),
}
}
}

/// A Tower Service used by the RetryBackoffLayer that is responsible for retrying requests based
/// on the error type. See [TransportError] and [RateLimitRetryPolicy].
#[derive(Debug, Clone)]
pub struct RetryBackoffService<S> {
/// The inner service
inner: S,
/// The retry policy
policy: RateLimitRetryPolicy,
/// The maximum number of retries for rate limit errors
max_rate_limit_retries: u32,
/// The initial backoff in milliseconds
initial_backoff: u64,
/// The number of compute units per second for this service
compute_units_per_second: u64,
/// The number of requests currently enqueued
requests_enqueued: Arc<AtomicU32>,
}

impl<S> RetryBackoffService<S> {
const fn initial_backoff(&self) -> Duration {
Duration::from_millis(self.initial_backoff)
}
}

impl<S> Service<RequestPacket> for RetryBackoffService<S>
where
S: Service<RequestPacket, Response = ResponsePacket, Error = TransportError>
+ Send
+ 'static
+ Clone,
S::Future: Send + 'static,
{
type Response = ResponsePacket;
type Error = TransportError;
type Future = TransportFut<'static>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// Our middleware doesn't care about backpressure, so it's ready as long
// as the inner service is ready.
self.inner.poll_ready(cx)
}

fn call(&mut self, request: RequestPacket) -> Self::Future {
let inner = self.inner.clone();
let this = self.clone();
let mut inner = std::mem::replace(&mut self.inner, inner);
Box::pin(async move {
let ahead_in_queue = this.requests_enqueued.fetch_add(1, Ordering::SeqCst) as u64;
let mut rate_limit_retry_number: u32 = 0;
loop {
let err;
let res = inner.call(request.clone()).await;

match res {
Ok(res) => {
if let Some(e) = res.as_error() {
err = TransportError::ErrorResp(e.clone())
} else {
this.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
return Ok(res);
}
}
Err(e) => err = e,
}

let should_retry = this.policy.should_retry(&err);
if should_retry {
rate_limit_retry_number += 1;
if rate_limit_retry_number > this.max_rate_limit_retries {
return Err(TransportErrorKind::custom_str(&format!(
"Max retries exceeded {}",
err
)));
}
trace!(%err, "retrying request");

let current_queued_reqs = this.requests_enqueued.load(Ordering::SeqCst) as u64;

// try to extract the requested backoff from the error or compute the next
// backoff based on retry count
let backoff_hint = this.policy.backoff_hint(&err);
let next_backoff = backoff_hint.unwrap_or_else(|| this.initial_backoff());

// Requests are usually weighted and can vary from 10 CU to several 100 CU;
// cheaper requests are more common. Some example Alchemy weights:
// - `eth_getStorageAt`: 17
// - `eth_getBlockByNumber`: 16
// - `eth_newFilter`: 20
//
// (coming from forking mode) Assuming that storage requests will be the
// driver for rate limits, we choose `17` as the average cost of any request.
const AVG_COST: u64 = 17u64;
let seconds_to_wait_for_compute_budget = compute_unit_offset_in_secs(
AVG_COST,
this.compute_units_per_second,
current_queued_reqs,
ahead_in_queue,
);
let total_backoff = next_backoff
+ std::time::Duration::from_secs(seconds_to_wait_for_compute_budget);

trace!(
total_backoff_millis = total_backoff.as_millis(),
budget_backoff_millis = seconds_to_wait_for_compute_budget * 1000,
default_backoff_millis = next_backoff.as_millis(),
backoff_hint_millis = backoff_hint.map(|d| d.as_millis()),
"(all in ms) backing off due to rate limit"
);

tokio::time::sleep(total_backoff).await;
} else {
this.requests_enqueued.fetch_sub(1, Ordering::SeqCst);
return Err(err);
}
}
})
}
}

/// Calculates an offset in seconds by taking into account the number of currently queued requests,
/// the number of requests that were ahead in the queue when the request was first issued, the
/// average cost of a weighted request (heuristic), and the number of available compute units per
/// second.
///
/// Returns the number of seconds (the unit in which the remote endpoint measures its compute
/// budget) a request should wait to avoid being rate limited. The budget per second is
/// `compute_units_per_second`; assuming an average cost of `avg_cost`, this allows (in theory)
/// `compute_units_per_second / avg_cost` requests per second without getting rate limited.
/// The number of concurrently queued requests and the request's position in the queue when it was
/// first issued then determine how many seconds, if any, it should wait.
fn compute_unit_offset_in_secs(
avg_cost: u64,
compute_units_per_second: u64,
current_queued_requests: u64,
ahead_in_queue: u64,
) -> u64 {
let request_capacity_per_second = compute_units_per_second.saturating_div(avg_cost);
if current_queued_requests > request_capacity_per_second {
current_queued_requests.min(ahead_in_queue).saturating_div(request_capacity_per_second)
} else {
0
}
}
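
To make the heuristic concrete, a worked example with illustrative numbers (330 CU/s is a commonly cited free-tier budget; none of these values come from the diff): with `avg_cost = 17`, the capacity is 330 / 17 = 19 requests per second, so a request only waits when more than 19 requests are queued.

// Hypothetical values, mirroring the arithmetic of `compute_unit_offset_in_secs` above.
assert_eq!(compute_unit_offset_in_secs(17, 330, 40, 25), 1); // 40 > 19 queued: min(40, 25) / 19 = 1s
assert_eq!(compute_unit_offset_in_secs(17, 330, 10, 25), 0); // 10 <= 19 queued: no extra wait
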
2 changes: 2 additions & 0 deletions crates/transport/src/lib.rs
@@ -26,6 +26,8 @@ pub use r#trait::Transport;
pub use alloy_json_rpc::{RpcError, RpcResult};
pub use futures_utils_wasm::{impl_future, BoxFuture};

pub mod layers;

/// Misc. utilities for building transports.
pub mod utils;

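With `pub mod layers` exported, a minimal sketch of wrapping an arbitrary transport in the new layer. The function name, parameter values, and the assumption that the crate is consumed as `alloy_transport` are illustrative, not part of this commit:

use alloy_transport::layers::{RetryBackoffLayer, RetryBackoffService};
use tower::Layer;

// 10 rate-limit retries, 200 ms initial backoff, 330 compute units per second.
fn with_retries<S>(transport: S) -> RetryBackoffService<S> {
    RetryBackoffLayer::new(10, 200, 330).layer(transport)
}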
