diff --git a/Cargo.lock b/Cargo.lock index fba421d7..0fde2f68 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1457,6 +1457,7 @@ dependencies = [ "const-hex", "enr", "ethers-core", + "futures-channel", "futures-core", "futures-timer", "futures-util", @@ -1478,6 +1479,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", + "winapi", "ws_stream_wasm", ] diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 8bb9e7ee..fa4086d5 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -15,7 +15,7 @@ async-openai = "0.10.0" clap = { version = "3.1.18", features = ["derive"] } colored = "2" crossbeam-channel = "0.5.7" -ethers = "2.0.4" +ethers = { version = "2.0.4", features = [ "ipc", "ws", ] } fancy-regex = "0.11.0" heimdall-cache = { workspace = true } indicatif = "0.17.0" diff --git a/crates/common/src/ether/http_or_ws_or_ipc.rs b/crates/common/src/ether/http_or_ws_or_ipc.rs new file mode 100644 index 00000000..211bc613 --- /dev/null +++ b/crates/common/src/ether/http_or_ws_or_ipc.rs @@ -0,0 +1,169 @@ +//! Create a custom data transport to use with a Provider. +use async_trait::async_trait; +use ethers::prelude::*; +use serde::{de::DeserializeOwned, Serialize}; +use std::{fmt::Debug, str::FromStr}; +use thiserror::Error; + +/// First we must create an error type, and implement [`From`] for +/// [`ProviderError`]. +/// +/// Here we are using [`thiserror`](https://docs.rs/thiserror) to wrap +/// [`HttpClientError`], [`WsClientError`] and [`IpcError`]. +/// +/// This also provides a conversion implementation ([`From`]) for both, so we +/// can use the [question mark operator](https://doc.rust-lang.org/rust-by-example/std/result/question_mark.html) +/// later on in our implementations. +#[derive(Debug, Error)] +pub enum HttpOrWsOrIpcError { + #[error(transparent)] + Ws(#[from] WsClientError), + + #[error(transparent)] + Ipc(#[from] IpcError), + + #[error(transparent)] + Http(#[from] HttpClientError), +} + +/// In order to use our `HttpOrWsOrIpcError` in the RPC client, we have to implement +/// this trait. +/// +/// [`RpcError`] helps other parts off the stack get access to common provider +/// error cases. For example, any RPC connection may have a `serde_json` error, +/// so we want to make those easily accessible, so we implement +/// `as_serde_error()` +/// +/// In addition, RPC requests may return JSON errors from the node, describing +/// why the request failed. In order to make these accessible, we implement +/// `as_error_response()`. +impl RpcError for HttpOrWsOrIpcError { + fn as_error_response(&self) -> Option<ðers::providers::JsonRpcError> { + match self { + HttpOrWsOrIpcError::Ws(e) => e.as_error_response(), + HttpOrWsOrIpcError::Ipc(e) => e.as_error_response(), + HttpOrWsOrIpcError::Http(e) => e.as_error_response(), + } + } + + fn as_serde_error(&self) -> Option<&serde_json::Error> { + match self { + HttpOrWsOrIpcError::Ws(WsClientError::JsonError(e)) => Some(e), + HttpOrWsOrIpcError::Ipc(IpcError::JsonError(e)) => Some(e), + HttpOrWsOrIpcError::Http(HttpClientError::SerdeJson { err: e, text: _ }) => Some(e), + _ => None, + } + } +} + +/// This implementation helps us convert our Error to the library's +/// [`ProviderError`] so that we can use the `?` operator +impl From for ProviderError { + fn from(value: HttpOrWsOrIpcError) -> Self { + Self::JsonRpcClientError(Box::new(value)) + } +} + +/// Next, we create our transport type, which in this case will be an enum that contains +/// either [`Http`], [`Ws`] or [`Ipc`]. +#[derive(Clone, Debug)] +pub enum HttpOrWsOrIpc { + Ws(Ws), + Ipc(Ipc), + Http(Http), +} + +// We implement a convenience "constructor" method, to easily initialize the transport. +// This will connect to [`Http`] if the rpc_url contains 'http', to [`Ws`] if it contains 'ws', +// otherwise it'll default to [`Ipc`]. +impl HttpOrWsOrIpc { + pub async fn connect(rpc_url: &str) -> Result { + let this = if rpc_url.to_lowercase().contains("http") { + Self::Http(Http::from_str(rpc_url).unwrap()) + } else if rpc_url.to_lowercase().contains("ws") { + Self::Ws(Ws::connect(rpc_url).await?) + } else { + Self::Ipc(Ipc::connect(rpc_url).await?) + }; + Ok(this) + } +} + +// Next, the most important step: implement [`JsonRpcClient`]. +// +// For this implementation, we simply delegate to the wrapped transport and return the +// result. +// +// Note that we are using [`async-trait`](https://docs.rs/async-trait) for asynchronous +// functions in traits, as this is not yet supported in stable Rust; see: +// +#[async_trait] +impl JsonRpcClient for HttpOrWsOrIpc { + type Error = HttpOrWsOrIpcError; + + async fn request(&self, method: &str, params: T) -> Result + where + T: Debug + Serialize + Send + Sync, + R: DeserializeOwned + Send, { + let res = match self { + Self::Ws(ws) => JsonRpcClient::request(ws, method, params).await?, + Self::Ipc(ipc) => JsonRpcClient::request(ipc, method, params).await?, + Self::Http(http) => JsonRpcClient::request(http, method, params).await?, + }; + Ok(res) + } +} + +// We can also implement [`PubsubClient`], since both `Ws` and `Ipc` implement it, by +// doing the same as in the `JsonRpcClient` implementation above. +// Trying to subscribe on a `Http` will panic. +impl PubsubClient for HttpOrWsOrIpc { + // Since both `Ws` and `Ipc`'s `NotificationStream` associated type is the same, + // we can simply return one of them. + // In case they differed, we would have to create a `HttpOrWsOrIpcNotificationStream`, + // similar to the error type. + type NotificationStream = ::NotificationStream; + + fn subscribe>(&self, id: T) -> Result { + let stream = match self { + Self::Ws(ws) => PubsubClient::subscribe(ws, id)?, + Self::Ipc(ipc) => PubsubClient::subscribe(ipc, id)?, + HttpOrWsOrIpc::Http(_) => unreachable!("Http RPC cannot be used for subscriptions!"), + }; + Ok(stream) + } + + fn unsubscribe>(&self, id: T) -> Result<(), Self::Error> { + match self { + Self::Ws(ws) => PubsubClient::unsubscribe(ws, id)?, + Self::Ipc(ipc) => PubsubClient::unsubscribe(ipc, id)?, + HttpOrWsOrIpc::Http(_) => unreachable!("Http RPC cannot be used for subscriptions!"), + }; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use crate::ether::http_or_ws_or_ipc::HttpOrWsOrIpc; + use ethers::prelude::*; + + #[tokio::test] + async fn test_subscription() { + // Connect to transport, using Arbitrum as it has the fastest block time of 0.25s + let transport = + HttpOrWsOrIpc::connect("wss://arbitrum-one-rpc.publicnode.com").await.unwrap(); + + // Wrap the transport in a provider + let provider = Provider::new(transport); + + // Now we can use our custom transport provider like normal + let block_number = provider.get_block_number().await.unwrap(); + println!("Current block: {block_number}"); + + let mut subscription = provider.subscribe_blocks().await.unwrap().take(2); + while let Some(block) = subscription.next().await { + println!("New block: {:?}", block.number); + } + } +} diff --git a/crates/common/src/ether/lexers/cleanup.rs b/crates/common/src/ether/lexers/cleanup.rs index f4d9c2b1..249851f7 100644 --- a/crates/common/src/ether/lexers/cleanup.rs +++ b/crates/common/src/ether/lexers/cleanup.rs @@ -244,10 +244,7 @@ pub fn simplify_parentheses(line: &str, paren_index: usize) -> Result x, - None => "", - }, + logical_expression.get(2..logical_expression.len() - 2).unwrap_or_default(), ); // remove double negation, if one was created diff --git a/crates/common/src/ether/mod.rs b/crates/common/src/ether/mod.rs index 8702fc7b..ec38c825 100644 --- a/crates/common/src/ether/mod.rs +++ b/crates/common/src/ether/mod.rs @@ -2,6 +2,7 @@ pub mod bytecode; pub mod calldata; pub mod compiler; pub mod evm; +pub mod http_or_ws_or_ipc; pub mod lexers; pub mod rpc; pub mod selectors; diff --git a/crates/common/src/ether/rpc.rs b/crates/common/src/ether/rpc.rs index 94b31af2..12dd3101 100644 --- a/crates/common/src/ether/rpc.rs +++ b/crates/common/src/ether/rpc.rs @@ -1,8 +1,11 @@ -use crate::error::Error; +use crate::{ + error::Error, + ether::http_or_ws_or_ipc::{self, HttpOrWsOrIpc}, +}; use backoff::ExponentialBackoff; use ethers::{ core::types::Address, - providers::{Http, Middleware, Provider}, + providers::{Middleware, Provider}, types::{ BlockNumber::{self}, BlockTrace, Filter, FilterBlockOption, TraceType, Transaction, H256, @@ -12,13 +15,31 @@ use heimdall_cache::{read_cache, store_cache}; use std::{str::FromStr, time::Duration}; use tracing::{debug, error, trace}; +/// Get the Provider object for RPC URL +/// +/// ```no_run +/// use heimdall_common::ether::rpc::get_provider; +/// +/// // let provider = get_provider("https://eth.llamarpc.com").await?; +/// // assert_eq!(provider.get_chainid().await.unwrap(), 1); +/// ``` +pub async fn get_provider(rpc_url: &str) -> Result, Error> { + Ok(Provider::new(match http_or_ws_or_ipc::HttpOrWsOrIpc::connect(rpc_url).await { + Ok(provider) => provider, + Err(error) => { + error!("failed to connect to RPC provider '{}' .", &rpc_url); + return Err(Error::Generic(error.to_string())); + } + })) +} + /// Get the chainId of the provided RPC URL /// /// ```no_run /// use heimdall_common::ether::rpc::chain_id; /// /// // let chain_id = chain_id("https://eth.llamarpc.com").await?; -/// //assert_eq!(chain_id, 1); +/// // assert_eq!(chain_id, 1); /// ``` pub async fn chain_id(rpc_url: &str) -> Result { backoff::future::retry( @@ -44,7 +65,7 @@ pub async fn chain_id(rpc_url: &str) -> Result { } // create new provider - let provider = match Provider::::try_from(rpc_url) { + let provider = match get_provider(rpc_url).await { Ok(provider) => provider, Err(_) => { error!("failed to connect to RPC provider '{}' .", &rpc_url); @@ -96,7 +117,7 @@ pub async fn latest_block_number(rpc_url: &str) -> Result { } // create new provider - let provider = match Provider::::try_from(rpc_url) { + let provider = match get_provider(rpc_url).await { Ok(provider) => provider, Err(_) => { error!("failed to connect to RPC provider '{}' .", &rpc_url); @@ -156,7 +177,7 @@ pub async fn get_code(contract_address: &str, rpc_url: &str) -> Result, } // create new provider - let provider = match Provider::::try_from(rpc_url) { + let provider = match get_provider(rpc_url).await { Ok(provider) => provider, Err(_) => { error!("failed to connect to RPC provider '{}' .", &rpc_url); @@ -223,7 +244,7 @@ pub async fn get_transaction(transaction_hash: &str, rpc_url: &str) -> Result::try_from(rpc_url) { + let provider = match get_provider(rpc_url).await { Ok(provider) => provider, Err(_) => { error!("failed to connect to RPC provider '{}' .", &rpc_url); @@ -280,7 +301,7 @@ pub async fn get_trace(transaction_hash: &str, rpc_url: &str) -> Result::try_from(rpc_url) { + let provider = match get_provider(rpc_url).await { Ok(provider) => provider, Err(_) => { error!("failed to connect to RPC provider '{}' .", &rpc_url); @@ -349,7 +370,7 @@ pub async fn get_block_logs( trace!("fetching logs from node for block: '{}' .", &block_number); // create new provider - let provider = match Provider::::try_from(rpc_url) { + let provider = match get_provider(rpc_url).await { Ok(provider) => provider, Err(_) => { error!("failed to connect to RPC provider '{}' .", &rpc_url); @@ -409,7 +430,7 @@ pub async fn get_block_state_diff( trace!("fetching traces from node for block: '{}' .", &block_number); // create new provider - let provider = match Provider::::try_from(rpc_url) { + let provider = match get_provider(rpc_url).await { Ok(provider) => provider, Err(_) => { error!("failed to connect to RPC provider '{}' .", &rpc_url); @@ -528,4 +549,12 @@ pub mod tests { assert!(!logs.is_empty()); } + + #[tokio::test] + async fn test_chain_id_with_ws_rpc() { + let rpc_url = "wss://arbitrum-one-rpc.publicnode.com"; + let rpc_chain_id = chain_id(rpc_url).await.expect("chain_id() returned an error!"); + + assert_eq!(rpc_chain_id, 42161); + } } diff --git a/crates/decode/src/utils/abi.rs b/crates/decode/src/utils/abi.rs index 2b92adb6..e0091790 100644 --- a/crates/decode/src/utils/abi.rs +++ b/crates/decode/src/utils/abi.rs @@ -371,7 +371,7 @@ fn get_potential_type( } if size > max_size { - potential_type = types.first().expect("types is empty").clone(); + potential_type.clone_from(types.first().expect("types is empty")); (max_size, potential_type) } else { (max_size, potential_type) diff --git a/crates/decompile/src/utils/heuristics/events.rs b/crates/decompile/src/utils/heuristics/events.rs index fd327c42..8f53ed6f 100644 --- a/crates/decompile/src/utils/heuristics/events.rs +++ b/crates/decompile/src/utils/heuristics/events.rs @@ -32,44 +32,37 @@ pub fn event_heuristic( .join(", "); // add the event emission to the function's logic - match analyzer_state.analyzer_type { - AnalyzerType::Solidity => { - function.logic.push(format!( - "emit Event_{}({}{});{}", - &event - .topics - .first() - .unwrap_or(&U256::from(0)) - .encode_hex() - .replacen("0x", "", 1)[0..8], - event - .topics - .get(1..) - .map(|topics| { - if !event.data.is_empty() && !topics.is_empty() { - let mut solidified_topics: Vec = Vec::new(); - for (i, _) in topics.iter().enumerate() { - solidified_topics.push( - state.last_instruction.input_operations[i + 3].solidify(), - ); - } - format!("{}, ", solidified_topics.join(", ")) - } else { - let mut solidified_topics: Vec = Vec::new(); - for (i, _) in topics.iter().enumerate() { - solidified_topics.push( - state.last_instruction.input_operations[i + 3].solidify(), - ); - } - solidified_topics.join(", ") + if analyzer_state.analyzer_type == AnalyzerType::Solidity { + function.logic.push(format!( + "emit Event_{}({}{});{}", + &event.topics.first().unwrap_or(&U256::from(0)).encode_hex().replacen("0x", "", 1) + [0..8], + event + .topics + .get(1..) + .map(|topics| { + if !event.data.is_empty() && !topics.is_empty() { + let mut solidified_topics: Vec = Vec::new(); + for (i, _) in topics.iter().enumerate() { + solidified_topics.push( + state.last_instruction.input_operations[i + 3].solidify(), + ); } - }) - .unwrap_or("".to_string()), - data_mem_ops_solidified, - if anonymous { " // anonymous event" } else { "" } - )); - } - _ => {} + format!("{}, ", solidified_topics.join(", ")) + } else { + let mut solidified_topics: Vec = Vec::new(); + for (i, _) in topics.iter().enumerate() { + solidified_topics.push( + state.last_instruction.input_operations[i + 3].solidify(), + ); + } + solidified_topics.join(", ") + } + }) + .unwrap_or("".to_string()), + data_mem_ops_solidified, + if anonymous { " // anonymous event" } else { "" } + )); } }