From 34c9abe2498d980802684018dd37728cecb5a37b Mon Sep 17 00:00:00 2001 From: Ryan Schneider Date: Tue, 18 Jul 2023 10:49:23 -0700 Subject: [PATCH] feat: implement reth_getBalanceChangesInBlock (#3768) Co-authored-by: Matthias Seitz --- bin/reth/src/args/rpc_server_args.rs | 6 +- crates/rpc/rpc-api/src/lib.rs | 2 + crates/rpc/rpc-api/src/reth.rs | 15 +++ crates/rpc/rpc-builder/src/lib.rs | 35 ++++-- crates/rpc/rpc/src/lib.rs | 2 + crates/rpc/rpc/src/reth.rs | 118 ++++++++++++++++++ crates/storage/provider/src/lib.rs | 4 +- .../src/providers/database/provider.rs | 18 ++- crates/storage/provider/src/providers/mod.rs | 13 +- .../storage/provider/src/test_utils/noop.rs | 14 ++- crates/storage/provider/src/traits/account.rs | 8 ++ crates/storage/provider/src/traits/mod.rs | 2 +- 12 files changed, 219 insertions(+), 18 deletions(-) create mode 100644 crates/rpc/rpc-api/src/reth.rs create mode 100644 crates/rpc/rpc/src/reth.rs diff --git a/bin/reth/src/args/rpc_server_args.rs b/bin/reth/src/args/rpc_server_args.rs index a1c954b4a8a5..4dbde75cbd41 100644 --- a/bin/reth/src/args/rpc_server_args.rs +++ b/bin/reth/src/args/rpc_server_args.rs @@ -8,8 +8,8 @@ use clap::{ use futures::TryFutureExt; use reth_network_api::{NetworkInfo, Peers}; use reth_provider::{ - BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, EvmEnvProvider, HeaderProvider, - StateProviderFactory, + BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, EvmEnvProvider, + HeaderProvider, StateProviderFactory, }; use reth_rpc::{ eth::{ @@ -249,6 +249,7 @@ impl RpcServerArgs { + StateProviderFactory + EvmEnvProvider + ChainSpecProvider + + ChangeSetReader + Clone + Unpin + 'static, @@ -310,6 +311,7 @@ impl RpcServerArgs { + StateProviderFactory + EvmEnvProvider + ChainSpecProvider + + ChangeSetReader + Clone + Unpin + 'static, diff --git a/crates/rpc/rpc-api/src/lib.rs b/crates/rpc/rpc-api/src/lib.rs index acca756711c1..7fba5adba9e0 100644 --- a/crates/rpc/rpc-api/src/lib.rs +++ b/crates/rpc/rpc-api/src/lib.rs @@ -26,6 +26,7 @@ mod eth; mod eth_filter; mod eth_pubsub; mod net; +mod reth; mod rpc; mod trace; mod txpool; @@ -44,6 +45,7 @@ pub mod servers { eth_filter::EthFilterApiServer, eth_pubsub::EthPubSubApiServer, net::NetApiServer, + reth::RethApiServer, rpc::RpcApiServer, trace::TraceApiServer, txpool::TxPoolApiServer, diff --git a/crates/rpc/rpc-api/src/reth.rs b/crates/rpc/rpc-api/src/reth.rs new file mode 100644 index 000000000000..1e9c4314ab14 --- /dev/null +++ b/crates/rpc/rpc-api/src/reth.rs @@ -0,0 +1,15 @@ +use jsonrpsee::{core::RpcResult, proc_macros::rpc}; +use reth_primitives::{Address, BlockId, U256}; +use std::collections::HashMap; + +/// Reth API namespace for reth-specific methods +#[cfg_attr(not(feature = "client"), rpc(server, namespace = "reth"))] +#[cfg_attr(feature = "client", rpc(server, client, namespace = "reth"))] +pub trait RethApi { + /// Returns all ETH balance changes in a block + #[method(name = "getBalanceChangesInBlock")] + async fn reth_get_balance_changes_in_block( + &self, + block_id: BlockId, + ) -> RpcResult>; +} diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index d01036f8d4b7..368960517b7c 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -31,13 +31,13 @@ //! //! ``` //! use reth_network_api::{NetworkInfo, Peers}; -//! use reth_provider::{BlockReaderIdExt, ChainSpecProvider, CanonStateSubscriptions, StateProviderFactory, EvmEnvProvider}; +//! use reth_provider::{BlockReaderIdExt, ChainSpecProvider, CanonStateSubscriptions, StateProviderFactory, EvmEnvProvider, ChangeSetReader}; //! use reth_rpc_builder::{RethRpcModule, RpcModuleBuilder, RpcServerConfig, ServerBuilder, TransportRpcModuleConfig}; //! use reth_tasks::TokioTaskExecutor; //! use reth_transaction_pool::TransactionPool; //! pub async fn launch(provider: Provider, pool: Pool, network: Network, events: Events) //! where -//! Provider: BlockReaderIdExt + ChainSpecProvider + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static, +//! Provider: BlockReaderIdExt + ChainSpecProvider + ChangeSetReader + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static, //! Pool: TransactionPool + Clone + 'static, //! Network: NetworkInfo + Peers + Clone + 'static, //! Events: CanonStateSubscriptions + Clone + 'static, @@ -64,7 +64,7 @@ //! ``` //! use tokio::try_join; //! use reth_network_api::{NetworkInfo, Peers}; -//! use reth_provider::{BlockReaderIdExt, ChainSpecProvider, CanonStateSubscriptions, StateProviderFactory, EvmEnvProvider}; +//! use reth_provider::{BlockReaderIdExt, ChainSpecProvider, CanonStateSubscriptions, StateProviderFactory, EvmEnvProvider, ChangeSetReader}; //! use reth_rpc::JwtSecret; //! use reth_rpc_builder::{RethRpcModule, RpcModuleBuilder, RpcServerConfig, TransportRpcModuleConfig}; //! use reth_tasks::TokioTaskExecutor; @@ -73,7 +73,7 @@ //! use reth_rpc_builder::auth::AuthServerConfig; //! pub async fn launch(provider: Provider, pool: Pool, network: Network, events: Events, engine_api: EngineApi) //! where -//! Provider: BlockReaderIdExt + ChainSpecProvider + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static, +//! Provider: BlockReaderIdExt + ChainSpecProvider + ChangeSetReader + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static, //! Pool: TransactionPool + Clone + 'static, //! Network: NetworkInfo + Peers + Clone + 'static, //! Events: CanonStateSubscriptions + Clone + 'static, @@ -113,8 +113,8 @@ use jsonrpsee::{ use reth_ipc::server::IpcServer; use reth_network_api::{NetworkInfo, Peers}; use reth_provider::{ - BlockReader, BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, EvmEnvProvider, - StateProviderFactory, + BlockReader, BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, + EvmEnvProvider, StateProviderFactory, }; use reth_rpc::{ eth::{ @@ -122,7 +122,7 @@ use reth_rpc::{ gas_oracle::GasPriceOracle, }, AdminApi, DebugApi, EngineEthApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider, - NetApi, RPCApi, TraceApi, TracingCallGuard, TxPoolApi, Web3Api, + NetApi, RPCApi, RethApi, TraceApi, TracingCallGuard, TxPoolApi, Web3Api, }; use reth_rpc_api::{servers::*, EngineApiServer}; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; @@ -176,6 +176,7 @@ where + StateProviderFactory + EvmEnvProvider + ChainSpecProvider + + ChangeSetReader + Clone + Unpin + 'static, @@ -320,6 +321,7 @@ where + StateProviderFactory + EvmEnvProvider + ChainSpecProvider + + ChangeSetReader + Clone + Unpin + 'static, @@ -543,6 +545,7 @@ impl RpcModuleSelection { + StateProviderFactory + EvmEnvProvider + ChainSpecProvider + + ChangeSetReader + Clone + Unpin + 'static, @@ -646,6 +649,8 @@ pub enum RethRpcModule { Web3, /// `rpc_` module Rpc, + /// `reth_` module + Reth, } // === impl RethRpcModule === @@ -758,6 +763,7 @@ where + StateProviderFactory + EvmEnvProvider + ChainSpecProvider + + ChangeSetReader + Clone + Unpin + 'static, @@ -839,6 +845,15 @@ where self } + /// Register Reth namespace + pub fn register_reth(&mut self) -> &mut Self { + self.modules.insert( + RethRpcModule::Reth, + RethApi::new(self.provider.clone(), Box::new(self.executor.clone())).into_rpc().into(), + ); + self + } + /// Helper function to create a [RpcModule] if it's not `None` fn maybe_module(&mut self, config: Option<&RpcModuleSelection>) -> Option> { let config = config?; @@ -921,6 +936,11 @@ where ) .into_rpc() .into(), + RethRpcModule::Reth => { + RethApi::new(self.provider.clone(), Box::new(self.executor.clone())) + .into_rpc() + .into() + } }) .clone() }) @@ -1754,6 +1774,7 @@ mod tests { "trace" => RethRpcModule::Trace, "web3" => RethRpcModule::Web3, "rpc" => RethRpcModule::Rpc, + "reth" => RethRpcModule::Reth, ); } diff --git a/crates/rpc/rpc/src/lib.rs b/crates/rpc/rpc/src/lib.rs index 8ec0893a0282..64083f746b2b 100644 --- a/crates/rpc/rpc/src/lib.rs +++ b/crates/rpc/rpc/src/lib.rs @@ -37,6 +37,7 @@ mod engine; pub mod eth; mod layers; mod net; +mod reth; mod rpc; mod trace; mod txpool; @@ -49,6 +50,7 @@ pub use engine::{EngineApi, EngineEthApi}; pub use eth::{EthApi, EthApiSpec, EthFilter, EthPubSub, EthSubscriptionIdProvider}; pub use layers::{AuthLayer, AuthValidator, Claims, JwtAuthValidator, JwtError, JwtSecret}; pub use net::NetApi; +pub use reth::RethApi; pub use rpc::RPCApi; pub use trace::TraceApi; pub use txpool::TxPoolApi; diff --git a/crates/rpc/rpc/src/reth.rs b/crates/rpc/rpc/src/reth.rs new file mode 100644 index 000000000000..7fbdf25ab460 --- /dev/null +++ b/crates/rpc/rpc/src/reth.rs @@ -0,0 +1,118 @@ +use crate::eth::error::{EthApiError, EthResult}; +use async_trait::async_trait; +use jsonrpsee::core::RpcResult; +use reth_interfaces::Result; +use reth_primitives::{Address, BlockId, U256}; +use reth_provider::{BlockReaderIdExt, ChangeSetReader, StateProviderFactory}; +use reth_rpc_api::RethApiServer; +use reth_tasks::TaskSpawner; +use std::{collections::HashMap, future::Future, sync::Arc}; +use tokio::sync::oneshot; + +/// `reth` API implementation. +/// +/// This type provides the functionality for handling `reth` prototype RPC requests. +pub struct RethApi { + inner: Arc>, +} + +// === impl RethApi === + +impl RethApi { + /// The provider that can interact with the chain. + pub fn provider(&self) -> &Provider { + &self.inner.provider + } + + /// Create a new instance of the [RethApi] + pub fn new(provider: Provider, task_spawner: Box) -> Self { + let inner = Arc::new(RethApiInner { provider, task_spawner }); + Self { inner } + } +} + +impl RethApi +where + Provider: BlockReaderIdExt + ChangeSetReader + StateProviderFactory + 'static, +{ + /// Executes the future on a new blocking task. + async fn on_blocking_task(&self, c: C) -> EthResult + where + C: FnOnce(Self) -> F, + F: Future> + Send + 'static, + R: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + let this = self.clone(); + let f = c(this); + self.inner.task_spawner.spawn_blocking(Box::pin(async move { + let res = f.await; + let _ = tx.send(res); + })); + rx.await.map_err(|_| EthApiError::InternalEthError)? + } + + /// Returns a map of addresses to changed account balanced for a particular block. + pub async fn balance_changes_in_block( + &self, + block_id: BlockId, + ) -> EthResult> { + self.on_blocking_task(|this| async move { this.try_balance_changes_in_block(block_id) }) + .await + } + + fn try_balance_changes_in_block(&self, block_id: BlockId) -> EthResult> { + let block_id = block_id; + let Some(block_number) = self.provider().block_number_for_id(block_id)? else { + return Err(EthApiError::UnknownBlockNumber) + }; + + let state = self.provider().state_by_block_id(block_id)?; + let accounts_before = self.provider().account_block_changeset(block_number)?; + let hash_map = accounts_before.iter().try_fold( + HashMap::new(), + |mut hash_map, account_before| -> Result<_> { + let current_balance = state.account_balance(account_before.address)?; + let prev_balance = account_before.info.map(|info| info.balance); + if current_balance != prev_balance { + hash_map.insert(account_before.address, current_balance.unwrap_or_default()); + } + Ok(hash_map) + }, + )?; + Ok(hash_map) + } +} + +#[async_trait] +impl RethApiServer for RethApi +where + Provider: BlockReaderIdExt + ChangeSetReader + StateProviderFactory + 'static, +{ + /// Handler for `reth_getBalanceChangesInBlock` + async fn reth_get_balance_changes_in_block( + &self, + block_id: BlockId, + ) -> RpcResult> { + Ok(RethApi::balance_changes_in_block(self, block_id).await?) + } +} + +impl std::fmt::Debug for RethApi { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RethApi").finish_non_exhaustive() + } +} + +impl Clone for RethApi { + fn clone(&self) -> Self { + Self { inner: Arc::clone(&self.inner) } + } +} + +struct RethApiInner { + /// The provider that can interact with the chain. + provider: Provider, + /// The type that can spawn tasks which would otherwise block. + task_spawner: Box, +} diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index a6a1f6bf2980..64c9932711e8 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -25,8 +25,8 @@ pub use traits::{ BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt, BlockSource, BlockWriter, BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, CanonStateSubscriptions, - ChainSpecProvider, EvmEnvProvider, ExecutorFactory, HashingWriter, HeaderProvider, - HistoryWriter, PostStateDataProvider, ReceiptProvider, ReceiptProviderIdExt, + ChainSpecProvider, ChangeSetReader, EvmEnvProvider, ExecutorFactory, HashingWriter, + HeaderProvider, HistoryWriter, PostStateDataProvider, ReceiptProvider, ReceiptProviderIdExt, StageCheckpointReader, StageCheckpointWriter, StateProvider, StateProviderBox, StateProviderFactory, StateRootProvider, StorageReader, TransactionsProvider, WithdrawalsProvider, diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 16f25d9792f7..7205710ab1a3 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -1,6 +1,8 @@ use crate::{ post_state::StorageChangeset, - traits::{AccountExtReader, BlockSource, ReceiptProvider, StageCheckpointWriter}, + traits::{ + AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter, + }, AccountReader, BlockExecutionWriter, BlockHashReader, BlockNumReader, BlockReader, BlockWriter, EvmEnvProvider, HashingWriter, HeaderProvider, HistoryWriter, PostState, ProviderError, StageCheckpointReader, StorageReader, TransactionsProvider, WithdrawalsProvider, @@ -726,6 +728,20 @@ impl<'this, TX: DbTx<'this>> AccountExtReader for DatabaseProvider<'this, TX> { } } +impl<'this, TX: DbTx<'this>> ChangeSetReader for DatabaseProvider<'this, TX> { + fn account_block_changeset(&self, block_number: BlockNumber) -> Result> { + let range = block_number..=block_number; + self.tx + .cursor_read::()? + .walk_range(range)? + .map(|result| -> Result<_> { + let (_, account_before) = result?; + Ok(account_before) + }) + .collect() + } +} + impl<'this, TX: DbTx<'this>> HeaderProvider for DatabaseProvider<'this, TX> { fn header(&self, block_hash: &BlockHash) -> Result> { if let Some(num) = self.block_number(*block_hash)? { diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 083f15f98874..27e4903bb1de 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -1,7 +1,7 @@ use crate::{ BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt, BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotifications, - CanonStateSubscriptions, ChainSpecProvider, EvmEnvProvider, HeaderProvider, + CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, EvmEnvProvider, HeaderProvider, PostStateDataProvider, ProviderError, ReceiptProvider, ReceiptProviderIdExt, StageCheckpointReader, StateProviderBox, StateProviderFactory, TransactionsProvider, WithdrawalsProvider, @@ -39,6 +39,7 @@ mod state; use crate::{providers::chain_info::ChainInfoTracker, traits::BlockSource}; pub use database::*; pub use post_state_provider::PostStateProvider; +use reth_db::models::AccountBeforeTx; use reth_interfaces::blockchain_tree::{ error::InsertBlockError, CanonicalOutcome, InsertPayloadOk, }; @@ -813,3 +814,13 @@ where self.tree.subscribe_to_canonical_state() } } + +impl ChangeSetReader for BlockchainProvider +where + DB: Database, + Tree: Sync + Send, +{ + fn account_block_changeset(&self, block_number: BlockNumber) -> Result> { + self.database.provider()?.account_block_changeset(block_number) + } +} diff --git a/crates/storage/provider/src/test_utils/noop.rs b/crates/storage/provider/src/test_utils/noop.rs index 8fa349b11b5b..def01741e68a 100644 --- a/crates/storage/provider/src/test_utils/noop.rs +++ b/crates/storage/provider/src/test_utils/noop.rs @@ -1,11 +1,11 @@ use crate::{ traits::{BlockSource, ReceiptProvider}, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt, - ChainSpecProvider, EvmEnvProvider, HeaderProvider, PostState, ReceiptProviderIdExt, - StageCheckpointReader, StateProvider, StateProviderBox, StateProviderFactory, - StateRootProvider, TransactionsProvider, WithdrawalsProvider, + ChainSpecProvider, ChangeSetReader, EvmEnvProvider, HeaderProvider, PostState, + ReceiptProviderIdExt, StageCheckpointReader, StateProvider, StateProviderBox, + StateProviderFactory, StateRootProvider, TransactionsProvider, WithdrawalsProvider, }; -use reth_db::models::StoredBlockBodyIndices; +use reth_db::models::{AccountBeforeTx, StoredBlockBodyIndices}; use reth_interfaces::Result; use reth_primitives::{ stage::{StageCheckpoint, StageId}, @@ -235,6 +235,12 @@ impl AccountReader for NoopProvider { } } +impl ChangeSetReader for NoopProvider { + fn account_block_changeset(&self, _block_number: BlockNumber) -> Result> { + Ok(Vec::default()) + } +} + impl StateRootProvider for NoopProvider { fn state_root(&self, _post_state: PostState) -> Result { todo!() diff --git a/crates/storage/provider/src/traits/account.rs b/crates/storage/provider/src/traits/account.rs index d08d15a1218e..5ae4fe60a778 100644 --- a/crates/storage/provider/src/traits/account.rs +++ b/crates/storage/provider/src/traits/account.rs @@ -1,4 +1,5 @@ use auto_impl::auto_impl; +use reth_db::models::AccountBeforeTx; use reth_interfaces::Result; use reth_primitives::{Account, Address, BlockNumber}; use std::{ @@ -42,3 +43,10 @@ pub trait AccountExtReader: Send + Sync { range: RangeInclusive, ) -> Result>>; } + +/// AccountChange reader +#[auto_impl(&, Arc, Box)] +pub trait ChangeSetReader: Send + Sync { + /// Iterate over account changesets and return the account state from before this block. + fn account_block_changeset(&self, block_number: BlockNumber) -> Result>; +} diff --git a/crates/storage/provider/src/traits/mod.rs b/crates/storage/provider/src/traits/mod.rs index 0411c995fcea..5343185bdf19 100644 --- a/crates/storage/provider/src/traits/mod.rs +++ b/crates/storage/provider/src/traits/mod.rs @@ -1,7 +1,7 @@ //! Collection of common provider traits. mod account; -pub use account::{AccountExtReader, AccountReader}; +pub use account::{AccountExtReader, AccountReader, ChangeSetReader}; mod storage; pub use storage::StorageReader;