Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bundle Multiple EVM Providers #531

Merged
merged 9 commits into from
Jun 7, 2023
8 changes: 7 additions & 1 deletion config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -327,11 +327,17 @@ The HTTP(s) RPC endpoint for this chain, used for watching events, and sending t
- env: `WEBB_EVM_<CHAIN_NAME>_HTTP_ENDPOINT`

Example:

- Single Endpoint
```toml
http-endpoint = "https://mainnet.infura.io/v3/<project-id>"
```

- Multiple Endpoints
```toml
http-endpoint = ["https://mainnet.infura.io/v3/<project-id>","https://rpc.testnet.network"]
salman01zp marked this conversation as resolved.
Show resolved Hide resolved
```


#### ws-endpoint
drewstone marked this conversation as resolved.
Show resolved Hide resolved

The WebSocket RPC endpoint for this chain, used for watching events, and sending transactions.
Expand Down
16 changes: 11 additions & 5 deletions crates/event-watcher-traits/src/evm/event_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,24 @@
// limitations under the License.

use tokio::sync::Mutex;
use webb::evm::ethers::prelude::TimeLag;
use webb::evm::ethers::{prelude::TimeLag, providers::QuorumProvider};
use webb_relayer_utils::retry;

use super::*;

/// Ethereum client using Ethers, that includes a retry strategy.
pub type EthersClient =
providers::Provider<providers::RetryClient<providers::Http>>;
pub type EthersClient = providers::Provider<
providers::RetryClient<QuorumProvider<providers::Http>>,
>;

/// Ethereum TimeLag client using Ethers, that includes a retry strategy.
pub type EthersTimeLagClient =
TimeLag<Arc<providers::Provider<providers::RetryClient<providers::Http>>>>;
pub type EthersTimeLagClient = TimeLag<
Arc<
providers::Provider<
providers::RetryClient<QuorumProvider<providers::Http>>,
>,
>,
>;

/// A watchable contract is a contract used in the [EventWatcher]
pub trait WatchableContract: Send + Sync {
Expand Down
34 changes: 33 additions & 1 deletion crates/relayer-config/src/evm/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use core::fmt;

use ethereum_types::Address;
use url::Url;
use webb_relayer_types::{private_key::PrivateKey, rpc_url::RpcUrl};

use crate::{
Expand All @@ -20,7 +23,7 @@ pub struct EvmChainConfig {
pub enabled: bool,
/// Http(s) Endpoint for quick Req/Res
#[serde(skip_serializing)]
pub http_endpoint: RpcUrl,
pub http_endpoint: HttpEndpoint,
/// Websocket Endpoint for long living connections
#[serde(skip_serializing)]
pub ws_endpoint: RpcUrl,
Expand Down Expand Up @@ -73,6 +76,35 @@ pub struct EvmChainConfig {
pub block_poller: Option<BlockPollerConfig>,
}

/// configuration for adding http endpoints.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(untagged)]
pub enum HttpEndpoint {
/// Single http endpoint
Single(RpcUrl),
/// Multiple http endpoints
Multiple(Vec<RpcUrl>),
}

impl fmt::Display for HttpEndpoint {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
HttpEndpoint::Single(url) => write!(f, "{}", url),
HttpEndpoint::Multiple(urls) => {
let urls: Vec<String> =
urls.iter().map(ToString::to_string).collect();
write!(f, "{}", urls.join(", "))
}
}
}
}

impl From<Url> for HttpEndpoint {
fn from(url: Url) -> Self {
HttpEndpoint::Single(url.into())
}
}

/// Linked anchor config for Evm based target system
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all(serialize = "camelCase", deserialize = "kebab-case"))]
Expand Down
64 changes: 39 additions & 25 deletions crates/relayer-context/src/ethers_retry_policy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::Duration;

use webb::evm::ethers::providers::HttpClientError;
use webb::evm::ethers::providers::ProviderError;
use webb::evm::ethers::providers::{JsonRpcError, RetryPolicy};

/// Implements [RetryPolicy] that will retry requests that errored with
Expand Down Expand Up @@ -56,28 +56,40 @@ fn should_retry_json_rpc_error(err: &JsonRpcError) -> bool {
}
}

