Skip to content

Commit

Permalink
feat: implement reth_getBalanceChangesInBlock (#3768)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
ryanschneider and mattsse authored Jul 18, 2023
1 parent 0f81022 commit 34c9abe
Show file tree
Hide file tree
Showing 12 changed files with 219 additions and 18 deletions.
6 changes: 4 additions & 2 deletions bin/reth/src/args/rpc_server_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use clap::{
use futures::TryFutureExt;
use reth_network_api::{NetworkInfo, Peers};
use reth_provider::{
BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, EvmEnvProvider, HeaderProvider,
StateProviderFactory,
BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, EvmEnvProvider,
HeaderProvider, StateProviderFactory,
};
use reth_rpc::{
eth::{
Expand Down Expand Up @@ -249,6 +249,7 @@ impl RpcServerArgs {
+ StateProviderFactory
+ EvmEnvProvider
+ ChainSpecProvider
+ ChangeSetReader
+ Clone
+ Unpin
+ 'static,
Expand Down Expand Up @@ -310,6 +311,7 @@ impl RpcServerArgs {
+ StateProviderFactory
+ EvmEnvProvider
+ ChainSpecProvider
+ ChangeSetReader
+ Clone
+ Unpin
+ 'static,
Expand Down
2 changes: 2 additions & 0 deletions crates/rpc/rpc-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod eth;
mod eth_filter;
mod eth_pubsub;
mod net;
mod reth;
mod rpc;
mod trace;
mod txpool;
Expand All @@ -44,6 +45,7 @@ pub mod servers {
eth_filter::EthFilterApiServer,
eth_pubsub::EthPubSubApiServer,
net::NetApiServer,
reth::RethApiServer,
rpc::RpcApiServer,
trace::TraceApiServer,
txpool::TxPoolApiServer,
Expand Down
15 changes: 15 additions & 0 deletions crates/rpc/rpc-api/src/reth.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use reth_primitives::{Address, BlockId, U256};
use std::collections::HashMap;

/// Reth API namespace for reth-specific methods
#[cfg_attr(not(feature = "client"), rpc(server, namespace = "reth"))]
#[cfg_attr(feature = "client", rpc(server, client, namespace = "reth"))]
pub trait RethApi {
/// Returns all ETH balance changes in a block
#[method(name = "getBalanceChangesInBlock")]
async fn reth_get_balance_changes_in_block(
&self,
block_id: BlockId,
) -> RpcResult<HashMap<Address, U256>>;
}
35 changes: 28 additions & 7 deletions crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@
//!
//! ```
//! use reth_network_api::{NetworkInfo, Peers};
//! use reth_provider::{BlockReaderIdExt, ChainSpecProvider, CanonStateSubscriptions, StateProviderFactory, EvmEnvProvider};
//! use reth_provider::{BlockReaderIdExt, ChainSpecProvider, CanonStateSubscriptions, StateProviderFactory, EvmEnvProvider, ChangeSetReader};
//! use reth_rpc_builder::{RethRpcModule, RpcModuleBuilder, RpcServerConfig, ServerBuilder, TransportRpcModuleConfig};
//! use reth_tasks::TokioTaskExecutor;
//! use reth_transaction_pool::TransactionPool;
//! pub async fn launch<Provider, Pool, Network, Events>(provider: Provider, pool: Pool, network: Network, events: Events)
//! where
//! Provider: BlockReaderIdExt + ChainSpecProvider + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static,
//! Provider: BlockReaderIdExt + ChainSpecProvider + ChangeSetReader + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static,
//! Pool: TransactionPool + Clone + 'static,
//! Network: NetworkInfo + Peers + Clone + 'static,
//! Events: CanonStateSubscriptions + Clone + 'static,
Expand All @@ -64,7 +64,7 @@
//! ```
//! use tokio::try_join;
//! use reth_network_api::{NetworkInfo, Peers};
//! use reth_provider::{BlockReaderIdExt, ChainSpecProvider, CanonStateSubscriptions, StateProviderFactory, EvmEnvProvider};
//! use reth_provider::{BlockReaderIdExt, ChainSpecProvider, CanonStateSubscriptions, StateProviderFactory, EvmEnvProvider, ChangeSetReader};
//! use reth_rpc::JwtSecret;
//! use reth_rpc_builder::{RethRpcModule, RpcModuleBuilder, RpcServerConfig, TransportRpcModuleConfig};
//! use reth_tasks::TokioTaskExecutor;
Expand All @@ -73,7 +73,7 @@
//! use reth_rpc_builder::auth::AuthServerConfig;
//! pub async fn launch<Provider, Pool, Network, Events, EngineApi>(provider: Provider, pool: Pool, network: Network, events: Events, engine_api: EngineApi)
//! where
//! Provider: BlockReaderIdExt + ChainSpecProvider + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static,
//! Provider: BlockReaderIdExt + ChainSpecProvider + ChangeSetReader + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static,
//! Pool: TransactionPool + Clone + 'static,
//! Network: NetworkInfo + Peers + Clone + 'static,
//! Events: CanonStateSubscriptions + Clone + 'static,
Expand Down Expand Up @@ -113,16 +113,16 @@ use jsonrpsee::{
use reth_ipc::server::IpcServer;
use reth_network_api::{NetworkInfo, Peers};
use reth_provider::{
BlockReader, BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, EvmEnvProvider,
StateProviderFactory,
BlockReader, BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader,
EvmEnvProvider, StateProviderFactory,
};
use reth_rpc::{
eth::{
cache::{cache_new_blocks_task, EthStateCache},
gas_oracle::GasPriceOracle,
},
AdminApi, DebugApi, EngineEthApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider,
NetApi, RPCApi, TraceApi, TracingCallGuard, TxPoolApi, Web3Api,
NetApi, RPCApi, RethApi, TraceApi, TracingCallGuard, TxPoolApi, Web3Api,
};
use reth_rpc_api::{servers::*, EngineApiServer};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
Expand Down Expand Up @@ -176,6 +176,7 @@ where
+ StateProviderFactory
+ EvmEnvProvider
+ ChainSpecProvider
+ ChangeSetReader
+ Clone
+ Unpin
+ 'static,
Expand Down Expand Up @@ -320,6 +321,7 @@ where
+ StateProviderFactory
+ EvmEnvProvider
+ ChainSpecProvider
+ ChangeSetReader
+ Clone
+ Unpin
+ 'static,
Expand Down Expand Up @@ -543,6 +545,7 @@ impl RpcModuleSelection {
+ StateProviderFactory
+ EvmEnvProvider
+ ChainSpecProvider
+ ChangeSetReader
+ Clone
+ Unpin
+ 'static,
Expand Down Expand Up @@ -646,6 +649,8 @@ pub enum RethRpcModule {
Web3,
/// `rpc_` module
Rpc,
/// `reth_` module
Reth,
}

// === impl RethRpcModule ===
Expand Down Expand Up @@ -758,6 +763,7 @@ where
+ StateProviderFactory
+ EvmEnvProvider
+ ChainSpecProvider
+ ChangeSetReader
+ Clone
+ Unpin
+ 'static,
Expand Down Expand Up @@ -839,6 +845,15 @@ where
self
}

/// Register Reth namespace
pub fn register_reth(&mut self) -> &mut Self {
self.modules.insert(
RethRpcModule::Reth,
RethApi::new(self.provider.clone(), Box::new(self.executor.clone())).into_rpc().into(),
);
self
}

/// Helper function to create a [RpcModule] if it's not `None`
fn maybe_module(&mut self, config: Option<&RpcModuleSelection>) -> Option<RpcModule<()>> {
let config = config?;
Expand Down Expand Up @@ -921,6 +936,11 @@ where
)
.into_rpc()
.into(),
RethRpcModule::Reth => {
RethApi::new(self.provider.clone(), Box::new(self.executor.clone()))
.into_rpc()
.into()
}
})
.clone()
})
Expand Down Expand Up @@ -1754,6 +1774,7 @@ mod tests {
"trace" => RethRpcModule::Trace,
"web3" => RethRpcModule::Web3,
"rpc" => RethRpcModule::Rpc,
"reth" => RethRpcModule::Reth,
);
}

Expand Down
2 changes: 2 additions & 0 deletions crates/rpc/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ mod engine;
pub mod eth;
mod layers;
mod net;
mod reth;
mod rpc;
mod trace;
mod txpool;
Expand All @@ -49,6 +50,7 @@ pub use engine::{EngineApi, EngineEthApi};
pub use eth::{EthApi, EthApiSpec, EthFilter, EthPubSub, EthSubscriptionIdProvider};
pub use layers::{AuthLayer, AuthValidator, Claims, JwtAuthValidator, JwtError, JwtSecret};
pub use net::NetApi;
pub use reth::RethApi;
pub use rpc::RPCApi;
pub use trace::TraceApi;
pub use txpool::TxPoolApi;
Expand Down
118 changes: 118 additions & 0 deletions crates/rpc/rpc/src/reth.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use crate::eth::error::{EthApiError, EthResult};
use async_trait::async_trait;
use jsonrpsee::core::RpcResult;
use reth_interfaces::Result;
use reth_primitives::{Address, BlockId, U256};
use reth_provider::{BlockReaderIdExt, ChangeSetReader, StateProviderFactory};
use reth_rpc_api::RethApiServer;
use reth_tasks::TaskSpawner;
use std::{collections::HashMap, future::Future, sync::Arc};
use tokio::sync::oneshot;

/// `reth` API implementation.
///
/// This type provides the functionality for handling `reth` prototype RPC requests.
pub struct RethApi<Provider> {
inner: Arc<RethApiInner<Provider>>,
}

// === impl RethApi ===

impl<Provider> RethApi<Provider> {
/// The provider that can interact with the chain.
pub fn provider(&self) -> &Provider {
&self.inner.provider
}

/// Create a new instance of the [RethApi]
pub fn new(provider: Provider, task_spawner: Box<dyn TaskSpawner>) -> Self {
let inner = Arc::new(RethApiInner { provider, task_spawner });
Self { inner }
}
}

impl<Provider> RethApi<Provider>
where
Provider: BlockReaderIdExt + ChangeSetReader + StateProviderFactory + 'static,
{
/// Executes the future on a new blocking task.
async fn on_blocking_task<C, F, R>(&self, c: C) -> EthResult<R>
where
C: FnOnce(Self) -> F,
F: Future<Output = EthResult<R>> + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = oneshot::channel();
let this = self.clone();
let f = c(this);
self.inner.task_spawner.spawn_blocking(Box::pin(async move {
let res = f.await;
let _ = tx.send(res);
}));
rx.await.map_err(|_| EthApiError::InternalEthError)?
}

/// Returns a map of addresses to changed account balanced for a particular block.
pub async fn balance_changes_in_block(
&self,
block_id: BlockId,
) -> EthResult<HashMap<Address, U256>> {
self.on_blocking_task(|this| async move { this.try_balance_changes_in_block(block_id) })
.await
}

fn try_balance_changes_in_block(&self, block_id: BlockId) -> EthResult<HashMap<Address, U256>> {
let block_id = block_id;
let Some(block_number) = self.provider().block_number_for_id(block_id)? else {
return Err(EthApiError::UnknownBlockNumber)
};

let state = self.provider().state_by_block_id(block_id)?;
let accounts_before = self.provider().account_block_changeset(block_number)?;
let hash_map = accounts_before.iter().try_fold(
HashMap::new(),
|mut hash_map, account_before| -> Result<_> {
let current_balance = state.account_balance(account_before.address)?;
let prev_balance = account_before.info.map(|info| info.balance);
if current_balance != prev_balance {
hash_map.insert(account_before.address, current_balance.unwrap_or_default());
}
Ok(hash_map)
},
)?;
Ok(hash_map)
}
}

#[async_trait]
impl<Provider> RethApiServer for RethApi<Provider>
where
Provider: BlockReaderIdExt + ChangeSetReader + StateProviderFactory + 'static,
{
/// Handler for `reth_getBalanceChangesInBlock`
async fn reth_get_balance_changes_in_block(
&self,
block_id: BlockId,
) -> RpcResult<HashMap<Address, U256>> {
Ok(RethApi::balance_changes_in_block(self, block_id).await?)
}
}

impl<Provider> std::fmt::Debug for RethApi<Provider> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RethApi").finish_non_exhaustive()
}
}

