diff --git a/Cargo.lock b/Cargo.lock index d013576e75c9..b5cfbf016df9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5050,12 +5050,14 @@ dependencies = [ "reth-blockchain-tree", "reth-consensus-common", "reth-db", + "reth-downloaders", "reth-interfaces", "reth-metrics", "reth-payload-builder", "reth-primitives", "reth-provider", "reth-prune", + "reth-revm", "reth-rpc-types", "reth-stages", "reth-tasks", diff --git a/crates/consensus/beacon/Cargo.toml b/crates/consensus/beacon/Cargo.toml index 5f7ba35dd225..bb676f71d101 100644 --- a/crates/consensus/beacon/Cargo.toml +++ b/crates/consensus/beacon/Cargo.toml @@ -40,5 +40,7 @@ reth-blockchain-tree = { path = "../../blockchain-tree", features = ["test-utils reth-db = { path = "../../storage/db", features = ["test-utils"] } reth-provider = { workspace = true, features = ["test-utils"] } reth-tracing = { path = "../../tracing" } +reth-revm = { path = "../../revm" } +reth-downloaders = { path = "../../net/downloaders" } assert_matches = "1.5" diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index e42bc3c71709..8b41a93abab8 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -1709,20 +1709,30 @@ mod tests { BlockchainTree, ShareableBlockchainTree, }; use reth_db::{test_utils::create_test_rw_db, DatabaseEnv}; + use reth_downloaders::{ + bodies::bodies::BodiesDownloaderBuilder, + headers::reverse_headers::ReverseHeadersDownloaderBuilder, + }; use reth_interfaces::{ + consensus::Consensus, + p2p::either::EitherDownloader, sync::NoopSyncStateUpdater, test_utils::{NoopFullBlockClient, TestConsensus}, }; use reth_payload_builder::test_utils::spawn_test_payload_service; use reth_primitives::{stage::StageCheckpoint, ChainSpec, ChainSpecBuilder, H256, MAINNET}; use reth_provider::{ - providers::BlockchainProvider, test_utils::TestExecutorFactory, BlockWriter, - ProviderFactory, + providers::BlockchainProvider, test_utils::TestExecutorFactory, BlockExecutor, BlockWriter, + ExecutorFactory, ProviderFactory, StateProvider, }; + use reth_revm::Factory; use reth_rpc_types::engine::{ ExecutionPayload, ForkchoiceState, ForkchoiceUpdated, PayloadStatus, }; - use reth_stages::{test_utils::TestStages, ExecOutput, PipelineError, StageError}; + use reth_stages::{ + sets::DefaultStages, stages::HeaderSyncMode, test_utils::TestStages, ExecOutput, + PipelineError, StageError, + }; use reth_tasks::TokioTaskExecutor; use std::{collections::VecDeque, sync::Arc, time::Duration}; use tokio::sync::{ @@ -1730,13 +1740,17 @@ mod tests { watch, }; - type TestBeaconConsensusEngine = BeaconConsensusEngine< + type TestBeaconConsensusEngine = BeaconConsensusEngine< Arc, BlockchainProvider< Arc, - ShareableBlockchainTree, TestConsensus, TestExecutorFactory>, + ShareableBlockchainTree< + Arc, + Arc, + EitherExecutorFactory, + >, >, - NoopFullBlockClient, + Arc>, >; struct TestEnv { @@ -1799,22 +1813,124 @@ mod tests { } } - struct TestConsensusEngineBuilder { + /// Represents either test pipeline outputs, or real pipeline configuration. + #[derive(Default)] + enum TestPipelineConfig { + /// Test pipeline outputs. + Test(VecDeque>), + /// Real pipeline configuration. + #[default] + Real, + } + + /// Represents either test executor results, or real executor configuration. + #[derive(Default)] + enum TestExecutorConfig { + /// Test executor results. + Test(Vec), + /// Real executor configuration. + #[default] + Real, + } + + /// A type that represents one of two possible executor factories. + #[derive(Debug, Clone)] + enum EitherExecutorFactory { + /// The first factory variant + Left(A), + /// The second factory variant + Right(B), + } + + // A type that represents one of two possible BlockExecutor types. + #[derive(Debug)] + enum EitherBlockExecutor { + /// The first executor variant + Left(A), + /// The second executor variant + Right(B), + } + + impl BlockExecutor for EitherBlockExecutor + where + A: BlockExecutor, + B: BlockExecutor, + SP: StateProvider, + { + fn execute( + &mut self, + block: &reth_primitives::Block, + total_difficulty: U256, + senders: Option>, + ) -> Result { + match self { + EitherBlockExecutor::Left(a) => a.execute(block, total_difficulty, senders), + EitherBlockExecutor::Right(b) => b.execute(block, total_difficulty, senders), + } + } + + fn execute_and_verify_receipt( + &mut self, + block: &reth_primitives::Block, + total_difficulty: U256, + senders: Option>, + ) -> Result { + match self { + EitherBlockExecutor::Left(a) => { + a.execute_and_verify_receipt(block, total_difficulty, senders) + } + EitherBlockExecutor::Right(b) => { + b.execute_and_verify_receipt(block, total_difficulty, senders) + } + } + } + } + + impl ExecutorFactory for EitherExecutorFactory + where + A: ExecutorFactory, + B: ExecutorFactory, + { + type Executor = EitherBlockExecutor, B::Executor>; + + fn chain_spec(&self) -> &ChainSpec { + match self { + EitherExecutorFactory::Left(a) => a.chain_spec(), + EitherExecutorFactory::Right(b) => b.chain_spec(), + } + } + + fn with_sp(&self, sp: SP) -> Self::Executor { + match self { + EitherExecutorFactory::Left(a) => EitherBlockExecutor::Left(a.with_sp(sp)), + EitherExecutorFactory::Right(b) => EitherBlockExecutor::Right(b.with_sp(sp)), + } + } + } + + /// A builder for `TestConsensusEngine`, allows configuration of mocked pipeline outputs and + /// mocked executor results. + struct TestConsensusEngineBuilder { chain_spec: Arc, - pipeline_exec_outputs: VecDeque>, - executor_results: Vec, + pipeline_config: TestPipelineConfig, + executor_config: TestExecutorConfig, pipeline_run_threshold: Option, max_block: Option, + client: Option, } - impl TestConsensusEngineBuilder { + impl TestConsensusEngineBuilder + where + Client: HeadersClient + BodiesClient + 'static, + { /// Create a new `TestConsensusEngineBuilder` with the given `ChainSpec`. fn new(chain_spec: Arc) -> Self { Self { chain_spec, - pipeline_exec_outputs: VecDeque::new(), - executor_results: Vec::new(), + pipeline_config: Default::default(), + executor_config: Default::default(), pipeline_run_threshold: None, + client: None, max_block: None, } } @@ -1824,13 +1940,13 @@ mod tests { mut self, pipeline_exec_outputs: VecDeque>, ) -> Self { - self.pipeline_exec_outputs = pipeline_exec_outputs; + self.pipeline_config = TestPipelineConfig::Test(pipeline_exec_outputs); self } /// Set the executor results to use for the test consensus engine. fn with_executor_results(mut self, executor_results: Vec) -> Self { - self.executor_results = executor_results; + self.executor_config = TestExecutorConfig::Test(executor_results); self } @@ -1840,6 +1956,13 @@ mod tests { self } + /// Sets the client to use for network operations. + #[allow(dead_code)] + fn with_client(mut self, client: Client) -> Self { + self.client = Some(client); + self + } + /// Disables blockchain tree driven sync. This is the same as setting the pipeline run /// threshold to 0. fn disable_blockchain_tree_sync(mut self) -> Self { @@ -1848,20 +1971,55 @@ mod tests { } /// Builds the test consensus engine into a `TestConsensusEngine` and `TestEnv`. - fn build(self) -> (TestBeaconConsensusEngine, TestEnv>) { + fn build(self) -> (TestBeaconConsensusEngine, TestEnv>) { reth_tracing::init_test_tracing(); let db = create_test_rw_db(); - let consensus = TestConsensus::default(); + let consensus = Arc::new(TestConsensus::default()); let payload_builder = spawn_test_payload_service(); - let executor_factory = TestExecutorFactory::new(self.chain_spec.clone()); - executor_factory.extend(self.executor_results); + // use either noop client or a user provided client (for example TestFullBlockClient) + let client = Arc::new( + self.client + .map(EitherDownloader::Left) + .unwrap_or_else(|| EitherDownloader::Right(NoopFullBlockClient::default())), + ); + + // use either test executor or real executor + let executor_factory = match self.executor_config { + TestExecutorConfig::Test(results) => { + let executor_factory = TestExecutorFactory::new(self.chain_spec.clone()); + executor_factory.extend(results); + EitherExecutorFactory::Left(executor_factory) + } + TestExecutorConfig::Real => { + EitherExecutorFactory::Right(Factory::new(self.chain_spec.clone())) + } + }; // Setup pipeline let (tip_tx, tip_rx) = watch::channel(H256::default()); - let mut pipeline = Pipeline::builder() - .add_stages(TestStages::new(self.pipeline_exec_outputs, Default::default())) - .with_tip_sender(tip_tx); + let mut pipeline = match self.pipeline_config { + TestPipelineConfig::Test(outputs) => Pipeline::builder() + .add_stages(TestStages::new(outputs, Default::default())) + .with_tip_sender(tip_tx), + TestPipelineConfig::Real => { + let header_downloader = ReverseHeadersDownloaderBuilder::default() + .build(client.clone(), consensus.clone()) + .into_task(); + + let body_downloader = BodiesDownloaderBuilder::default() + .build(client.clone(), consensus.clone(), db.clone()) + .into_task(); + + Pipeline::builder().add_stages(DefaultStages::new( + HeaderSyncMode::Tip(tip_rx.clone()), + Arc::clone(&consensus) as Arc, + header_downloader, + body_downloader, + executor_factory.clone(), + )) + } + }; if let Some(max_block) = self.max_block { pipeline = pipeline.with_max_block(max_block); @@ -1889,7 +2047,7 @@ mod tests { let pruner = Pruner::new(5, 0); let (mut engine, handle) = BeaconConsensusEngine::new( - NoopFullBlockClient::default(), + client, pipeline, blockchain_provider, Box::::default(), @@ -1911,8 +2069,8 @@ mod tests { } } - fn spawn_consensus_engine( - engine: TestBeaconConsensusEngine, + fn spawn_consensus_engine( + engine: TestBeaconConsensusEngine, ) -> oneshot::Receiver> { let (tx, rx) = oneshot::channel(); tokio::spawn(async move { @@ -1933,11 +2091,12 @@ mod tests { .build(), ); - let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) - .with_pipeline_exec_outputs(VecDeque::from([Err(StageError::ChannelClosed)])) - .disable_blockchain_tree_sync() - .with_max_block(1) - .build(); + let (consensus_engine, env) = + TestConsensusEngineBuilder::::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Err(StageError::ChannelClosed)])) + .disable_blockchain_tree_sync() + .with_max_block(1) + .build(); let res = spawn_consensus_engine(consensus_engine); @@ -1964,11 +2123,12 @@ mod tests { .build(), ); - let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) - .with_pipeline_exec_outputs(VecDeque::from([Err(StageError::ChannelClosed)])) - .disable_blockchain_tree_sync() - .with_max_block(1) - .build(); + let (consensus_engine, env) = + TestConsensusEngineBuilder::::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Err(StageError::ChannelClosed)])) + .disable_blockchain_tree_sync() + .with_max_block(1) + .build(); let mut rx = spawn_consensus_engine(consensus_engine); @@ -2026,14 +2186,15 @@ mod tests { .build(), ); - let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) - .with_pipeline_exec_outputs(VecDeque::from([ - Ok(ExecOutput { checkpoint: StageCheckpoint::new(1), done: true }), - Err(StageError::ChannelClosed), - ])) - .disable_blockchain_tree_sync() - .with_max_block(2) - .build(); + let (consensus_engine, env) = + TestConsensusEngineBuilder::::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([ + Ok(ExecOutput { checkpoint: StageCheckpoint::new(1), done: true }), + Err(StageError::ChannelClosed), + ])) + .disable_blockchain_tree_sync() + .with_max_block(2) + .build(); let rx = spawn_consensus_engine(consensus_engine); @@ -2061,14 +2222,15 @@ mod tests { .build(), ); - let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) - .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { - checkpoint: StageCheckpoint::new(max_block), - done: true, - })])) - .with_max_block(max_block) - .disable_blockchain_tree_sync() - .build(); + let (consensus_engine, env) = + TestConsensusEngineBuilder::::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { + checkpoint: StageCheckpoint::new(max_block), + done: true, + })])) + .with_max_block(max_block) + .disable_blockchain_tree_sync() + .build(); let rx = spawn_consensus_engine(consensus_engine); @@ -2110,12 +2272,13 @@ mod tests { .build(), ); - let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) - .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { - checkpoint: StageCheckpoint::new(0), - done: true, - })])) - .build(); + let (consensus_engine, env) = + TestConsensusEngineBuilder::::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { + checkpoint: StageCheckpoint::new(0), + done: true, + })])) + .build(); let mut engine_rx = spawn_consensus_engine(consensus_engine); @@ -2141,12 +2304,13 @@ mod tests { .build(), ); - let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) - .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { - checkpoint: StageCheckpoint::new(0), - done: true, - })])) - .build(); + let (consensus_engine, env) = + TestConsensusEngineBuilder::::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { + checkpoint: StageCheckpoint::new(0), + done: true, + })])) + .build(); let genesis = random_block(&mut rng, 0, None, None, Some(0)); let block1 = random_block(&mut rng, 1, Some(genesis.hash), None, Some(0)); @@ -2190,13 +2354,14 @@ mod tests { .build(), ); - let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) - .with_pipeline_exec_outputs(VecDeque::from([ - Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }), - Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }), - ])) - .disable_blockchain_tree_sync() - .build(); + let (consensus_engine, env) = + TestConsensusEngineBuilder::::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([ + Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }), + Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }), + ])) + .disable_blockchain_tree_sync() + .build(); let genesis = random_block(&mut rng, 0, None, None, Some(0)); let block1 = random_block(&mut rng, 1, Some(genesis.hash), None, Some(0)); @@ -2240,13 +2405,14 @@ mod tests { .build(), ); - let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) - .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { - checkpoint: StageCheckpoint::new(0), - done: true, - })])) - .disable_blockchain_tree_sync() - .build(); + let (consensus_engine, env) = + TestConsensusEngineBuilder::::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { + checkpoint: StageCheckpoint::new(0), + done: true, + })])) + .disable_blockchain_tree_sync() + .build(); let genesis = random_block(&mut rng, 0, None, None, Some(0)); let block1 = random_block(&mut rng, 1, Some(genesis.hash), None, Some(0)); @@ -2278,12 +2444,13 @@ mod tests { .build(), ); - let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) - .with_pipeline_exec_outputs(VecDeque::from([ - Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }), - Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }), - ])) - .build(); + let (consensus_engine, env) = + TestConsensusEngineBuilder::::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([ + Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }), + Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }), + ])) + .build(); let genesis = random_block(&mut rng, 0, None, None, Some(0)); let mut block1 = random_block(&mut rng, 1, Some(genesis.hash), None, Some(0)); @@ -2331,12 +2498,13 @@ mod tests { .build(), ); - let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) - .with_pipeline_exec_outputs(VecDeque::from([ - Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }), - Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }), - ])) - .build(); + let (consensus_engine, env) = + TestConsensusEngineBuilder::::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([ + Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }), + Ok(ExecOutput { checkpoint: StageCheckpoint::new(0), done: true }), + ])) + .build(); let genesis = random_block(&mut rng, 0, None, None, Some(0)); let block1 = random_block(&mut rng, 1, Some(genesis.hash), None, Some(0)); @@ -2378,12 +2546,13 @@ mod tests { .build(), ); - let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) - .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { - checkpoint: StageCheckpoint::new(0), - done: true, - })])) - .build(); + let (consensus_engine, env) = + TestConsensusEngineBuilder::::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { + checkpoint: StageCheckpoint::new(0), + done: true, + })])) + .build(); let mut engine_rx = spawn_consensus_engine(consensus_engine); @@ -2413,12 +2582,13 @@ mod tests { .build(), ); - let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) - .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { - checkpoint: StageCheckpoint::new(0), - done: true, - })])) - .build(); + let (consensus_engine, env) = + TestConsensusEngineBuilder::::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { + checkpoint: StageCheckpoint::new(0), + done: true, + })])) + .build(); let genesis = random_block(&mut rng, 0, None, None, Some(0)); let block1 = random_block(&mut rng, 1, Some(genesis.hash), None, Some(0)); @@ -2463,12 +2633,13 @@ mod tests { .build(), ); - let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) - .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { - checkpoint: StageCheckpoint::new(0), - done: true, - })])) - .build(); + let (consensus_engine, env) = + TestConsensusEngineBuilder::::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { + checkpoint: StageCheckpoint::new(0), + done: true, + })])) + .build(); let genesis = random_block(&mut rng, 0, None, None, Some(0)); @@ -2519,13 +2690,14 @@ mod tests { .build(), ); - let (consensus_engine, env) = TestConsensusEngineBuilder::new(chain_spec.clone()) - .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { - checkpoint: StageCheckpoint::new(0), - done: true, - })])) - .with_executor_results(Vec::from([exec_result2])) - .build(); + let (consensus_engine, env) = + TestConsensusEngineBuilder::::new(chain_spec.clone()) + .with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput { + checkpoint: StageCheckpoint::new(0), + done: true, + })])) + .with_executor_results(Vec::from([exec_result2])) + .build(); insert_blocks( env.db.as_ref(),