diff --git a/Cargo.lock b/Cargo.lock index 2e3f618ff..150c95fe2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9448,10 +9448,12 @@ name = "webb-relayer-utils" version = "0.1.0" dependencies = [ "ark-std", + "async-trait", "axum", "backoff", "config", "derive_more", + "futures", "glob", "hex", "hyper 0.14.25", @@ -9465,6 +9467,7 @@ dependencies = [ "serde_path_to_error", "sled", "thiserror", + "tokio 1.28.1", "url", "webb 0.5.24", "webb-proposals", diff --git a/README.md b/README.md index 6baf10e7b..ad00db7a5 100644 --- a/README.md +++ b/README.md @@ -178,7 +178,7 @@ api-key = "$POLYGONSCAN_MAINNET_API_KEY" | Field | Description | Optionality | | --------------- | ---------------------------------------------------------------------------------------------------------------------------------- | ---------------------- | -| `http-endpoint` | Http(s) Endpoint for quick Req/Res | Required | +| `http-endpoint` | Http(s) Endpoint for quick Req/Res. Input can be single http-endpoint or array of multiple http-endpoints. | Required | | `ws-endpoint` | Websocket Endpoint for long living connections | Required | | `name` | The Chain/Node name | Required | | `explorer` | Block explorer, used for generating clickable links for transactions that happens on this chain. | Optional | diff --git a/config/README.md b/config/README.md index f671a399f..cb4510b74 100644 --- a/config/README.md +++ b/config/README.md @@ -320,18 +320,24 @@ chain-id = 1 #### http-endpoint -The HTTP(s) RPC endpoint for this chain, used for watching events, and sending transactions. +The HTTP(s) RPC endpoint for this chain, used for watching events, and sending transactions. Input can be single http-endpoint or array of multiple http-endpoints. -- Type: `string` +- Type: `string | string[]` - Required: `true` - env: `WEBB_EVM__HTTP_ENDPOINT` Example: - +- Single Endpoint ```toml http-endpoint = "https://mainnet.infura.io/v3/" ``` +- Multiple Endpoints +```toml +http-endpoint = ["https://mainnet.infura.io/v3/","https://rpc.testnet.network"] +``` + + #### ws-endpoint The WebSocket RPC endpoint for this chain, used for watching events, and sending transactions. diff --git a/config/block-header-relay/eth2-networks/ethereum-mainnet.toml b/config/block-header-relay/eth2-networks/ethereum-mainnet.toml index 88f33110d..c6a9abd0a 100644 --- a/config/block-header-relay/eth2-networks/ethereum-mainnet.toml +++ b/config/block-header-relay/eth2-networks/ethereum-mainnet.toml @@ -2,7 +2,7 @@ enabled = false # The name that the chain is indexed on, for linkable anchors name = "ethereum-mainnet" -# Http(s) Endpoint for quick Req/Res +# Http(s) Endpoint for quick Req/Res. Input can be single http-endpoint or array of multiple http-endpoints. http-endpoint = "$ETHEREUM_MAINNET_HTTP_URL" # Websocket Endpoint for long living connections ws-endpoint = "$ETHEREUM_MAINNET_WS_URL" diff --git a/config/block-header-relay/eth2-networks/sepolia.toml b/config/block-header-relay/eth2-networks/sepolia.toml index 56b5d8322..0f684ae48 100644 --- a/config/block-header-relay/eth2-networks/sepolia.toml +++ b/config/block-header-relay/eth2-networks/sepolia.toml @@ -2,7 +2,7 @@ enabled = true # The name that the chain is indexed on, for linkable anchors name = "sepolia-testnet" -# Http(s) Endpoint for quick Req/Res +# Http(s) Endpoint for quick Req/Res. Input can be single http-endpoint or array of multiple http-endpoints. http-endpoint = "$SEPOLIA_HTTP_URL" # Websocket Endpoint for long living connections ws-endpoint = "$SEPOLIA_WS_URL" diff --git a/config/development/evm-blanknet/athena.toml b/config/development/evm-blanknet/athena.toml index b47443198..abe9d6679 100644 --- a/config/development/evm-blanknet/athena.toml +++ b/config/development/evm-blanknet/athena.toml @@ -2,7 +2,7 @@ [evm.athena] # The name that the chain is indexed on, for linkable anchors name = "athena" -# Http(s) Endpoint for quick Req/Res +# Http(s) Endpoint for quick Req/Res. Input can be single http-endpoint or array of multiple http-endpoints. http-endpoint = "http://localhost:5002" # Websocket Endpoint for long living connections ws-endpoint = "ws://localhost:5002" diff --git a/config/development/evm-blanknet/demeter.toml b/config/development/evm-blanknet/demeter.toml index 683a09116..1209e397f 100644 --- a/config/development/evm-blanknet/demeter.toml +++ b/config/development/evm-blanknet/demeter.toml @@ -2,7 +2,7 @@ [evm.demeter] # The name that the chain is indexed on, for linkable anchors name = "demeter" -# Http(s) Endpoint for quick Req/Res +# Http(s) Endpoint for quick Req/Res. Input can be single http-endpoint or array of multiple http-endpoints. http-endpoint = "http://localhost:5003" # Websocket Endpoint for long living connections ws-endpoint = "ws://localhost:5003" diff --git a/config/development/evm-blanknet/hermes.toml b/config/development/evm-blanknet/hermes.toml index a8836c198..d23843ad9 100644 --- a/config/development/evm-blanknet/hermes.toml +++ b/config/development/evm-blanknet/hermes.toml @@ -2,7 +2,7 @@ [evm.hermes] # The name that the chain is indexed on, for linkable anchors name = "hermes" -# Http(s) Endpoint for quick Req/Res +# Http(s) Endpoint for quick Req/Res. Input can be single http-endpoint or array of multiple http-endpoints. http-endpoint = "http://localhost:5001" # Websocket Endpoint for long living connections ws-endpoint = "ws://localhost:5001" diff --git a/config/example/config.toml b/config/example/config.toml index d8d089cd0..a2c38659d 100644 --- a/config/example/config.toml +++ b/config/example/config.toml @@ -27,7 +27,7 @@ private-tx-relay = true # The following block defines an EVM network (in this case, Goerli) that the relayer will connect to. [evm.goerli] name = "goerli" -# Http(s) Endpoint for quick Req/Res +# Http(s) Endpoint for quick Req/Res. Input can be single http-endpoint or array of multiple http-endpoints. # env: WEBB_EVM_GOERLI_HTTP_ENDPOINT http-endpoint = "https://rpc.ankr.com/eth_goerli" # Websocket Endpoint for long living connections diff --git a/config/exclusive-strategies/data-querying/goerli.toml b/config/exclusive-strategies/data-querying/goerli.toml index bfd4500e8..ab1f3a468 100644 --- a/config/exclusive-strategies/data-querying/goerli.toml +++ b/config/exclusive-strategies/data-querying/goerli.toml @@ -1,6 +1,6 @@ [evm.goerli] name = "goerli" -# Http(s) Endpoint for quick Req/Res +# Http(s) Endpoint for quick Req/Res. Input can be single http-endpoint or array of multiple http-endpoints. http-endpoint = "$GOERLI_HTTPS_URL" # Websocket Endpoint for long living connections ws-endpoint = "$GOERLI_WSS_URL" diff --git a/config/exclusive-strategies/data-querying/sepolia.toml b/config/exclusive-strategies/data-querying/sepolia.toml index 9168864d9..16cf15fb8 100644 --- a/config/exclusive-strategies/data-querying/sepolia.toml +++ b/config/exclusive-strategies/data-querying/sepolia.toml @@ -1,6 +1,6 @@ [evm.sepolia] name = "sepolia" -# Http(s) Endpoint for quick Req/Res +# Http(s) Endpoint for quick Req/Res. Input can be single http-endpoint or array of multiple http-endpoints. http-endpoint = "$SEPOLIA_HTTPS_URL" # Websocket Endpoint for long living connections ws-endpoint = "$SEPOLIA_WSS_URL" diff --git a/config/exclusive-strategies/private-tx-relaying/goerli.toml b/config/exclusive-strategies/private-tx-relaying/goerli.toml index 3df2e0fa2..0d6a49429 100644 --- a/config/exclusive-strategies/private-tx-relaying/goerli.toml +++ b/config/exclusive-strategies/private-tx-relaying/goerli.toml @@ -1,7 +1,7 @@ # Block which represents properties for a network [evm.goerli] name = "goerli" -# Http(s) Endpoint for quick Req/Res +# Http(s) Endpoint for quick Req/Res. Input can be single http-endpoint or array of multiple http-endpoints. http-endpoint = "$GOERLI_HTTPS_URL" # Websocket Endpoint for long living connections ws-endpoint = "$GOERLI_WSS_URL" diff --git a/config/exclusive-strategies/private-tx-relaying/sepolia.toml b/config/exclusive-strategies/private-tx-relaying/sepolia.toml index 83c2d2dde..093b91245 100644 --- a/config/exclusive-strategies/private-tx-relaying/sepolia.toml +++ b/config/exclusive-strategies/private-tx-relaying/sepolia.toml @@ -1,7 +1,7 @@ # Block which represents properties for a network [evm.sepolia] name = "sepolia" -# Http(s) Endpoint for quick Req/Res +# Http(s) Endpoint for quick Req/Res. Input can be single http-endpoint or array of multiple http-endpoints. http-endpoint = "$SEPOLIA_HTTPS_URL" # Websocket Endpoint for long living connections ws-endpoint = "$SEPOLIA_WSS_URL" diff --git a/config/exclusive-strategies/signature-relaying/goerli.toml b/config/exclusive-strategies/signature-relaying/goerli.toml index 18f151d04..bc5d29cf1 100644 --- a/config/exclusive-strategies/signature-relaying/goerli.toml +++ b/config/exclusive-strategies/signature-relaying/goerli.toml @@ -1,7 +1,7 @@ # Block which represents properties for a network [evm.goerli] name = "goerli" -# Http(s) Endpoint for quick Req/Res +# Http(s) Endpoint for quick Req/Res. Input can be single http-endpoint or array of multiple http-endpoints. http-endpoint = "$GOERLI_HTTPS_URL" # Websocket Endpoint for long living connections ws-endpoint = "$GOERLI_WSS_URL" diff --git a/config/exclusive-strategies/signature-relaying/optimism_test.toml b/config/exclusive-strategies/signature-relaying/optimism_test.toml index 851ad7929..d572be2d3 100644 --- a/config/exclusive-strategies/signature-relaying/optimism_test.toml +++ b/config/exclusive-strategies/signature-relaying/optimism_test.toml @@ -1,7 +1,7 @@ # Block which represents properties for a network [evm.optimismtestnet] name = "optimismtestnet" -# Http(s) Endpoint for quick Req/Res +# Http(s) Endpoint for quick Req/Res. Input can be single http-endpoint or array of multiple http-endpoints. http-endpoint = "$OPTIMISM_TESTNET_HTTPS_URL" # Websocket Endpoint for long living connections ws-endpoint = "$OPTIMISM_TESTNET_WSS_URL" diff --git a/config/exclusive-strategies/signature-relaying/sepolia.toml b/config/exclusive-strategies/signature-relaying/sepolia.toml index 2f4e2e92d..d72f316b7 100644 --- a/config/exclusive-strategies/signature-relaying/sepolia.toml +++ b/config/exclusive-strategies/signature-relaying/sepolia.toml @@ -1,7 +1,7 @@ # Block which represents properties for a network [evm.sepolia] name = "sepolia" -# Http(s) Endpoint for quick Req/Res +# Http(s) Endpoint for quick Req/Res. Input can be single http-endpoint or array of multiple http-endpoints. http-endpoint = "$SEPOLIA_HTTPS_URL" # Websocket Endpoint for long living connections ws-endpoint = "$SEPOLIA_WSS_URL" diff --git a/crates/event-watcher-traits/src/evm/event_watcher.rs b/crates/event-watcher-traits/src/evm/event_watcher.rs index a93018e09..51079c03d 100644 --- a/crates/event-watcher-traits/src/evm/event_watcher.rs +++ b/crates/event-watcher-traits/src/evm/event_watcher.rs @@ -14,17 +14,22 @@ use tokio::sync::Mutex; use webb::evm::ethers::prelude::TimeLag; -use webb_relayer_utils::retry; +use webb_relayer_utils::{multi_provider::MultiProvider, retry}; use super::*; /// Ethereum client using Ethers, that includes a retry strategy. pub type EthersClient = - providers::Provider>; + providers::Provider>>; /// Ethereum TimeLag client using Ethers, that includes a retry strategy. -pub type EthersTimeLagClient = - TimeLag>>>; +pub type EthersTimeLagClient = TimeLag< + Arc< + providers::Provider< + providers::RetryClient>, + >, + >, +>; /// A watchable contract is a contract used in the [EventWatcher] pub trait WatchableContract: Send + Sync { diff --git a/crates/relayer-config/src/evm/mod.rs b/crates/relayer-config/src/evm/mod.rs index bf374cd07..7afa03763 100644 --- a/crates/relayer-config/src/evm/mod.rs +++ b/crates/relayer-config/src/evm/mod.rs @@ -1,4 +1,7 @@ +use core::fmt; + use ethereum_types::Address; +use url::Url; use webb_relayer_types::{private_key::PrivateKey, rpc_url::RpcUrl}; use crate::{ @@ -20,7 +23,7 @@ pub struct EvmChainConfig { pub enabled: bool, /// Http(s) Endpoint for quick Req/Res #[serde(skip_serializing)] - pub http_endpoint: RpcUrl, + pub http_endpoint: HttpEndpoint, /// Websocket Endpoint for long living connections #[serde(skip_serializing)] pub ws_endpoint: RpcUrl, @@ -73,6 +76,35 @@ pub struct EvmChainConfig { pub block_poller: Option, } +/// configuration for adding http endpoints. +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(untagged)] +pub enum HttpEndpoint { + /// Single http endpoint + Single(RpcUrl), + /// Multiple http endpoints + Multiple(Vec), +} + +impl fmt::Display for HttpEndpoint { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + HttpEndpoint::Single(url) => write!(f, "{}", url), + HttpEndpoint::Multiple(urls) => { + let urls: Vec = + urls.iter().map(ToString::to_string).collect(); + write!(f, "{}", urls.join(", ")) + } + } + } +} + +impl From for HttpEndpoint { + fn from(url: Url) -> Self { + HttpEndpoint::Single(url.into()) + } +} + /// Linked anchor config for Evm based target system #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all(serialize = "camelCase", deserialize = "kebab-case"))] diff --git a/crates/relayer-context/src/ethers_retry_policy.rs b/crates/relayer-context/src/ethers_retry_policy.rs index 60706ea44..c97d6d086 100644 --- a/crates/relayer-context/src/ethers_retry_policy.rs +++ b/crates/relayer-context/src/ethers_retry_policy.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use webb::evm::ethers::providers::HttpClientError; +use webb::evm::ethers::providers::ProviderError; use webb::evm::ethers::providers::{JsonRpcError, RetryPolicy}; /// Implements [RetryPolicy] that will retry requests that errored with @@ -56,63 +56,103 @@ fn should_retry_json_rpc_error(err: &JsonRpcError) -> bool { } } -impl RetryPolicy for WebbHttpRetryPolicy { - fn should_retry(&self, error: &HttpClientError) -> bool { +// check json rpc error in serde error +fn should_retry_json_rpc_error_from_serde( + err: &serde_json::Error, + err_regex: regex::Regex, +) -> bool { + // some providers send invalid JSON RPC in the error case (no `id:u64`), but the + // text should be a `JsonRpcError` + #[derive(serde::Deserialize)] + struct Resp { + error: JsonRpcError, + } + + if let Ok(resp) = serde_json::from_str::(&err.to_string()) { + return should_retry_json_rpc_error(&resp.error); + } + + let err_text = err.to_string().to_lowercase(); + + // last resort, some providers send the error message in the text + // and the text itself is not a valid json response either. + // check if we have the word "rate", or "limit" in the error message + // and if so, we should retry + + let should_retry = err_regex.is_match(&err_text) + || matches!(err_text.as_str(), "expected value at line 1 column 1"); + + tracing::event!( + target: webb_relayer_utils::probe::TARGET, + tracing::Level::DEBUG, + kind = %webb_relayer_utils::probe::Kind::Retry, + should_retry = should_retry, + error = %err_text, + ); + should_retry +} + +impl RetryPolicy for WebbHttpRetryPolicy { + fn should_retry(&self, error: &ProviderError) -> bool { + tracing::debug!("should_retry: {:?}", error); match error { - HttpClientError::ReqwestError(err) => { + ProviderError::HTTPError(err) => { err.status() == Some(http::StatusCode::TOO_MANY_REQUESTS) } - HttpClientError::JsonRpcError(err) => { - should_retry_json_rpc_error(err) - } - HttpClientError::SerdeJson { text, .. } => { - // some providers send invalid JSON RPC in the error case (no `id:u64`), but the - // text should be a `JsonRpcError` - #[derive(serde::Deserialize)] - struct Resp { - error: JsonRpcError, - } + ProviderError::JsonRpcClientError(err) => { + if let Some(e) = err.as_error_response() { + return should_retry_json_rpc_error(e); + }; - if let Ok(resp) = serde_json::from_str::(text) { - return should_retry_json_rpc_error(&resp.error); + tracing::debug!("Error source: {:?}", err.source()); + if let Some(e) = err.as_serde_error() { + // let new_err = SerdeJson::from(err).into(); + tracing::debug!("Serede error: {:?}", e); + return should_retry_json_rpc_error_from_serde( + e, + self.err_regex.clone(), + ); } - let err_text = text.to_lowercase(); - // last resort, some providers send the error message in the text - // and the text itself is not a valid json response either. - // check if we have the word "rate", or "limit" in the error message - // and if so, we should retry - let should_retry = self.err_regex.is_match(&err_text); - tracing::event!( - target: webb_relayer_utils::probe::TARGET, - tracing::Level::DEBUG, - kind = %webb_relayer_utils::probe::Kind::Retry, - should_retry = should_retry, - error = %err_text, - ); - should_retry + false + } + ProviderError::EnsError(_) => true, + ProviderError::EnsNotOwned(_) => true, + ProviderError::HexError(_) => false, + ProviderError::CustomError(_) => false, + ProviderError::UnsupportedRPC => false, + ProviderError::UnsupportedNodeClient => false, + ProviderError::SignerUnavailable => false, + + ProviderError::SerdeJson(err) => { + should_retry_json_rpc_error_from_serde( + err, + self.err_regex.clone(), + ) } } } - fn backoff_hint(&self, error: &HttpClientError) -> Option { + fn backoff_hint(&self, error: &ProviderError) -> Option { const DEFAULT_BACKOFF: Duration = Duration::from_secs(5); - if let HttpClientError::JsonRpcError(JsonRpcError { data, .. }) = error - { - let data = data.as_ref()?; - - // if daily rate limit exceeded, infura returns the requested backoff in the error - // response - let Some(backoff_seconds) = data.get("rate").and_then(|v| v.get("backoff_seconds")) else { - return Some(DEFAULT_BACKOFF); - }; - // infura rate limit error - if let Some(seconds) = backoff_seconds.as_u64() { - return Some(Duration::from_secs(seconds)); - } - if let Some(seconds) = backoff_seconds.as_f64() { - return Some(Duration::from_secs(seconds as u64 + 1)); + if let ProviderError::JsonRpcClientError(err) = error { + let json_rpc_error = err.as_error_response(); + if let Some(json_rpc_error) = json_rpc_error { + if let Some(data) = &json_rpc_error.data { + // if daily rate limit exceeded, infura returns the requested backoff in the error + // response + let Some(backoff_seconds) = data.get("rate").and_then(|v| v.get("backoff_seconds")) else { + return Some(DEFAULT_BACKOFF); + }; + // infura rate limit error + if let Some(seconds) = backoff_seconds.as_u64() { + return Some(Duration::from_secs(seconds)); + } + if let Some(seconds) = backoff_seconds.as_f64() { + return Some(Duration::from_secs(seconds as u64 + 1)); + } + } } } diff --git a/crates/relayer-context/src/lib.rs b/crates/relayer-context/src/lib.rs index 46fb5b0f8..8eee6b4e1 100644 --- a/crates/relayer-context/src/lib.rs +++ b/crates/relayer-context/src/lib.rs @@ -18,7 +18,6 @@ //! A module for managing the context of the relayer. use std::time::Duration; use std::{collections::HashMap, sync::Arc}; - use tokio::sync::{broadcast, Mutex}; use webb::evm::ethers; @@ -44,8 +43,9 @@ use webb_relayer_utils::metric::{self, Metrics}; mod ethers_retry_policy; use ethers_retry_policy::WebbHttpRetryPolicy; +use webb_relayer_utils::multi_provider::MultiProvider; -type EthersClient = Provider>; +type EthersClient = Provider>>; /// RelayerContext contains Relayer's configuration and shutdown signal. #[derive(Clone)] @@ -120,16 +120,30 @@ impl RelayerContext { // Create a Map for all EVM Chains let mut evm_providers = HashMap::new(); for (_, chain_config) in config.evm.iter() { - let client = Http::new(chain_config.http_endpoint.clone()); + let mut providers = Vec::new(); + match chain_config.http_endpoint.clone() { + webb_relayer_config::evm::HttpEndpoint::Single(rpc_url) => { + let provider = Http::new(rpc_url); + providers.push(provider); + } + webb_relayer_config::evm::HttpEndpoint::Multiple(rpc_urls) => { + rpc_urls.iter().for_each(|rpc_url| { + let provider = Http::new(rpc_url.clone()); + providers.push(provider); + }); + } + } + + let multi_provider = MultiProvider::new(Arc::new(providers)); // Wrap the provider with a retry client. let retry_client = RetryClientBuilder::default() .timeout_retries(u32::MAX) .rate_limit_retries(u32::MAX) - .build(client, WebbHttpRetryPolicy::boxed()); + .build(multi_provider, WebbHttpRetryPolicy::boxed()); let provider = Arc::new(Provider::new(retry_client)); - evm_providers - .insert(chain_config.chain_id.into(), provider.clone()); + + evm_providers.insert(chain_config.chain_id.into(), provider); } Ok(Self { diff --git a/crates/relayer-utils/Cargo.toml b/crates/relayer-utils/Cargo.toml index 7dfe71ea3..55c1c7505 100644 --- a/crates/relayer-utils/Cargo.toml +++ b/crates/relayer-utils/Cargo.toml @@ -10,7 +10,12 @@ repository = { workspace = true } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[dev-dependencies] +tokio = { workspace = true } + [dependencies] +async-trait = { workspace = true } +futures = { workspace = true } thiserror = { workspace = true } hex = { workspace = true } backoff = { workspace = true } diff --git a/crates/relayer-utils/src/lib.rs b/crates/relayer-utils/src/lib.rs index 95d6ed2f5..5dd89bbc7 100644 --- a/crates/relayer-utils/src/lib.rs +++ b/crates/relayer-utils/src/lib.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; +use multi_provider::MultiProvider; use webb::{evm::ethers, substrate::subxt}; use webb_proposals::ResourceId; @@ -23,6 +24,8 @@ pub mod clickable_link; /// Metrics functionality pub mod metric; +/// Multi provider for ethers. +pub mod multi_provider; /// A module used for debugging relayer lifecycle, sync state, or other relayer state. pub mod probe; /// Retry functionality @@ -30,6 +33,9 @@ pub mod retry; /// type-erased StaticTxPayload for Substrate Transaction queue. pub mod static_tx_payload; +type RetryClientProvider = ethers::providers::Provider< + ethers::providers::RetryClient>, +>; /// An enum of all possible errors that could be encountered during the execution of the Webb /// Relayer. #[derive(Debug, thiserror::Error)] @@ -96,34 +102,17 @@ pub enum Error { /// Smart contract error. #[error(transparent)] EthersContractCallWithRetry( - #[from] - ethers::contract::ContractError< - ethers::providers::Provider< - ethers::providers::RetryClient, - >, - >, + #[from] ethers::contract::ContractError, ), /// Smart contract error. #[error(transparent)] EthersContractCallWithRetryCloneable( - #[from] - ethers::contract::ContractError< - Arc< - ethers::providers::Provider< - ethers::providers::RetryClient, - >, - >, - >, + #[from] ethers::contract::ContractError>, ), /// Ethers Timelag provider error. #[error(transparent)] EthersTimelagRetryClientError( - #[from] - ethers::middleware::timelag::TimeLagError< - ethers::providers::Provider< - ethers::providers::RetryClient, - >, - >, + #[from] ethers::middleware::timelag::TimeLagError, ), /// Ethers Timelag provider error. @@ -131,11 +120,7 @@ pub enum Error { EthersTimelagRetryClientClonableError( #[from] ethers::middleware::timelag::TimeLagError< - std::sync::Arc< - ethers::providers::Provider< - ethers::providers::RetryClient, - >, - >, + std::sync::Arc, >, ), @@ -144,11 +129,7 @@ pub enum Error { EthersContractCallWithTimeLagRetryClient( #[from] ethers::contract::ContractError< - ethers::middleware::timelag::TimeLag< - ethers::providers::Provider< - ethers::providers::RetryClient, - >, - >, + ethers::middleware::timelag::TimeLag, >, ), @@ -157,13 +138,7 @@ pub enum Error { EthersContractCallWithTimeLagRetryClientCloneable( #[from] ethers::contract::ContractError< - ethers::middleware::timelag::TimeLag< - Arc< - ethers::providers::Provider< - ethers::providers::RetryClient, - >, - >, - >, + ethers::middleware::timelag::TimeLag>, >, ), @@ -261,6 +236,9 @@ pub enum Error { /// are missing. #[error("Missing Substrate Static Transaction Validation Details")] MissingValidationDetails, + /// Provider not found error. + #[error("Provider not found for index {0}")] + ProviderNotFound(usize), } /// A type alias for the result for webb relayer, that uses the `Error` enum. diff --git a/crates/relayer-utils/src/multi_provider.rs b/crates/relayer-utils/src/multi_provider.rs new file mode 100644 index 000000000..f5119125b --- /dev/null +++ b/crates/relayer-utils/src/multi_provider.rs @@ -0,0 +1,83 @@ +use crate::Error as WebbRelayerError; +use core::fmt::Debug; +use futures::prelude::*; +use serde::{de::DeserializeOwned, Serialize}; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; +use webb::evm::ethers::providers::{JsonRpcClient, ProviderError}; +/// MultiProvider is a JsonRpcClient that will round-robin requests to the underlying providers. +#[derive(Debug, Clone)] +pub struct MultiProvider

{ + providers: Arc>, + last_used: Arc, +} + +impl

MultiProvider

{ + pub fn new(providers: Arc>) -> Self { + Self { + providers, + last_used: Default::default(), + } + } +} + +#[async_trait::async_trait] +impl JsonRpcClient for MultiProvider

+where + P::Error: Into, +{ + type Error = ProviderError; + + async fn request< + T: Debug + Serialize + Send + Sync, + R: DeserializeOwned + Send, + >( + &self, + method: &str, + params: T, + ) -> Result { + // Fetch the next provider index to use + // incrementing it by 1 and wrapping around if it exceeds the number of providers + let next_provider_idx = self + .last_used + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |last_used| { + Some(last_used.saturating_add(1) % self.providers.len()) + }) + .unwrap_or_default(); + + if let Some(provider) = self.providers.get(next_provider_idx) { + provider + .request(method, params) + .map_err(P::Error::into) + .await + } else { + Err(ProviderError::CustomError( + WebbRelayerError::ProviderNotFound(next_provider_idx) + .to_string(), + )) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::str::FromStr; + use webb::evm::ethers::providers::{self, Http, Middleware}; + + #[tokio::test] + async fn should_process_request() { + let p1 = Http::from_str("https://eth.llamarpc.com").unwrap(); + let p2 = Http::from_str("https://1rpc.io/eth").unwrap(); + + let multi_provider = MultiProvider::new(vec![p1, p2].into()); + assert_eq!(multi_provider.providers.len(), 2); + assert_eq!(multi_provider.last_used.load(Ordering::SeqCst), 0); + let provider = providers::Provider::new(multi_provider.clone()); + provider.get_block_number().await.expect("should work"); + assert_eq!(multi_provider.last_used.load(Ordering::SeqCst), 1); + provider.get_block_number().await.expect("should work"); + } +} diff --git a/services/block-poller/src/block_poller.rs b/services/block-poller/src/block_poller.rs index 369619898..cacacb07c 100644 --- a/services/block-poller/src/block_poller.rs +++ b/services/block-poller/src/block_poller.rs @@ -2,10 +2,11 @@ use futures::prelude::*; use std::cmp; use std::sync::Arc; use std::time::Duration; +use webb_relayer::service::evm::Client; use webb_relayer_config::block_poller::BlockPollerConfig; use webb::evm::ethers::{ - providers::{self, Middleware}, + providers::Middleware, types::{Block, TxHash}, }; @@ -90,9 +91,7 @@ pub trait BlockPoller { )] async fn run( &self, - client: Arc< - providers::Provider>, - >, + client: Arc, store: Arc, listener_config: BlockPollerConfig, handlers: Vec>, diff --git a/tests/lib/localTestnet.ts b/tests/lib/localTestnet.ts index 0b11de538..da9929efa 100644 --- a/tests/lib/localTestnet.ts +++ b/tests/lib/localTestnet.ts @@ -60,6 +60,7 @@ export type ExportedConfigOptions = { blockConfirmations?: number; privateKey?: string; smartAnchorUpdates?: SmartAnchorUpdatesConfig; + httpEndpoints?: string[]; }; // Default Events watcher for the contracts. @@ -459,7 +460,7 @@ export class LocalChain { const chainInfo: FullChainInfo = { name: this.underlyingChainId.toString(), enabled: true, - httpEndpoint: this.endpoint, + httpEndpoint: opts.httpEndpoints ?? [this.endpoint], wsEndpoint: this.endpoint.replace('http', 'ws'), blockConfirmations: opts.blockConfirmations ?? 0, chainId: this.underlyingChainId, @@ -476,7 +477,7 @@ export class LocalChain { const chainInfo: FullChainInfo = { name: this.underlyingChainId.toString(), enabled: true, - httpEndpoint: this.endpoint, + httpEndpoint: opts.httpEndpoints ?? [this.endpoint], wsEndpoint: this.endpoint.replace('http', 'ws'), blockConfirmations: opts.blockConfirmations ?? 1, chainId: this.underlyingChainId, @@ -543,15 +544,15 @@ export class LocalChain { 'proposal-signing-backend': contract.proposalSigningBackend?.type === 'Mocked' ? { - type: 'Mocked', - 'private-key': contract.proposalSigningBackend?.privateKey, - } + type: 'Mocked', + 'private-key': contract.proposalSigningBackend?.privateKey, + } : contract.proposalSigningBackend?.type === 'DKGNode' - ? { + ? { type: 'DKGNode', 'chain-id': contract.proposalSigningBackend?.chainId, } - : undefined, + : undefined, 'events-watcher': { enabled: contract.eventsWatcher.enabled, 'polling-interval': contract.eventsWatcher.pollingInterval, @@ -570,18 +571,18 @@ export class LocalChain { (anchor: LinkedAnchor) => anchor.type === 'Evm' ? { - 'chain-id': anchor.chainId, - type: 'Evm', - address: anchor.address, - } + 'chain-id': anchor.chainId, + type: 'Evm', + address: anchor.address, + } : anchor.type === 'Substrate' - ? { + ? { type: 'Substrate', 'chain-id': anchor.chainId, 'tree-id': anchor.treeId, pallet: anchor.pallet, } - : { + : { type: 'Raw', 'resource-id': anchor.resourceId, } @@ -600,7 +601,7 @@ export class LocalChain { } export type FullChainInfo = ChainInfo & { - httpEndpoint: string; + httpEndpoint: string[]; wsEndpoint: string; privateKey: string; blockConfirmations: number; diff --git a/tests/lib/webbRelayer.ts b/tests/lib/webbRelayer.ts index 5825d98fc..f146f6834 100644 --- a/tests/lib/webbRelayer.ts +++ b/tests/lib/webbRelayer.ts @@ -351,7 +351,6 @@ export class WebbRelayer { return txHashOrReject(ws, cmd); } - public async substrateVAnchorWithdraw( chainId: number, id: number, @@ -749,26 +748,26 @@ type NetworkMessage = { kind: 'network'; } & { network: - | 'connecting' - | 'connected' - | { failed: { reason: string } } - | 'disconnected' - | 'unsupportedContract' - | 'unsupportedChain' - | 'invalidRelayerAddress'; + | 'connecting' + | 'connected' + | { failed: { reason: string } } + | 'disconnected' + | 'unsupportedContract' + | 'unsupportedChain' + | 'invalidRelayerAddress'; }; type WithdrawMessage = { kind: 'withdraw'; } & { withdraw: - | 'sent' - | { submitted: { txHash: string } } - | { finalized: { txHash: string } } - | 'valid' - | 'invalidMerkleRoots' - | 'droppedFromMemPool' - | { errored: { code: number; reason: string } }; + | 'sent' + | { submitted: { txHash: string } } + | { finalized: { txHash: string } } + | 'valid' + | 'invalidMerkleRoots' + | 'droppedFromMemPool' + | { errored: { code: number; reason: string } }; }; type ErrorMessage = { diff --git a/tests/sim/smartAnchorUpdates.sim.ts b/tests/sim/smartAnchorUpdates.sim.ts index f2a59d821..aba45d732 100644 --- a/tests/sim/smartAnchorUpdates.sim.ts +++ b/tests/sim/smartAnchorUpdates.sim.ts @@ -72,7 +72,7 @@ type SimulationMetrics = { totalWaitTimeToRelayAllRoots: number; }; -describe('Smart Anchor Updates Simulation', function() { +describe('Smart Anchor Updates Simulation', function () { // Disable mocha time-out because these tests take a long time. this.timeout(0); this.slow(Number.MAX_SAFE_INTEGER); diff --git a/tests/test/evm/governorUpdates.test.ts b/tests/test/evm/governorUpdates.test.ts index 7e6515a32..3a0cca497 100644 --- a/tests/test/evm/governorUpdates.test.ts +++ b/tests/test/evm/governorUpdates.test.ts @@ -45,7 +45,7 @@ Chai.use(ChaiAsPromised); // FIXME: This test is currently broken. It needs to be fixed. // The node hangs at 30 blocks and does not proceed further. -describe.skip('SignatureBridge Governor Updates', function() { +describe.skip('SignatureBridge Governor Updates', function () { const tmpDirPath = temp.mkdirSync(); let localChain1: LocalChain; let localChain2: LocalChain; @@ -65,11 +65,11 @@ describe.skip('SignatureBridge Governor Updates', function() { const usageMode: UsageMode = isCi ? { mode: 'docker', forcePullImage: false } : { - mode: 'host', - nodePath: path.resolve( - '../../tangle/target/release/tangle-standalone' - ), - }; + mode: 'host', + nodePath: path.resolve( + '../../tangle/target/release/tangle-standalone' + ), + }; const enabledPallets: Pallet[] = [ { pallet: 'DKGProposalHandler', diff --git a/tests/test/evm/invalidProviders.test.ts b/tests/test/evm/invalidProviders.test.ts index c95959f14..8da5866c0 100644 --- a/tests/test/evm/invalidProviders.test.ts +++ b/tests/test/evm/invalidProviders.test.ts @@ -121,7 +121,6 @@ describe('Invalid EVM Providers', () => { kind: 'retry', event: { should_retry: true, - error: provider.errorMessage.toLowerCase(), }, }); }); diff --git a/tests/test/evm/vanchorTransactionRelayer.test.ts b/tests/test/evm/vanchorTransactionRelayer.test.ts index 153b73d8c..4f5aa473f 100644 --- a/tests/test/evm/vanchorTransactionRelayer.test.ts +++ b/tests/test/evm/vanchorTransactionRelayer.test.ts @@ -128,14 +128,17 @@ describe('Vanchor Transaction relayer', function () { } ); + const dummyEndpoint = 'http://127.0.0.1:1080'; // save the chain configs. await localChain1.writeConfig(`${tmpDirPath}/${localChain1.name}.json`, { signatureVBridge, proposalSigningBackend: { type: 'Mocked', privateKey: PK1 }, + httpEndpoints: [dummyEndpoint, localChain1.endpoint], }); await localChain2.writeConfig(`${tmpDirPath}/${localChain2.name}.json`, { signatureVBridge, proposalSigningBackend: { type: 'Mocked', privateKey: PK2 }, + httpEndpoints: [dummyEndpoint, localChain2.endpoint], }); // get the vanhor on localchain1 diff --git a/tests/test/substrate/governorUpdate.test.ts b/tests/test/substrate/governorUpdate.test.ts index fe7f79368..acf149b03 100644 --- a/tests/test/substrate/governorUpdate.test.ts +++ b/tests/test/substrate/governorUpdate.test.ts @@ -29,7 +29,7 @@ import { u8aToHex } from '@polkadot/util'; import { UsageMode } from '@webb-tools/test-utils'; import { defaultEventsWatcherValue } from '../../lib/utils.js'; -describe('Substrate SignatureBridge Governor Update', function() { +describe('Substrate SignatureBridge Governor Update', function () { const tmpDirPath = temp.mkdirSync(); // Tangle nodes let aliceNode: LocalTangle; @@ -41,11 +41,11 @@ describe('Substrate SignatureBridge Governor Update', function() { const usageMode: UsageMode = isCi ? { mode: 'docker', forcePullImage: false } : { - mode: 'host', - nodePath: path.resolve( - '../../tangle/target/release/tangle-standalone' - ), - }; + mode: 'host', + nodePath: path.resolve( + '../../tangle/target/release/tangle-standalone' + ), + }; const enabledPallets: Pallet[] = [ { pallet: 'SignatureBridge',