fix(tree): make state methods work for historical blocks #11265

Merged
merged 10 commits into from
Oct 7, 2024
239 changes: 231 additions & 8 deletions crates/storage/provider/src/providers/blockchain_provider.rs
@@ -14,21 +14,27 @@ use reth_chain_state::{
     BlockState, CanonicalInMemoryState, ForkChoiceNotifications, ForkChoiceSubscriptions,
     MemoryOverlayStateProvider,
 };
-use reth_chainspec::ChainInfo;
+use reth_chainspec::{ChainInfo, EthereumHardforks};
+use reth_db::models::BlockNumberAddress;
 use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices};
 use reth_evm::ConfigureEvmEnv;
-use reth_execution_types::ExecutionOutcome;
+use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
 use reth_node_types::NodeTypesWithDB;
 use reth_primitives::{
-    Account, Block, BlockWithSenders, EthereumHardforks, Header, Receipt, SealedBlock,
-    SealedBlockWithSenders, SealedHeader, TransactionMeta, TransactionSigned,
-    TransactionSignedNoHash, Withdrawal, Withdrawals,
+    Account, Block, BlockWithSenders, Header, Receipt, SealedBlock, SealedBlockWithSenders,
+    SealedHeader, StorageEntry, TransactionMeta, TransactionSigned, TransactionSignedNoHash,
+    Withdrawal, Withdrawals,
 };
 use reth_prune_types::{PruneCheckpoint, PruneSegment};
 use reth_stages_types::{StageCheckpoint, StageId};
+use reth_storage_api::StorageChangeSetReader;
 use reth_storage_errors::provider::ProviderResult;
-use revm::primitives::{BlockEnv, CfgEnvWithHandlerCfg};
+use revm::{
+    db::states::PlainStorageRevert,
+    primitives::{BlockEnv, CfgEnvWithHandlerCfg},
+};
 use std::{
+    collections::{hash_map, HashMap},
     ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
     sync::Arc,
     time::Instant,
@@ -122,6 +128,145 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
         (start, end)
     }

+    /// Return the last N blocks of state, recreating the [`ExecutionOutcome`].
+    ///
+    /// If the range is empty, or there are no blocks for the given range, then this returns `None`.
+    pub fn get_state(
Member

Akin to the checks in the historical provider, this method should check the lowest available history block in case the node is pruned.

Member Author

Now done in ChangeSetReader and StorageChangeSetReader, ensuring we don't create a db provider without checking prune checkpoints first: d7e6542
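
The prune-checkpoint rule described here can be sketched in isolation. This is a minimal illustration, not the PR's code: `Checkpoint` and `history_exists` are hypothetical stand-ins, assuming only what the diff shows, namely that a checkpoint carries an optional highest pruned block number and that a missing checkpoint means history is available.

```rust
/// Hypothetical stand-in for a prune checkpoint: the highest pruned block, if any.
#[derive(Debug, Clone, Copy)]
pub struct Checkpoint {
    pub block_number: Option<u64>,
}

/// Returns `true` if changesets for `block_number` should still be available.
///
/// No checkpoint (or a checkpoint without a block number) means nothing was
/// pruned, so history is assumed to exist. Otherwise the block must be
/// strictly greater than the highest pruned block.
pub fn history_exists(checkpoint: Option<Checkpoint>, block_number: u64) -> bool {
    checkpoint
        .and_then(|c| c.block_number)
        .map(|pruned| block_number > pruned)
        .unwrap_or(true)
}
```

With a checkpoint at block 10, block 10 itself counts as pruned and block 11 is the first block whose changesets can be served.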

Collaborator

@joshieDo joshieDo Sep 30, 2024

  • can we add to the docs that it's only for historical (or persistence storage), if that's the case? doesn't seem to be the case
  • this is creating a ton of new database transactions. a lil worried that this might also have some unwanted effects, since they do not share the same view. if this is only for historical (is it?), maybe it would be better moved to DatabaseProvider?

Member Author

can we add to the docs that it's only for historical (or persistence storage), if that's the case?

Yes, this is only when we're using a DatabaseProvider, so only for persisted storage.

this is creating a ton of new database transactions. a lil worried that this might also have some unwanted effects, since they do not share the same view. if this is only for historical (is it?), maybe it would be better moved to DatabaseProvider?

You mean the provider calls which hit the DB? i.e.:

            let block_body = self
                .block_body_indices(block_num)?
                .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
// ^ one db tx
            let changeset =
                self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
// ^ may or may not be one db tx
            let changeset = self.storage_changeset(block_num)?;
// ^ also may or may not be one db tx

Member

@Rjected a valid point from @joshieDo here. In the scope of this function we perform a ton of lookups by block number, but there is no consistency guarantee across them.

Member Author

This now uses one database tx
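
The consistency concern in this thread can be illustrated with a toy model. This is a sketch under stated assumptions, not reth's database layer: `ToyDb`, `ToyTx`, and `read_range` are hypothetical names, and the "transaction" is just a pinned immutable snapshot. It shows why a multi-block read should go through one transaction rather than opening a new one per lookup.

```rust
use std::collections::BTreeMap;
use std::ops::RangeInclusive;
use std::sync::{Arc, RwLock};

/// Toy database (hypothetical): each write publishes a new immutable version.
#[derive(Default)]
pub struct ToyDb {
    current: RwLock<Arc<BTreeMap<u64, u64>>>,
}

/// A read "transaction": a snapshot pinned at the time it was opened.
pub struct ToyTx {
    view: Arc<BTreeMap<u64, u64>>,
}

impl ToyDb {
    /// Opens a read transaction over the current version.
    pub fn tx(&self) -> ToyTx {
        let guard = self.current.read().unwrap();
        ToyTx { view: Arc::clone(&*guard) }
    }

    /// Writes a value by copying the map and swapping in the new version.
    pub fn put(&self, block: u64, value: u64) {
        let mut guard = self.current.write().unwrap();
        let mut next = (**guard).clone();
        next.insert(block, value);
        *guard = Arc::new(next);
    }
}

impl ToyTx {
    pub fn get(&self, block: u64) -> Option<u64> {
        self.view.get(&block).copied()
    }
}

/// Reads a range through ONE snapshot, so every lookup sees the same version.
pub fn read_range(db: &ToyDb, range: RangeInclusive<u64>) -> Vec<Option<u64>> {
    let tx = db.tx();
    range.map(|n| tx.get(n)).collect()
}
```

A transaction opened before a write keeps returning the old view, which is exactly the property that per-lookup transactions give up: two lookups in the same function could otherwise observe different database versions.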

+        &self,
+        range: RangeInclusive<BlockNumber>,
+    ) -> ProviderResult<Option<ExecutionOutcome>> {
+        if range.is_empty() {
+            return Ok(None)
+        }
+        let start_block_number = *range.start();
+        let end_block_number = *range.end();
+
+        // We are not removing block meta as it is used to get block changesets.
+        let mut block_bodies = Vec::new();
+        for block_num in range.clone() {
+            let block_body = self
+                .block_body_indices(block_num)?
+                .ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
+            block_bodies.push((block_num, block_body))
+        }
+
+        // get transaction receipts
+        let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
+        else {
+            return Ok(None)
+        };
+        let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
+            return Ok(None)
+        };
+
+        let mut account_changeset = Vec::new();
+        for block_num in range.clone() {
+            let changeset =
+                self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
+            account_changeset.extend(changeset);
+        }
+
+        let mut storage_changeset = Vec::new();
+        for block_num in range {
+            let changeset = self.storage_changeset(block_num)?;
+            storage_changeset.extend(changeset);
+        }
+
+        let (state, reverts) =
+            self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;
+
+        let mut receipt_iter =
+            self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();
+
+        let mut receipts = Vec::with_capacity(block_bodies.len());
+        // loop break if we are at the end of the blocks.
+        for (_, block_body) in block_bodies {
+            let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
+            for tx_num in block_body.tx_num_range() {
+                let receipt =
+                    receipt_iter.next().ok_or(ProviderError::ReceiptNotFound(tx_num.into()))?;
+                block_receipts.push(Some(receipt));
+            }
+            receipts.push(block_receipts);
+        }
+
+        Ok(Some(ExecutionOutcome::new_init(
+            state,
+            reverts,
+            // We skip new contracts since we never delete them from the database
+            Vec::new(),
Rjected marked this conversation as resolved.
+            receipts.into(),
+            start_block_number,
+            Vec::new(),
Member

what should we do with requests here?

Member Author

We should really be fetching them using the RequestsProvider, but that method requires a timestamp, which I'm not sure how to deal with yet

Collaborator

whether we actually need to store requests is still tbd so imo this is fine, but please track so we don't forget

+        )))
+    }
+
+    /// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
+    /// [`reth_db::PlainAccountState`] and [`reth_db::PlainStorageState`] tables, based on the given
+    /// storage and account changesets.
+    fn populate_bundle_state(
+        &self,
+        account_changeset: Vec<(u64, AccountBeforeTx)>,
+        storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
+        block_range_end: BlockNumber,
+    ) -> ProviderResult<(BundleStateInit, RevertsInit)> {
+        let mut state: BundleStateInit = HashMap::new();
+        let mut reverts: RevertsInit = HashMap::new();
+        let state_provider = self.state_by_block_number_or_tag(block_range_end.into())?;
+
+        // add account changeset changes
+        for (block_number, account_before) in account_changeset.into_iter().rev() {
+            let AccountBeforeTx { info: old_info, address } = account_before;
+            match state.entry(address) {
+                hash_map::Entry::Vacant(entry) => {
+                    let new_info = state_provider.basic_account(address)?;
+                    entry.insert((old_info, new_info, HashMap::new()));
+                }
+                hash_map::Entry::Occupied(mut entry) => {
+                    // overwrite old account state.
+                    entry.get_mut().0 = old_info;
+                }
+            }
+            // insert old info into reverts.
+            reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
+        }
+
+        // add storage changeset changes
+        for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
+            let BlockNumberAddress((block_number, address)) = block_and_address;
+            // get account state or insert from plain state.
+            let account_state = match state.entry(address) {
+                hash_map::Entry::Vacant(entry) => {
+                    let present_info = state_provider.basic_account(address)?;
+                    entry.insert((present_info, present_info, HashMap::new()))
+                }
+                hash_map::Entry::Occupied(entry) => entry.into_mut(),
+            };
+
+            // match storage.
+            match account_state.2.entry(old_storage.key) {
+                hash_map::Entry::Vacant(entry) => {
+                    let new_storage_value =
+                        state_provider.storage(address, old_storage.key)?.unwrap_or_default();
+                    entry.insert((old_storage.value, new_storage_value));
+                }
+                hash_map::Entry::Occupied(mut entry) => {
+                    entry.get_mut().0 = old_storage.value;
+                }
+            };
+
+            reverts
+                .entry(block_number)
+                .or_default()
+                .entry(address)
+                .or_default()
+                .1
+                .push(old_storage);
+        }
+
+        Ok((state, reverts))
+    }
+
     /// Fetches a range of data from both in-memory state and persistent storage while a predicate
     /// is met.
     ///
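
The reverse walk in `populate_bundle_state` can be hard to follow inside the full diff. The sketch below reduces it to plain integers, assuming a simplified model in which each change is a `(block, key, previous value)` tuple sorted by ascending block. `Change` and `fold_changes` are hypothetical names used only for illustration. Iterating newest to oldest and overwriting the "old" slot on every repeat leaves the value from before the earliest block in the range, paired with the value at the end of the range.

```rust
use std::collections::{hash_map::Entry, HashMap};

/// (block number, key, value before that block), ordered by ascending block.
/// Hypothetical simplified shape, not reth's changeset types.
pub type Change = (u64, &'static str, u64);

/// Folds changes newest-to-oldest into `key -> (oldest previous value, current value)`.
pub fn fold_changes(
    changes: Vec<Change>,
    current: &HashMap<&'static str, u64>,
) -> HashMap<&'static str, (u64, u64)> {
    let mut state = HashMap::new();
    for (_block, key, old) in changes.into_iter().rev() {
        match state.entry(key) {
            Entry::Vacant(e) => {
                // First sighting (the newest change): pair it with the present value.
                let new = current.get(key).copied().unwrap_or_default();
                e.insert((old, new));
            }
            Entry::Occupied(mut e) => {
                // An older change for the same key: its previous value wins.
                e.get_mut().0 = old;
            }
        }
    }
    state
}
```

For a key changed in blocks 1 and 2 with previous values 10 and 20 and a present value of 30, the fold yields `(10, 30)`: the state before the range and the state after it.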
@@ -1420,6 +1565,57 @@ impl<N: NodeTypesWithDB> ForkChoiceSubscriptions for BlockchainProvider2<N> {
     }
 }

+impl<N: ProviderNodeTypes> StorageChangeSetReader for BlockchainProvider2<N> {
+    fn storage_changeset(
+        &self,
+        block_number: BlockNumber,
+    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
+        if let Some(state) = self.canonical_in_memory_state.state_by_number(block_number) {
+            let changesets = state
+                .block()
+                .execution_output
+                .bundle
+                .reverts
+                .clone()
+                .into_plain_state_reverts()
+                .storage
+                .into_iter()
+                .flatten()
+                .flat_map(|revert: PlainStorageRevert| {
+                    revert.storage_revert.into_iter().map(move |(key, value)| {
Comment on lines +1574 to +1585
Collaborator

-.-

+                        (
+                            BlockNumberAddress((block_number, revert.address)),
+                            StorageEntry { key: key.into(), value: value.to_previous_value() },
+                        )
+                    })
+                })
+                .collect();
+            Ok(changesets)
+        } else {
+            // Perform checks on whether or not changesets exist for the block.
+            let provider = self.database.provider()?;
+
+            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
+            let storage_history_exists = provider
+                .get_prune_checkpoint(PruneSegment::StorageHistory)?
+                .and_then(|checkpoint| {
+                    // return true if the block number is ahead of the prune checkpoint.
+                    //
+                    // The checkpoint stores the highest pruned block number, so we should make
+                    // sure the block_number is strictly greater.
+                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
+                })
+                .unwrap_or(true);
+
+            if !storage_history_exists {
+                return Err(ProviderError::StateAtBlockPruned(block_number))
+            }
+
+            provider.storage_changeset(block_number)
+        }
+    }
+}
+
 impl<N: ProviderNodeTypes> ChangeSetReader for BlockchainProvider2<N> {
     fn account_block_changeset(
         &self,
@@ -1440,7 +1636,25 @@ impl<N: ProviderNodeTypes> ChangeSetReader for BlockchainProvider2<N> {
                 .collect();
             Ok(changesets)
         } else {
-            self.database.provider()?.account_block_changeset(block_number)
+            // Perform checks on whether or not changesets exist for the block.
+            let provider = self.database.provider()?;
+            // No prune checkpoint means history should exist and we should `unwrap_or(true)`
+            let account_history_exists = provider
+                .get_prune_checkpoint(PruneSegment::AccountHistory)?
+                .and_then(|checkpoint| {
+                    // return true if the block number is ahead of the prune checkpoint.
+                    //
+                    // The checkpoint stores the highest pruned block number, so we should make
+                    // sure the block_number is strictly greater.
+                    checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
+                })
+                .unwrap_or(true);
+
+            if !account_history_exists {
+                return Err(ProviderError::StateAtBlockPruned(block_number))
+            }
+
+            provider.account_block_changeset(block_number)
         }
     }
 }
@@ -1455,12 +1669,21 @@ impl<N: ProviderNodeTypes> AccountReader for BlockchainProvider2<N> {
 }

 impl<N: ProviderNodeTypes> StateReader for BlockchainProvider2<N> {
+    /// Re-constructs the [`ExecutionOutcome`] from in-memory and database state, if necessary.
+    ///
+    /// If data for the block does not exist, this will return [`None`].
+    ///
+    /// NOTE: This cannot be called safely in a loop outside of the blockchain tree thread. This is
+    /// because the [`CanonicalInMemoryState`] could change during a reorg, causing results to be
+    /// inconsistent. Currently this can safely be called within the blockchain tree thread,
+    /// because the tree thread is responsible for modifying the [`CanonicalInMemoryState`] in the
+    /// first place.
     fn get_state(&self, block: BlockNumber) -> ProviderResult<Option<ExecutionOutcome>> {
         if let Some(state) = self.canonical_in_memory_state.state_by_number(block) {
             let state = state.block_ref().execution_outcome().clone();
             Ok(Some(state))
         } else {
-            self.database.provider()?.get_state(block..=block)
+            self.get_state(block..=block)
         }
     }
 }
17 changes: 16 additions & 1 deletion crates/storage/provider/src/providers/database/provider.rs
@@ -46,7 +46,7 @@ use reth_primitives::{
 };
 use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment};
 use reth_stages_types::{StageCheckpoint, StageId};
-use reth_storage_api::TryIntoHistoricalStateProvider;
+use reth_storage_api::{StorageChangeSetReader, TryIntoHistoricalStateProvider};
 use reth_storage_errors::provider::{ProviderResult, RootMismatch};
 use reth_trie::{
     prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
@@ -1414,6 +1414,21 @@ impl<TX: DbTx, Spec: Send + Sync> AccountExtReader for DatabaseProvider<TX, Spec
     }
 }

+impl<TX: DbTx, Spec: Send + Sync> StorageChangeSetReader for DatabaseProvider<TX, Spec> {
+    fn storage_changeset(
+        &self,
+        block_number: BlockNumber,
+    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
+        let range = block_number..=block_number;
+        let storage_range = BlockNumberAddress::range(range);
+        self.tx
+            .cursor_dup_read::<tables::StorageChangeSets>()?
+            .walk_range(storage_range)?
+            .map(|result| -> ProviderResult<_> { Ok(result?) })
+            .collect()
+    }
+}
+
 impl<TX: DbTx, Spec: Send + Sync> ChangeSetReader for DatabaseProvider<TX, Spec> {
     fn account_block_changeset(
         &self,
11 changes: 11 additions & 0 deletions crates/storage/storage-api/src/storage.rs
@@ -1,4 +1,5 @@
 use alloy_primitives::{Address, BlockNumber, B256};
+use reth_db_api::models::BlockNumberAddress;
 use reth_primitives::StorageEntry;
 use reth_storage_errors::provider::ProviderResult;
 use std::{
@@ -30,3 +31,13 @@ pub trait StorageReader: Send + Sync {
         range: RangeInclusive<BlockNumber>,
     ) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>>;
 }
+
+/// Storage ChangeSet reader
+#[auto_impl::auto_impl(&, Arc, Box)]
+pub trait StorageChangeSetReader: Send + Sync {
+    /// Iterate over storage changesets and return the storage state from before this block.
+    fn storage_changeset(
+        &self,
+        block_number: BlockNumber,
+    ) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>>;
+}
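
To make the shape of the new trait concrete, here is a rough std-only sketch of a per-block changeset lookup over an ordered table. Everything in it is an assumption for illustration: `ToyStorageChangeSetReader`, `MemTable`, and `Entry` are hypothetical simplifications, and a `BTreeMap` range scan stands in for the cursor walk over `StorageChangeSets` that the `DatabaseProvider` implementation performs.

```rust
use std::collections::BTreeMap;

pub type BlockNumber = u64;
pub type Address = [u8; 20];

/// Hypothetical simplified entry: (storage slot, value before the block).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Entry {
    pub key: u64,
    pub value: u64,
}

/// Simplified analogue of the trait in the diff, without the error type.
pub trait ToyStorageChangeSetReader {
    fn storage_changeset(&self, block: BlockNumber) -> Vec<((BlockNumber, Address), Entry)>;
}

/// Toy table keyed by (block, address, slot), so one block's rows are contiguous.
#[derive(Default)]
pub struct MemTable {
    pub rows: BTreeMap<(BlockNumber, Address, u64), u64>,
}

impl ToyStorageChangeSetReader for MemTable {
    fn storage_changeset(&self, block: BlockNumber) -> Vec<((BlockNumber, Address), Entry)> {
        // Bounds that cover every (address, slot) pair within this single block.
        let start = (block, [0u8; 20], u64::MIN);
        let end = (block, [0xff; 20], u64::MAX);
        self.rows
            .range(start..=end)
            .map(|(&(b, addr, key), &value)| ((b, addr), Entry { key, value }))
            .collect()
    }
}
```

Because the block number is the leading key component, a single ordered range scan returns exactly the rows for that block, in address and slot order, without touching neighbouring blocks.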