impl RetryPolicy<HttpClientError> for WebbHttpRetryPolicy {
fn should_retry(&self, error: &HttpClientError) -> bool {
impl RetryPolicy<ProviderError> for WebbHttpRetryPolicy {
fn should_retry(&self, error: &ProviderError) -> bool {
match error {
HttpClientError::ReqwestError(err) => {
ProviderError::HTTPError(err) => {
err.status() == Some(http::StatusCode::TOO_MANY_REQUESTS)
}
HttpClientError::JsonRpcError(err) => {
should_retry_json_rpc_error(err)
ProviderError::JsonRpcClientError(err) => {
match err.as_error_response() {
Some(e) => should_retry_json_rpc_error(e),
None => false,
}
}
HttpClientError::SerdeJson { text, .. } => {
ProviderError::EnsError(_) => true,
ProviderError::EnsNotOwned(_) => true,
ProviderError::HexError(_) => false,
ProviderError::CustomError(_) => false,
ProviderError::UnsupportedRPC => false,
ProviderError::UnsupportedNodeClient => false,
ProviderError::SignerUnavailable => false,

ProviderError::SerdeJson(err) => {
// some providers send invalid JSON RPC in the error case (no `id:u64`), but the
// text should be a `JsonRpcError`
#[derive(serde::Deserialize)]
struct Resp {
error: JsonRpcError,
}

if let Ok(resp) = serde_json::from_str::<Resp>(text) {
if let Ok(resp) = serde_json::from_str::<Resp>(&err.to_string())
{
return should_retry_json_rpc_error(&resp.error);
}

let err_text = text.to_lowercase();
let err_text = err.to_string().to_lowercase();
// last resort, some providers send the error message in the text
// and the text itself is not a valid json response either.
// check if we have the word "rate", or "limit" in the error message
Expand All @@ -95,24 +107,26 @@ impl RetryPolicy<HttpClientError> for WebbHttpRetryPolicy {
}
}

fn backoff_hint(&self, error: &HttpClientError) -> Option<Duration> {
fn backoff_hint(&self, error: &ProviderError) -> Option<Duration> {
const DEFAULT_BACKOFF: Duration = Duration::from_secs(5);

if let HttpClientError::JsonRpcError(JsonRpcError { data, .. }) = error
{
let data = data.as_ref()?;

// if daily rate limit exceeded, infura returns the requested backoff in the error
// response
let Some(backoff_seconds) = data.get("rate").and_then(|v| v.get("backoff_seconds")) else {
return Some(DEFAULT_BACKOFF);
};
// infura rate limit error
if let Some(seconds) = backoff_seconds.as_u64() {
return Some(Duration::from_secs(seconds));
}
if let Some(seconds) = backoff_seconds.as_f64() {
return Some(Duration::from_secs(seconds as u64 + 1));
if let ProviderError::JsonRpcClientError(err) = error {
let json_rpc_error = err.as_error_response();
if let Some(json_rpc_error) = json_rpc_error {
if let Some(data) = &json_rpc_error.data {
// if daily rate limit exceeded, infura returns the requested backoff in the error
// response
let Some(backoff_seconds) = data.get("rate").and_then(|v| v.get("backoff_seconds")) else {
return Some(DEFAULT_BACKOFF);
};
// infura rate limit error
if let Some(seconds) = backoff_seconds.as_u64() {
return Some(Duration::from_secs(seconds));
}
if let Some(seconds) = backoff_seconds.as_f64() {
return Some(Duration::from_secs(seconds as u64 + 1));
}
}
}
}

Expand Down
29 changes: 24 additions & 5 deletions crates/relayer-context/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use webb_relayer_utils::metric::{self, Metrics};
mod ethers_retry_policy;
use ethers_retry_policy::WebbHttpRetryPolicy;

type EthersClient = Provider<RetryClient<Http>>;
type EthersClient = Provider<RetryClient<QuorumProvider<Http>>>;

/// RelayerContext contains Relayer's configuration and shutdown signal.
#[derive(Clone)]
Expand Down Expand Up @@ -120,16 +120,35 @@ impl RelayerContext {
// Create a Map for all EVM Chains
let mut evm_providers = HashMap::new();
for (_, chain_config) in config.evm.iter() {
let client = Http::new(chain_config.http_endpoint.clone());
let mut providers = Vec::new();
match chain_config.http_endpoint.clone() {
webb_relayer_config::evm::HttpEndpoint::Single(rpc_url) => {
let client = Http::new(rpc_url);
salman01zp marked this conversation as resolved.
Show resolved Hide resolved
let provider = WeightedProvider::new(client);
providers.push(provider);
}
webb_relayer_config::evm::HttpEndpoint::Multiple(rpc_urls) => {
for rpc_url in rpc_urls {
let client = Http::new(rpc_url);
let provider = WeightedProvider::new(client);
providers.push(provider);
}
}
}

let q_provider = QuorumProvider::builder()
.add_providers(providers)
.quorum(Quorum::Weight(1))
.build();
// Wrap the provider with a retry client.
let retry_client = RetryClientBuilder::default()
.timeout_retries(u32::MAX)
.rate_limit_retries(u32::MAX)
.build(client, WebbHttpRetryPolicy::boxed());
.build(q_provider, WebbHttpRetryPolicy::boxed());

let provider = Arc::new(Provider::new(retry_client));
evm_providers
.insert(chain_config.chain_id.into(), provider.clone());

evm_providers.insert(chain_config.chain_id.into(), provider);
}

Ok(Self {
Expand Down
47 changes: 10 additions & 37 deletions crates/relayer-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use webb::evm::ethers::providers::QuorumProvider;
use webb::{evm::ethers, substrate::subxt};
use webb_proposals::ResourceId;

Expand All @@ -30,6 +31,9 @@ pub mod retry;
/// type-erased StaticTxPayload for Substrate Transaction queue.
pub mod static_tx_payload;

type RetryClientProvider = ethers::providers::Provider<
ethers::providers::RetryClient<QuorumProvider<ethers::providers::Http>>,
>;
/// An enum of all possible errors that could be encountered during the execution of the Webb
/// Relayer.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -96,46 +100,25 @@ pub enum Error {
/// Smart contract error.
#[error(transparent)]
EthersContractCallWithRetry(
#[from]
ethers::contract::ContractError<
ethers::providers::Provider<
ethers::providers::RetryClient<ethers::providers::Http>,
>,
>,
#[from] ethers::contract::ContractError<RetryClientProvider>,
),
/// Smart contract error.
#[error(transparent)]
EthersContractCallWithRetryCloneable(
#[from]
ethers::contract::ContractError<
Arc<
ethers::providers::Provider<
ethers::providers::RetryClient<ethers::providers::Http>,
>,
>,
>,
#[from] ethers::contract::ContractError<Arc<RetryClientProvider>>,
),
/// Ethers Timelag provider error.
#[error(transparent)]
EthersTimelagRetryClientError(
#[from]
ethers::middleware::timelag::TimeLagError<
ethers::providers::Provider<
ethers::providers::RetryClient<ethers::providers::Http>,
>,
>,
#[from] ethers::middleware::timelag::TimeLagError<RetryClientProvider>,
),

/// Ethers Timelag provider error.
#[error(transparent)]
EthersTimelagRetryClientClonableError(
#[from]
ethers::middleware::timelag::TimeLagError<
std::sync::Arc<
ethers::providers::Provider<
ethers::providers::RetryClient<ethers::providers::Http>,
>,
>,
std::sync::Arc<RetryClientProvider>,
>,
),

Expand All @@ -144,11 +127,7 @@ pub enum Error {
EthersContractCallWithTimeLagRetryClient(
#[from]
ethers::contract::ContractError<
ethers::middleware::timelag::TimeLag<
ethers::providers::Provider<
ethers::providers::RetryClient<ethers::providers::Http>,
>,
>,
ethers::middleware::timelag::TimeLag<RetryClientProvider>,
>,
),

Expand All @@ -157,13 +136,7 @@ pub enum Error {
EthersContractCallWithTimeLagRetryClientCloneable(
#[from]
ethers::contract::ContractError<
ethers::middleware::timelag::TimeLag<
Arc<
ethers::providers::Provider<
ethers::providers::RetryClient<ethers::providers::Http>,
>,
>,
>,
ethers::middleware::timelag::TimeLag<Arc<RetryClientProvider>>,
>,
),

Expand Down
6 changes: 4 additions & 2 deletions services/block-poller/src/block_poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::time::Duration;
use webb_relayer_config::block_poller::BlockPollerConfig;

use webb::evm::ethers::{
providers::{self, Middleware},
providers::{self, Middleware, QuorumProvider},
types::{Block, TxHash},
};

Expand Down Expand Up @@ -91,7 +91,9 @@ pub trait BlockPoller {
async fn run(
&self,
client: Arc<
providers::Provider<providers::RetryClient<providers::Http>>,
providers::Provider<
providers::RetryClient<QuorumProvider<providers::Http>>,
>,
>,
store: Arc<Self::Store>,
listener_config: BlockPollerConfig,
Expand Down
Loading