Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pruning): prune ChangeSets & History during pipeline #3728

Merged
merged 29 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
90ae3c3
prune changesets when writing to db from post state
joshieDo Jul 11, 2023
e980b15
prune history on their stages
joshieDo Jul 11, 2023
1899c85
add deserializer helper to acc/storage history pruning
joshieDo Jul 11, 2023
2f05db3
add deserialize_opt_prune_mode_unsupported_full test
joshieDo Jul 11, 2023
880d2f6
Merge remote-tracking branch 'origin/main' into pruning/hist
joshieDo Jul 15, 2023
5466e9b
add changeset checks to execution prune test
joshieDo Jul 16, 2023
1ba88ba
add history stages to the prune test
joshieDo Jul 16, 2023
b4b013e
move test_prune
joshieDo Jul 16, 2023
87f475e
rename to gen_prune_methods
joshieDo Jul 16, 2023
5b41c0d
Merge remote-tracking branch 'origin/main' into pruning/hist
joshieDo Jul 25, 2023
bf93292
dedup test struct
joshieDo Jul 25, 2023
f0cf8f6
fix prune_target_block when PruneMode::Before
joshieDo Jul 25, 2023
2539015
prune_target_block_ only returns None on invalid configuration
joshieDo Jul 25, 2023
e30447d
history prune parts require 64 min blocks
joshieDo Jul 25, 2023
1e96152
fix up test_prune
joshieDo Jul 25, 2023
792ef89
add an explanation to test_prune
joshieDo Jul 25, 2023
d451ddf
remove dup test
joshieDo Jul 26, 2023
c89a8da
prune_target_block_* returns err if badly configured
joshieDo Jul 26, 2023
c1fcc86
revert serde_helper change
joshieDo Jul 26, 2023
e380fff
add missing doc
joshieDo Jul 26, 2023
2e707df
rename prune_targets to prune_modes
joshieDo Jul 26, 2023
b4ff2d1
add explanation on test_prune
joshieDo Jul 26, 2023
28d6f67
check if target has been reached after prune check on history stages
joshieDo Jul 26, 2023
a483c8b
run from scratch hashing and merkle stages if there's required change…
joshieDo Jul 27, 2023
d46a5bc
remove dbg line
joshieDo Jul 27, 2023
6a33867
fix stage test runners
joshieDo Jul 27, 2023
84b0f27
Merge remote-tracking branch 'origin/main' into pruning/hist
joshieDo Jul 27, 2023
0e88fc9
fix testrunner
joshieDo Jul 28, 2023
c7d83ec
Merge remote-tracking branch 'origin/main' into pruning/hist
joshieDo Jul 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,12 +760,17 @@ impl Command {
.set(AccountHashingStage::new(
stage_config.account_hashing.clean_threshold,
stage_config.account_hashing.commit_threshold,
config.prune.map(|prune| prune.parts).unwrap_or_default(),
))
.set(StorageHashingStage::new(
stage_config.storage_hashing.clean_threshold,
stage_config.storage_hashing.commit_threshold,
config.prune.map(|prune| prune.parts).unwrap_or_default(),
))
.set(MerkleStage::new_execution(
stage_config.merkle.clean_threshold,
config.prune.map(|prune| prune.parts).unwrap_or_default(),
))
.set(MerkleStage::new_execution(stage_config.merkle.clean_threshold))
.set(TransactionLookupStage::new(stage_config.transaction_lookup.commit_threshold))
.set(IndexAccountHistoryStage::new(
stage_config.index_account_history.commit_threshold,
Expand Down
25 changes: 17 additions & 8 deletions bin/reth/src/stage/dump/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,22 @@ async fn unwind_and_copy<DB: Database>(

// Bring hashes to TO

AccountHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
.execute(&provider, execute_input)
.await
.unwrap();
StorageHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
.execute(&provider, execute_input)
.await
.unwrap();
AccountHashingStage {
clean_threshold: u64::MAX,
commit_threshold: u64::MAX,
prune_modes: PruneModes::none(),
}
.execute(&provider, execute_input)
.await
.unwrap();
StorageHashingStage {
clean_threshold: u64::MAX,
commit_threshold: u64::MAX,
prune_modes: PruneModes::none(),
}
.execute(&provider, execute_input)
.await
.unwrap();

let unwind_inner_tx = provider.into_tx();

Expand Down Expand Up @@ -124,6 +132,7 @@ async fn dry_run<DB: Database>(
clean_threshold: u64::MAX, /* Forces updating the root instead of calculating it from scratch */
prune_modes: Default::default(),
}
.execute(
&provider,
Expand Down
22 changes: 16 additions & 6 deletions bin/reth/src/stage/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,22 @@ impl Command {
)
}
StageEnum::TxLookup => (Box::new(TransactionLookupStage::new(batch_size)), None),
StageEnum::AccountHashing => {
(Box::new(AccountHashingStage::new(1, batch_size)), None)
}
StageEnum::StorageHashing => {
(Box::new(StorageHashingStage::new(1, batch_size)), None)
}
StageEnum::AccountHashing => (
Box::new(AccountHashingStage::new(
1,
batch_size,
config.prune.map(|prune| prune.parts).unwrap_or_default(),
)),
None,
),
StageEnum::StorageHashing => (
Box::new(StorageHashingStage::new(
1,
batch_size,
config.prune.map(|prune| prune.parts).unwrap_or_default(),
)),
None,
),
StageEnum::Merkle => (
Box::new(MerkleStage::default_execution()),
Some(Box::new(MerkleStage::default_unwind())),
Expand Down
2 changes: 1 addition & 1 deletion crates/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub use net::{
SEPOLIA_BOOTNODES,
};
pub use peer::{PeerId, WithPeerId};
pub use prune::{PruneCheckpoint, PruneMode, PruneModes, PrunePart};
pub use prune::{PruneCheckpoint, PruneMode, PruneModes, PrunePart, PrunePartError};
pub use receipt::{Receipt, ReceiptWithBloom, ReceiptWithBloomRef};
pub use revm_primitives::JumpMap;
pub use serde_helper::JsonU256;
Expand Down
2 changes: 1 addition & 1 deletion crates/primitives/src/prune/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ mod target;

pub use checkpoint::PruneCheckpoint;
pub use mode::PruneMode;
pub use part::PrunePart;
pub use part::{PrunePart, PrunePartError};
pub use target::PruneModes;
12 changes: 11 additions & 1 deletion crates/primitives/src/prune/part.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use derive_more::Display;
use reth_codecs::{main_codec, Compact};
use thiserror::Error;

/// Part of the data that can be pruned.
#[main_codec]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[derive(Debug, Display, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum PrunePart {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shekhirin what do you think about renaming this to PruneStep?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, "Step" sounds like a consecutive action, like "Stage" in the context of the pipeline, while in pruning you can enable different parts, and it doesn't really matter in which order or combination they are executed. But I do agree that "Part" is weird too.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe PruneComponent?

/// Prune part responsible for the `TxSenders` table.
SenderRecovery,
Expand All @@ -16,6 +18,14 @@ pub enum PrunePart {
StorageHistory,
}

/// Error raised when a [`PrunePart`] cannot be used as configured.
#[derive(Debug, Error)]
pub enum PrunePartError {
/// The prune mode configured for the contained [`PrunePart`] is invalid.
///
/// The rendered message interpolates the offending part (field `0`) into the text below.
#[error("The configuration provided for {0} is invalid.")]
Configuration(PrunePart),
}

#[cfg(test)]
impl Default for PrunePart {
fn default() -> Self {
Expand Down
40 changes: 26 additions & 14 deletions crates/primitives/src/prune/target.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::{serde_helper::deserialize_opt_prune_mode_with_min_blocks, BlockNumber, PruneMode};
use crate::{
prune::PrunePartError, serde_helper::deserialize_opt_prune_mode_with_min_blocks, BlockNumber,
PruneMode, PrunePart,
};
use paste::paste;
use serde::{Deserialize, Serialize};

Expand All @@ -19,10 +22,16 @@ pub struct PruneModes {
)]
pub receipts: Option<PruneMode>,
/// Account History pruning configuration.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(
skip_serializing_if = "Option::is_none",
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<64, _>"
)]
pub account_history: Option<PruneMode>,
/// Storage History pruning configuration.
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(
skip_serializing_if = "Option::is_none",
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<64, _>"
)]
pub storage_history: Option<PruneMode>,
}

Expand Down Expand Up @@ -51,12 +60,15 @@ macro_rules! impl_prune_parts {
$human_part,
" pruning needs to be done, inclusive, according to the provided tip."
)]
pub fn [<prune_target_block_ $part>](&self, tip: BlockNumber) -> Option<(BlockNumber, PruneMode)> {
self.$part.as_ref().and_then(|mode| {
self.prune_target_block(mode, tip, $min_blocks).map(|block| {
(block, *mode)
})
})
pub fn [<prune_target_block_ $part>](&self, tip: BlockNumber) -> Result<Option<(BlockNumber, PruneMode)>, PrunePartError> {
    // No mode configured for this part: nothing to prune, and that is not an error.
    // A configured mode that yields no target block is reported as a configuration error.
    self.$part
        .as_ref()
        .map(|configured| {
            self.prune_target_block(configured, tip, $min_blocks)
                .map(|target| (target, *configured))
                .ok_or(PrunePartError::Configuration(PrunePart::[<$human_part>]))
        })
        // Option<Result<_, _>> -> Result<Option<_>, _>
        .transpose()
}
}
)+
Expand Down Expand Up @@ -107,17 +119,17 @@ impl PruneModes {
Some(tip.saturating_sub(*distance))
}
PruneMode::Before(n) if tip.saturating_sub(*n) >= min_blocks.unwrap_or_default() => {
Some(*n)
Some(n.saturating_sub(1))
shekhirin marked this conversation as resolved.
Show resolved Hide resolved
}
_ => None,
}
}

