diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 45bdb1525690..4e6a8d0ef791 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -14,21 +14,27 @@ use reth_chain_state::{ BlockState, CanonicalInMemoryState, ForkChoiceNotifications, ForkChoiceSubscriptions, MemoryOverlayStateProvider, }; -use reth_chainspec::ChainInfo; +use reth_chainspec::{ChainInfo, EthereumHardforks}; +use reth_db::models::BlockNumberAddress; use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices}; use reth_evm::ConfigureEvmEnv; -use reth_execution_types::ExecutionOutcome; +use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit}; use reth_node_types::NodeTypesWithDB; use reth_primitives::{ - Account, Block, BlockWithSenders, EthereumHardforks, Header, Receipt, SealedBlock, - SealedBlockWithSenders, SealedHeader, TransactionMeta, TransactionSigned, - TransactionSignedNoHash, Withdrawal, Withdrawals, + Account, Block, BlockWithSenders, Header, Receipt, SealedBlock, SealedBlockWithSenders, + SealedHeader, StorageEntry, TransactionMeta, TransactionSigned, TransactionSignedNoHash, + Withdrawal, Withdrawals, }; use reth_prune_types::{PruneCheckpoint, PruneSegment}; use reth_stages_types::{StageCheckpoint, StageId}; +use reth_storage_api::StorageChangeSetReader; use reth_storage_errors::provider::ProviderResult; -use revm::primitives::{BlockEnv, CfgEnvWithHandlerCfg}; +use revm::{ + db::states::PlainStorageRevert, + primitives::{BlockEnv, CfgEnvWithHandlerCfg}, +}; use std::{ + collections::{hash_map, HashMap}, ops::{Add, Bound, RangeBounds, RangeInclusive, Sub}, sync::Arc, time::Instant, @@ -122,6 +128,145 @@ impl BlockchainProvider2 { (start, end) } + /// Return the last N blocks of state, recreating the [`ExecutionOutcome`]. + /// + /// If the range is empty, or there are no blocks for the given range, then this returns `None`. + pub fn get_state( + &self, + range: RangeInclusive, + ) -> ProviderResult> { + if range.is_empty() { + return Ok(None) + } + let start_block_number = *range.start(); + let end_block_number = *range.end(); + + // We are not removing block meta as it is used to get block changesets. + let mut block_bodies = Vec::new(); + for block_num in range.clone() { + let block_body = self + .block_body_indices(block_num)? + .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?; + block_bodies.push((block_num, block_body)) + } + + // get transaction receipts + let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num()) + else { + return Ok(None) + }; + let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else { + return Ok(None) + }; + + let mut account_changeset = Vec::new(); + for block_num in range.clone() { + let changeset = + self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem)); + account_changeset.extend(changeset); + } + + let mut storage_changeset = Vec::new(); + for block_num in range { + let changeset = self.storage_changeset(block_num)?; + storage_changeset.extend(changeset); + } + + let (state, reverts) = + self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?; + + let mut receipt_iter = + self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter(); + + let mut receipts = Vec::with_capacity(block_bodies.len()); + // loop break if we are at the end of the blocks. + for (_, block_body) in block_bodies { + let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize); + for tx_num in block_body.tx_num_range() { + let receipt = + receipt_iter.next().ok_or(ProviderError::ReceiptNotFound(tx_num.into()))?; + block_receipts.push(Some(receipt)); + } + receipts.push(block_receipts); + } + + Ok(Some(ExecutionOutcome::new_init( + state, + reverts, + // We skip new contracts since we never delete them from the database + Vec::new(), + receipts.into(), + start_block_number, + Vec::new(), + ))) + } + + /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the + /// [`reth_db::PlainAccountState`] and [`reth_db::PlainStorageState`] tables, based on the given + /// storage and account changesets. + fn populate_bundle_state( + &self, + account_changeset: Vec<(u64, AccountBeforeTx)>, + storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>, + block_range_end: BlockNumber, + ) -> ProviderResult<(BundleStateInit, RevertsInit)> { + let mut state: BundleStateInit = HashMap::new(); + let mut reverts: RevertsInit = HashMap::new(); + let state_provider = self.state_by_block_number_or_tag(block_range_end.into())?; + + // add account changeset changes + for (block_number, account_before) in account_changeset.into_iter().rev() { + let AccountBeforeTx { info: old_info, address } = account_before; + match state.entry(address) { + hash_map::Entry::Vacant(entry) => { + let new_info = state_provider.basic_account(address)?; + entry.insert((old_info, new_info, HashMap::new())); + } + hash_map::Entry::Occupied(mut entry) => { + // overwrite old account state. + entry.get_mut().0 = old_info; + } + } + // insert old info into reverts. + reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info); + } + + // add storage changeset changes + for (block_and_address, old_storage) in storage_changeset.into_iter().rev() { + let BlockNumberAddress((block_number, address)) = block_and_address; + // get account state or insert from plain state. + let account_state = match state.entry(address) { + hash_map::Entry::Vacant(entry) => { + let present_info = state_provider.basic_account(address)?; + entry.insert((present_info, present_info, HashMap::new())) + } + hash_map::Entry::Occupied(entry) => entry.into_mut(), + }; + + // match storage. + match account_state.2.entry(old_storage.key) { + hash_map::Entry::Vacant(entry) => { + let new_storage_value = + state_provider.storage(address, old_storage.key)?.unwrap_or_default(); + entry.insert((old_storage.value, new_storage_value)); + } + hash_map::Entry::Occupied(mut entry) => { + entry.get_mut().0 = old_storage.value; + } + }; + + reverts + .entry(block_number) + .or_default() + .entry(address) + .or_default() + .1 + .push(old_storage); + } + + Ok((state, reverts)) + } + /// Fetches a range of data from both in-memory state and persistent storage while a predicate /// is met. /// @@ -1420,6 +1565,57 @@ impl ForkChoiceSubscriptions for BlockchainProvider2 { } } +impl StorageChangeSetReader for BlockchainProvider2 { + fn storage_changeset( + &self, + block_number: BlockNumber, + ) -> ProviderResult> { + if let Some(state) = self.canonical_in_memory_state.state_by_number(block_number) { + let changesets = state + .block() + .execution_output + .bundle + .reverts + .clone() + .into_plain_state_reverts() + .storage + .into_iter() + .flatten() + .flat_map(|revert: PlainStorageRevert| { + revert.storage_revert.into_iter().map(move |(key, value)| { + ( + BlockNumberAddress((block_number, revert.address)), + StorageEntry { key: key.into(), value: value.to_previous_value() }, + ) + }) + }) + .collect(); + Ok(changesets) + } else { + // Perform checks on whether or not changesets exist for the block. + let provider = self.database.provider()?; + + // No prune checkpoint means history should exist and we should `unwrap_or(true)` + let storage_history_exists = provider + .get_prune_checkpoint(PruneSegment::StorageHistory)? + .and_then(|checkpoint| { + // return true if the block number is ahead of the prune checkpoint. + // + // The checkpoint stores the highest pruned block number, so we should make + // sure the block_number is strictly greater. + checkpoint.block_number.map(|checkpoint| block_number > checkpoint) + }) + .unwrap_or(true); + + if !storage_history_exists { + return Err(ProviderError::StateAtBlockPruned(block_number)) + } + + provider.storage_changeset(block_number) + } + } +} + impl ChangeSetReader for BlockchainProvider2 { fn account_block_changeset( &self, @@ -1440,7 +1636,25 @@ impl ChangeSetReader for BlockchainProvider2 { .collect(); Ok(changesets) } else { - self.database.provider()?.account_block_changeset(block_number) + // Perform checks on whether or not changesets exist for the block. + let provider = self.database.provider()?; + // No prune checkpoint means history should exist and we should `unwrap_or(true)` + let account_history_exists = provider + .get_prune_checkpoint(PruneSegment::AccountHistory)? + .and_then(|checkpoint| { + // return true if the block number is ahead of the prune checkpoint. + // + // The checkpoint stores the highest pruned block number, so we should make + // sure the block_number is strictly greater. + checkpoint.block_number.map(|checkpoint| block_number > checkpoint) + }) + .unwrap_or(true); + + if !account_history_exists { + return Err(ProviderError::StateAtBlockPruned(block_number)) + } + + provider.account_block_changeset(block_number) } } } @@ -1455,12 +1669,21 @@ impl AccountReader for BlockchainProvider2 { } impl StateReader for BlockchainProvider2 { + /// Re-constructs the [`ExecutionOutcome`] from in-memory and database state, if necessary. + /// + /// If data for the block does not exist, this will return [`None`]. + /// + /// NOTE: This cannot be called safely in a loop outside of the blockchain tree thread. This is + /// because the [`CanonicalInMemoryState`] could change during a reorg, causing results to be + /// inconsistent. Currently this can safely be called within the blockchain tree thread, + /// because the tree thread is responsible for modifying the [`CanonicalInMemoryState`] in the + /// first place. fn get_state(&self, block: BlockNumber) -> ProviderResult> { if let Some(state) = self.canonical_in_memory_state.state_by_number(block) { let state = state.block_ref().execution_outcome().clone(); Ok(Some(state)) } else { - self.database.provider()?.get_state(block..=block) + self.get_state(block..=block) } } } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 8627cacabb4c..1afd4da3fa8c 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -46,7 +46,7 @@ use reth_primitives::{ }; use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment}; use reth_stages_types::{StageCheckpoint, StageId}; -use reth_storage_api::TryIntoHistoricalStateProvider; +use reth_storage_api::{StorageChangeSetReader, TryIntoHistoricalStateProvider}; use reth_storage_errors::provider::{ProviderResult, RootMismatch}; use reth_trie::{ prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets}, @@ -1414,6 +1414,21 @@ impl AccountExtReader for DatabaseProvider StorageChangeSetReader for DatabaseProvider { + fn storage_changeset( + &self, + block_number: BlockNumber, + ) -> ProviderResult> { + let range = block_number..=block_number; + let storage_range = BlockNumberAddress::range(range); + self.tx + .cursor_dup_read::()? + .walk_range(storage_range)? + .map(|result| -> ProviderResult<_> { Ok(result?) }) + .collect() + } +} + impl ChangeSetReader for DatabaseProvider { fn account_block_changeset( &self, diff --git a/crates/storage/storage-api/src/storage.rs b/crates/storage/storage-api/src/storage.rs index 91d0bc8c7353..e1443347e4bb 100644 --- a/crates/storage/storage-api/src/storage.rs +++ b/crates/storage/storage-api/src/storage.rs @@ -1,4 +1,5 @@ use alloy_primitives::{Address, BlockNumber, B256}; +use reth_db_api::models::BlockNumberAddress; use reth_primitives::StorageEntry; use reth_storage_errors::provider::ProviderResult; use std::{ @@ -30,3 +31,13 @@ pub trait StorageReader: Send + Sync { range: RangeInclusive, ) -> ProviderResult>>; } + +/// Storage ChangeSet reader +#[auto_impl::auto_impl(&, Arc, Box)] +pub trait StorageChangeSetReader: Send + Sync { + /// Iterate over storage changesets and return the storage state from before this block. + fn storage_changeset( + &self, + block_number: BlockNumber, + ) -> ProviderResult>; +}