impl<Provider> Clone for RethApi<Provider> {
fn clone(&self) -> Self {
Self { inner: Arc::clone(&self.inner) }
}
}

struct RethApiInner<Provider> {
/// The provider that can interact with the chain.
provider: Provider,
/// The type that can spawn tasks which would otherwise block.
task_spawner: Box<dyn TaskSpawner>,
}
4 changes: 2 additions & 2 deletions crates/storage/provider/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ pub use traits::{
BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt, BlockSource, BlockWriter,
BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotification,
CanonStateNotificationSender, CanonStateNotifications, CanonStateSubscriptions,
ChainSpecProvider, EvmEnvProvider, ExecutorFactory, HashingWriter, HeaderProvider,
HistoryWriter, PostStateDataProvider, ReceiptProvider, ReceiptProviderIdExt,
ChainSpecProvider, ChangeSetReader, EvmEnvProvider, ExecutorFactory, HashingWriter,
HeaderProvider, HistoryWriter, PostStateDataProvider, ReceiptProvider, ReceiptProviderIdExt,
StageCheckpointReader, StageCheckpointWriter, StateProvider, StateProviderBox,
StateProviderFactory, StateRootProvider, StorageReader, TransactionsProvider,
WithdrawalsProvider,
Expand Down
18 changes: 17 additions & 1 deletion crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::{
post_state::StorageChangeset,
traits::{AccountExtReader, BlockSource, ReceiptProvider, StageCheckpointWriter},
traits::{
AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
},
AccountReader, BlockExecutionWriter, BlockHashReader, BlockNumReader, BlockReader, BlockWriter,
EvmEnvProvider, HashingWriter, HeaderProvider, HistoryWriter, PostState, ProviderError,
StageCheckpointReader, StorageReader, TransactionsProvider, WithdrawalsProvider,
Expand Down Expand Up @@ -726,6 +728,20 @@ impl<'this, TX: DbTx<'this>> AccountExtReader for DatabaseProvider<'this, TX> {
}
}