impl_prune_parts!(
(sender_recovery, "Sender Recovery", None),
(transaction_lookup, "Transaction Lookup", None),
(sender_recovery, "SenderRecovery", None),
(transaction_lookup, "TransactionLookup", None),
(receipts, "Receipts", Some(64)),
(account_history, "Account History", None),
(storage_history, "Storage History", None)
(account_history, "AccountHistory", Some(64)),
(storage_history, "StorageHistory", Some(64))
);
}
3 changes: 3 additions & 0 deletions crates/prune/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ use thiserror::Error;

#[derive(Error, Debug)]
pub enum PrunerError {
#[error(transparent)]
PrunePart(#[from] reth_primitives::PrunePartError),

#[error("Inconsistent data: {0}")]
InconsistentData(&'static str),

Expand Down
4 changes: 2 additions & 2 deletions crates/prune/src/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ impl<DB: Database> Pruner<DB> {
let provider = self.provider_factory.provider_rw()?;

if let Some((to_block, prune_mode)) =
self.modes.prune_target_block_receipts(tip_block_number)
self.modes.prune_target_block_receipts(tip_block_number)?
{
self.prune_receipts(&provider, to_block, prune_mode)?;
}

if let Some((to_block, prune_mode)) =
self.modes.prune_target_block_transaction_lookup(tip_block_number)
self.modes.prune_target_block_transaction_lookup(tip_block_number)?
{
self.prune_transaction_lookup(&provider, to_block, prune_mode)?;
}
Expand Down
4 changes: 2 additions & 2 deletions crates/stages/benches/criterion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ fn merkle(c: &mut Criterion) {
// don't need to run each stage for that many times
group.sample_size(10);

let stage = MerkleStage::Both { clean_threshold: u64::MAX };
let stage = MerkleStage::Both { clean_threshold: u64::MAX, prune_modes: Default::default() };
measure_stage(
&mut group,
setup::unwind_hashes,
Expand All @@ -104,7 +104,7 @@ fn merkle(c: &mut Criterion) {
"Merkle-incremental".to_string(),
);

let stage = MerkleStage::Both { clean_threshold: 0 };
let stage = MerkleStage::Both { clean_threshold: 0, prune_modes: Default::default() };
measure_stage(
&mut group,
setup::unwind_hashes,
Expand Down
3 changes: 3 additions & 0 deletions crates/stages/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ pub enum StageError {
#[source]
error: executor::BlockExecutionError,
},
/// Invalid pruning configuration
#[error(transparent)]
PruningConfiguration(#[from] reth_primitives::PrunePartError),
/// Invalid checkpoint passed to the stage
#[error("Invalid stage checkpoint: {0}")]
StageCheckpoint(u64),
Expand Down
93 changes: 5 additions & 88 deletions crates/stages/src/stages/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,17 @@ pub struct ExecutionStage<EF: ExecutorFactory> {
/// The commit thresholds of the execution stage.
thresholds: ExecutionStageThresholds,
/// Pruning configuration.
prune_targets: PruneModes,
prune_modes: PruneModes,
}

impl<EF: ExecutorFactory> ExecutionStage<EF> {
/// Create new execution stage with specified config.
pub fn new(
executor_factory: EF,
thresholds: ExecutionStageThresholds,
prune_targets: PruneModes,
prune_modes: PruneModes,
) -> Self {
Self { metrics_tx: None, executor_factory, thresholds, prune_targets }
Self { metrics_tx: None, executor_factory, thresholds, prune_modes }
}

/// Create an execution stage with the provided executor factory.
Expand Down Expand Up @@ -110,7 +110,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {

// Execute block range
let mut state = PostState::default();
state.add_prune_targets(self.prune_targets);
state.add_prune_modes(self.prune_modes);

for block_number in start_block..=max_block {
let td = provider
Expand Down Expand Up @@ -425,8 +425,7 @@ mod tests {
use reth_db::{models::AccountBeforeTx, test_utils::create_test_rw_db};
use reth_primitives::{
hex_literal::hex, keccak256, stage::StageUnitCheckpoint, Account, Bytecode,
ChainSpecBuilder, PruneMode, PruneModes, SealedBlock, StorageEntry, H160, H256, MAINNET,
U256,
ChainSpecBuilder, PruneModes, SealedBlock, StorageEntry, H160, H256, MAINNET, U256,
};
use reth_provider::{AccountReader, BlockWriter, ProviderFactory, ReceiptProvider};
use reth_revm::Factory;
Expand Down Expand Up @@ -894,86 +893,4 @@ mod tests {
]
);
}

#[tokio::test]
async fn test_prune() {
let test_tx = TestTransaction::default();
let factory = Arc::new(ProviderFactory::new(test_tx.tx.as_ref(), MAINNET.clone()));

let provider = factory.provider_rw().unwrap();
let input = ExecInput {
target: Some(1),
/// The progress of this stage the last time it was executed.
checkpoint: None,
};
let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice();
let genesis = SealedBlock::decode(&mut genesis_rlp).unwrap();
let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice();
let block = SealedBlock::decode(&mut block_rlp).unwrap();
provider.insert_block(genesis, None).unwrap();
provider.insert_block(block.clone(), None).unwrap();
provider.commit().unwrap();

// insert pre state
let provider = factory.provider_rw().unwrap();
let code = hex!("5a465a905090036002900360015500");
let code_hash = keccak256(hex!("5a465a905090036002900360015500"));
provider
.tx_ref()
.put::<tables::PlainAccountState>(
H160(hex!("1000000000000000000000000000000000000000")),
Account { nonce: 0, balance: U256::ZERO, bytecode_hash: Some(code_hash) },
)
.unwrap();
provider
.tx_ref()
.put::<tables::PlainAccountState>(
H160(hex!("a94f5374fce5edbc8e2a8697c15331677e6ebf0b")),
Account {
nonce: 0,
balance: U256::from(0x3635c9adc5dea00000u128),
bytecode_hash: None,
},
)
.unwrap();
provider
.tx_ref()
.put::<tables::Bytecodes>(code_hash, Bytecode::new_raw(code.to_vec().into()))
.unwrap();
provider.commit().unwrap();

let check_pruning = |factory: Arc<ProviderFactory<_>>,
prune_targets: PruneModes,
expect_num_receipts: usize| async move {
let provider = factory.provider_rw().unwrap();

let mut execution_stage = ExecutionStage::new(
Factory::new(Arc::new(ChainSpecBuilder::mainnet().berlin_activated().build())),
ExecutionStageThresholds { max_blocks: Some(100), max_changes: None },
prune_targets,
);

execution_stage.execute(&provider, input).await.unwrap();
assert_eq!(
provider.receipts_by_block(1.into()).unwrap().unwrap().len(),
expect_num_receipts
);
};

let mut prune = PruneModes::none();

check_pruning(factory.clone(), prune, 1).await;

prune.receipts = Some(PruneMode::Full);
check_pruning(factory.clone(), prune, 0).await;

prune.receipts = Some(PruneMode::Before(1));
check_pruning(factory.clone(), prune, 1).await;

prune.receipts = Some(PruneMode::Before(2));
check_pruning(factory.clone(), prune, 0).await;

prune.receipts = Some(PruneMode::Distance(0));
check_pruning(factory.clone(), prune, 1).await;
}
}
Loading