impl<'this, TX: DbTx<'this>> ChangeSetReader for DatabaseProvider<'this, TX> {
fn account_block_changeset(&self, block_number: BlockNumber) -> Result<Vec<AccountBeforeTx>> {
let range = block_number..=block_number;
self.tx
.cursor_read::<tables::AccountChangeSet>()?
.walk_range(range)?
.map(|result| -> Result<_> {
let (_, account_before) = result?;
Ok(account_before)
})
.collect()
}
}

impl<'this, TX: DbTx<'this>> HeaderProvider for DatabaseProvider<'this, TX> {
fn header(&self, block_hash: &BlockHash) -> Result<Option<Header>> {
if let Some(num) = self.block_number(*block_hash)? {
Expand Down
13 changes: 12 additions & 1 deletion crates/storage/provider/src/providers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt,
BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotifications,
CanonStateSubscriptions, ChainSpecProvider, EvmEnvProvider, HeaderProvider,
CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, EvmEnvProvider, HeaderProvider,
PostStateDataProvider, ProviderError, ReceiptProvider, ReceiptProviderIdExt,
StageCheckpointReader, StateProviderBox, StateProviderFactory, TransactionsProvider,
WithdrawalsProvider,
Expand Down Expand Up @@ -39,6 +39,7 @@ mod state;
use crate::{providers::chain_info::ChainInfoTracker, traits::BlockSource};
pub use database::*;
pub use post_state_provider::PostStateProvider;
use reth_db::models::AccountBeforeTx;
use reth_interfaces::blockchain_tree::{
error::InsertBlockError, CanonicalOutcome, InsertPayloadOk,
};
Expand Down Expand Up @@ -813,3 +814,13 @@ where
self.tree.subscribe_to_canonical_state()
}
}

impl<DB, Tree> ChangeSetReader for BlockchainProvider<DB, Tree>
where
DB: Database,
Tree: Sync + Send,
{
fn account_block_changeset(&self, block_number: BlockNumber) -> Result<Vec<AccountBeforeTx>> {
self.database.provider()?.account_block_changeset(block_number)
}
}
Loading

0 comments on commit 34c9abe

Please sign in to comment.