From cd46651dac014e80def2e70701363c1e05546984 Mon Sep 17 00:00:00 2001
From: Krzysztof Lis
Date: Thu, 21 Nov 2024 18:19:39 +0100
Subject: [PATCH] test(sync): add test cases for sync with recoverable and fatal errors
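
The new `sync` test drives both sync phases (checkpoint and track)
against a FakeP2PClient that injects errors on demand:

- recoverable errors: one per sync stage, each at a random block; sync
  is expected to restart and eventually store all blocks,
- a fatal error during checkpoint: all headers are stored but the
  transaction stage bails,
- a fatal error during track: the last block is never stored.

LatestStream gains a single_shot mode, and the hardcoded chain field is
replaced by an injectable block_hash_db so the tests can run against
purely fake data.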
---
 crates/pathfinder/src/bin/pathfinder/main.rs |   4 +-
 crates/pathfinder/src/sync.rs                | 494 ++++++++++++++++---
 crates/pathfinder/src/sync/checkpoint.rs     |  15 +-
 crates/pathfinder/src/sync/headers.rs        |   3 -
 crates/pathfinder/src/sync/track.rs          |   3 -
 crates/pathfinder/src/sync/transactions.rs   |   7 +
 6 files changed, 431 insertions(+), 95 deletions(-)

diff --git a/crates/pathfinder/src/bin/pathfinder/main.rs b/crates/pathfinder/src/bin/pathfinder/main.rs
index f6b24d7b4..0b8aed3c8 100644
--- a/crates/pathfinder/src/bin/pathfinder/main.rs
+++ b/crates/pathfinder/src/bin/pathfinder/main.rs
@@ -628,6 +628,8 @@ fn start_p2p_sync(
     l1_checkpoint_override: Option<pathfinder_ethereum::EthereumStateUpdate>,
     verify_tree_hashes: bool,
 ) -> tokio::task::JoinHandle<anyhow::Result<()>> {
+    use pathfinder_block_hashes::BlockHashDb;
+
     let sync = pathfinder_lib::sync::Sync {
         storage,
         p2p: p2p_client,
@@ -635,10 +637,10 @@ fn start_p2p_sync(
         eth_address: pathfinder_context.l1_core_address,
         fgw_client: pathfinder_context.gateway,
         chain_id: pathfinder_context.network_id,
-        chain: pathfinder_context.network,
         public_key: gateway_public_key,
         l1_checkpoint_override,
         verify_tree_hashes,
+        block_hash_db: Some(BlockHashDb::new(pathfinder_context.network)),
     };
     tokio::spawn(sync.run())
 }
diff --git a/crates/pathfinder/src/sync.rs b/crates/pathfinder/src/sync.rs
index 043991142..60dd2e9c2 100644
--- a/crates/pathfinder/src/sync.rs
+++ b/crates/pathfinder/src/sync.rs
@@ -1,7 +1,5 @@
 #![allow(dead_code, unused)]
 
-use core::panic;
-use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::Context;
@@ -17,6 +15,7 @@ use p2p::client::peer_agnostic::traits::{
     TransactionStream,
 };
 use p2p::PeerData;
+use pathfinder_block_hashes::BlockHashDb;
 use pathfinder_common::error::AnyhowExt;
 use pathfinder_common::{
     block_hash,
@@ -56,11 +55,11 @@ pub struct Sync<P, G> {
     pub eth_client: pathfinder_ethereum::EthereumClient,
     pub eth_address: H160,
     pub fgw_client: G,
-    pub chain: Chain,
     pub chain_id: ChainId,
     pub public_key: PublicKey,
     pub l1_checkpoint_override: Option<EthereumStateUpdate>,
     pub verify_tree_hashes: bool,
+    pub block_hash_db: Option<BlockHashDb>,
 }
 
 impl<P, G> Sync<P, G>
 where
@@ -79,7 +78,6 @@ where
     pub async fn run(self) -> anyhow::Result<()> {
         let (next, parent_hash) = self.checkpoint_sync().await?;
 
-        // TODO: depending on how this is implemented, we might want to loop around it.
         self.track_sync(next, parent_hash).await
     }
 
@@ -134,11 +132,10 @@ where
             eth_client: self.eth_client.clone(),
             eth_address: self.eth_address,
             fgw_client: self.fgw_client.clone(),
-            chain: self.chain,
             chain_id: self.chain_id,
             public_key: self.public_key,
             verify_tree_hashes: self.verify_tree_hashes,
-            block_hash_db: Some(pathfinder_block_hashes::BlockHashDb::new(self.chain)),
+            block_hash_db: self.block_hash_db.clone(),
         }
         .run(checkpoint)
         .await;
@@ -193,14 +190,13 @@ where
         loop {
             let mut result = track::Sync {
-                latest: LatestStream::spawn(self.fgw_client.clone(), Duration::from_secs(2)),
+                latest: LatestStream::spawn(self.fgw_client.clone(), Duration::from_secs(2), false),
                 p2p: self.p2p.clone(),
                 storage: self.storage.clone(),
-                chain: self.chain,
                 chain_id: self.chain_id,
                 public_key: self.public_key,
-                block_hash_db: Some(pathfinder_block_hashes::BlockHashDb::new(self.chain)),
                 verify_tree_hashes: self.verify_tree_hashes,
+                block_hash_db: self.block_hash_db.clone(),
             }
             .run(&mut next, &mut parent_hash, self.fgw_client.clone())
             .await;
@@ -251,7 +247,7 @@ impl Stream for LatestStream {
 }
 
 impl LatestStream {
-    fn spawn<G>(fgw: G, head_poll_interval: Duration) -> Self
+    fn spawn<G>(fgw: G, head_poll_interval: Duration, single_shot: bool) -> Self
     where
         G: GatewayApi + Clone + Send + 'static,
     {
@@ -281,7 +277,6 @@
                 }
 
                 tx.send_if_modified(|current| {
-                    // TODO: handle reorgs correctly
                    if *current != latest {
                        tracing::info!(?latest, "LatestStream");
                        *current = latest;
@@ -290,6 +285,10 @@
                        false
                    }
                });
+
+                // Single-shot mode polls the gateway head once and then ends the task.
+                if single_shot {
+                    return;
+                }
             }
         });
 
@@ -302,8 +301,14 @@
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
+    use std::sync::atomic::{AtomicU64, Ordering};
+    use std::sync::Arc;
+
     use fake::{Fake, Faker};
     use futures::stream;
     use p2p::client::types::{
         ClassDefinition,
         ClassDefinitionsError,
@@ -314,14 +319,26 @@ mod tests {
     };
     use p2p::libp2p::PeerId;
     use pathfinder_common::event::Event;
+    use pathfinder_common::receipt::Receipt;
     use pathfinder_common::state_update::StateUpdateData;
     use pathfinder_common::transaction::Transaction;
-    use pathfinder_common::{BlockHeader, BlockId, ClassHash, SignedBlockHeader, TransactionHash};
+    use pathfinder_common::{
+        BlockHeader,
+        BlockId,
+        ClassHash,
+        SierraHash,
+        SignedBlockHeader,
+        TransactionHash,
+    };
     use pathfinder_crypto::signature::ecdsa_sign;
     use pathfinder_crypto::Felt;
     use pathfinder_ethereum::EthereumClient;
     use pathfinder_storage::fake::{generate, Block, Config};
-    use pathfinder_storage::StorageBuilder;
+    use pathfinder_storage::{Storage, StorageBuilder};
+    use rstest::rstest;
     use starknet_gateway_types::error::SequencerError;
 
     use super::*;
@@ -361,15 +378,91 @@ mod tests {
         (public_key, blocks)
     }
 
+    /// Polls storage until the block at `expected_last` shows up, so the test
+    /// can finish as soon as sync has stored everything it expects.
+    async fn sync_done_watch(storage: Storage, expected_last: BlockNumber) {
+        let start = std::time::Instant::now();
+        tokio::task::spawn_blocking(move || loop {
+            std::thread::sleep(Duration::from_millis(200));
+            let mut db = storage.connection().unwrap();
+            let db = db.transaction().unwrap();
+            let header = db.block_header(expected_last.into()).unwrap();
+            if let Some(header) = header {
+                if header.number == expected_last {
+                    let after = start.elapsed();
+                    tracing::info!(?after, "Sync done");
+                    break;
+                }
+            }
+        })
+        .await
+        .unwrap();
+    }
+
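+    // Describes a single test case: where (if anywhere) a fatal error is
+    // injected, and how far sync is expected to get before it stops.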
+    #[derive(Copy, Clone, Debug)]
+    struct ErrorSetup {
+        fatal_at: Option<BlockNumber>,
+        expected_last_synced: LastSynced,
+    }
+
+    #[derive(Copy, Clone, Debug)]
+    enum LastSynced {
+        Full(BlockNumber),
+        HeadersOnly(BlockNumber),
+    }
+
+    impl LastSynced {
+        fn block_number(&self) -> BlockNumber {
+            match self {
+                LastSynced::Full(b) | LastSynced::HeadersOnly(b) => *b,
+            }
+        }
+
+        fn is_full(&self) -> bool {
+            matches!(self, LastSynced::Full(_))
+        }
+    }
+
+    const ERROR_CONSUMED: u64 = u64::MAX;
+    const CHECKPOINT_BLOCKS: u64 = 10;
+    const TRACK_BLOCKS: u64 = CHECKPOINT_MARGIN - 1;
+    const ALL_BLOCKS: u64 = CHECKPOINT_BLOCKS + TRACK_BLOCKS;
+    const LAST_IN_CHECKPOINT: BlockNumber = BlockNumber::new_or_panic(CHECKPOINT_BLOCKS - 1);
+    const LAST_IN_TRACK: BlockNumber = BlockNumber::new_or_panic(ALL_BLOCKS - 1);
+
+    #[rstest]
+    #[case::sync_restarts_after_recoverable_errors_and_succeeds(ErrorSetup {
+        // Each sync stage will experience a recoverable error at a random block
+        fatal_at: None,
+        // All blocks will be stored successfully
+        expected_last_synced: LastSynced::Full(LAST_IN_TRACK),
+    })]
+    #[case::checkpoint_bails_after_fatal_error(ErrorSetup {
+        fatal_at: Some(LAST_IN_CHECKPOINT),
+        // All headers are stored but transactions fail
+        expected_last_synced: LastSynced::HeadersOnly(LAST_IN_CHECKPOINT),
+    })]
+    #[case::track_bails_after_fatal_error(ErrorSetup {
+        fatal_at: Some(LAST_IN_TRACK),
+        // The last block is not stored
+        expected_last_synced: LastSynced::Full(LAST_IN_TRACK - 1),
+    })]
     #[test_log::test(tokio::test)]
-    async fn checkpoint_restarts_after_recoverable_error() {
-        let (public_key, blocks) = generate_fake_blocks(20);
+    async fn sync(#[case] error_setup: ErrorSetup) {
+        let (public_key, blocks) = generate_fake_blocks(ALL_BLOCKS as usize);
         let last_header = &blocks.last().unwrap().header.header;
-        let mid_header = &blocks[9].header.header;
+        let last_checkpoint_header = &blocks[LAST_IN_CHECKPOINT.get() as usize].header.header;
+        let storage = StorageBuilder::in_tempdir().unwrap();
+
+        let expected_last_synced_block = error_setup.expected_last_synced.block_number();
+        let expect_fully_synced_blocks = error_setup.expected_last_synced.is_full();
+
+        let error_trigger = ErrorTrigger::new(error_setup.fatal_at);
+
         let sync = Sync {
-            storage: StorageBuilder::in_tempdir().unwrap(),
+            storage: storage.clone(),
             p2p: FakeP2PClient {
                 blocks: blocks.clone(),
+                error_trigger: error_trigger.clone(),
+                storage: storage.clone(),
             },
             // We use `l1_checkpoint_override` instead
             eth_client: EthereumClient::new("https://unused.com").unwrap(),
@@ -377,49 +470,188 @@ mod tests {
             fgw_client: FakeFgw {
                 head: (last_header.number, last_header.hash),
             },
-            chain: Chain::SepoliaTestnet,
             chain_id: ChainId::SEPOLIA_TESTNET,
-            public_key: PublicKey::ZERO, // TODO
+            public_key,
             l1_checkpoint_override: Some(EthereumStateUpdate {
-                state_root: mid_header.state_commitment,
-                block_number: mid_header.number,
-                block_hash: mid_header.hash,
+                state_root: last_checkpoint_header.state_commitment,
+                block_number: last_checkpoint_header.number,
+                block_hash: last_checkpoint_header.hash,
             }),
             verify_tree_hashes: true,
+            block_hash_db: None,
         };
 
-        sync.run().await.unwrap();
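+        // Race sync against a completion watcher: the select! ends the test
+        // either when sync itself returns (fatal error or 10 s timeout) or
+        // when the watcher sees the expected head block in storage.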
+        tokio::select! {
+            _ = tokio::time::timeout(Duration::from_secs(10), sync.run()) => (),
+            _ = sync_done_watch(storage.clone(), expected_last_synced_block) => (),
+        }
 
-        // TODO
-        // 2 cases here:
-        // - recoverable error
-        // - premature end of "current" stream
-    }
+        assert!(error_trigger.all_errors_triggered());
 
-    #[test]
-    fn track_restarts_after_recoverable_error() {
-        // TODO
-        // 2 cases here:
-        // - recoverable error
-        // - premature end of "current" stream
+        let mut db = storage.connection().unwrap();
+        let db = db.transaction().unwrap();
+        for expected in blocks
+            .into_iter()
+            .take_while(|block| block.header.header.number <= expected_last_synced_block)
+        {
+            let block_number = expected.header.header.number;
+            let block_id = block_number.into();
+            let header = db.block_header(block_id).unwrap().unwrap();
+            let signature = db.signature(block_id).unwrap().unwrap();
 
-        // Check if tracking has restarted from the last stored block
-        // ie if next and parent_hash have advanced
-    }
+            pretty_assertions_sorted::assert_eq!(
+                header,
+                expected.header.header,
+                "block {}",
+                block_number
+            );
+            pretty_assertions_sorted::assert_eq!(
+                signature,
+                expected.header.signature,
+                "block {}",
+                block_number
+            );
 
-    #[test]
-    fn checkpoint_stops_after_fatal_error() {
-        // TODO
-    }
+            if expect_fully_synced_blocks {
+                let transaction_data = db.transaction_data_for_block(block_id).unwrap().unwrap();
+                let state_update_data: StateUpdateData =
+                    db.state_update(block_id).unwrap().unwrap().into();
+                let declared = db.declared_classes_at(block_id).unwrap().unwrap();
 
-    #[test]
-    fn track_stops_after_fatal_error() {
-        // TODO
-    }
+                let mut cairo_defs = HashMap::new();
+                let mut sierra_defs = HashMap::new();
+
+                for class_hash in declared {
+                    let class = db.class_definition(class_hash).unwrap().unwrap();
+                    match db.casm_hash(class_hash).unwrap() {
+                        Some(casm_hash) => {
+                            let casm = db.casm_definition(class_hash).unwrap().unwrap();
+                            sierra_defs.insert(SierraHash(class_hash.0), (class, casm));
+                        }
+                        None => {
+                            cairo_defs.insert(class_hash, class);
+                        }
+                    }
+                }
+
+                pretty_assertions_sorted::assert_eq!(
+                    header.state_diff_commitment,
+                    expected.header.header.state_diff_commitment,
+                    "block {}",
+                    block_number
+                );
+                pretty_assertions_sorted::assert_eq!(
+                    header.state_diff_length,
+                    expected.header.header.state_diff_length,
+                    "block {}",
+                    block_number
+                );
+                pretty_assertions_sorted::assert_eq!(
+                    transaction_data,
+                    expected.transaction_data,
+                    "block {}",
+                    block_number
+                );
+                pretty_assertions_sorted::assert_eq!(
+                    state_update_data,
+                    expected.state_update.unwrap().into(),
+                    "block {}",
+                    block_number
+                );
+                pretty_assertions_sorted::assert_eq!(
+                    cairo_defs,
+                    expected.cairo_defs.into_iter().collect::<HashMap<_, _>>(),
+                    "block {}",
+                    block_number
+                );
+                pretty_assertions_sorted::assert_eq!(
+                    sierra_defs,
+                    expected
+                        .sierra_defs
+                        .into_iter()
+                        // None of the Sierra fixtures are compilable, so the
+                        // fake gateway always returns the same CASM definition
+                        .map(|(h, s, _)| (h, (s, b"I'm from the fgw!".to_vec())))
+                        .collect::<HashMap<_, _>>(),
+                    "block {}",
+                    block_number
+                );
+            }
+        }
+    }
 
     #[derive(Clone)]
     struct FakeP2PClient {
         pub blocks: Vec<Block>,
+        pub error_trigger: ErrorTrigger,
+        pub storage: Storage,
     }
 
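+    // One-shot error triggers shared with the fake P2P client. `Recoverable`
+    // holds one trigger block per sync stage, `Fatal` a single trigger block.
+    // Firing a trigger atomically swaps its block number for ERROR_CONSUMED,
+    // so each error fires at most once and `all_errors_triggered` can later
+    // verify that every planned error actually happened.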
+    #[derive(Clone)]
+    enum ErrorTrigger {
+        Recoverable(Arc<Vec<AtomicU64>>),
+        Fatal(Arc<AtomicU64>),
+    }
+
+    impl ErrorTrigger {
+        fn new(fatal_at: Option<BlockNumber>) -> Self {
+            match fatal_at {
+                Some(fatal_at) => Self::Fatal(Arc::new(AtomicU64::new(fatal_at.get()))),
+                None => Self::Recoverable(Arc::new(
+                    (0..=4)
+                        .map(|_| AtomicU64::new((0..CHECKPOINT_BLOCKS).fake()))
+                        .chain(
+                            (5..=9).map(|_| AtomicU64::new((CHECKPOINT_BLOCKS..ALL_BLOCKS).fake())),
+                        )
+                        .collect(),
+                )),
+            }
+        }
+
+        fn fatal(&self, block: BlockNumber) -> bool {
+            match self {
+                Self::Fatal(at) => at
+                    .compare_exchange(
+                        block.get(),
+                        ERROR_CONSUMED,
+                        Ordering::Relaxed,
+                        Ordering::Relaxed,
+                    )
+                    .is_ok(),
+                Self::Recoverable(_) => false,
+            }
+        }
+
+        // Sync stages:
+        // - 0: checkpoint, header
+        // - 1: checkpoint, transaction
+        // - 2: checkpoint, state diff
+        // - 3: checkpoint, class
+        // - 4: checkpoint, event
+        // - 5: track, header
+        // - 6: track, transaction
+        // - 7: track, state diff
+        // - 8: track, class
+        // - 9: track, event
+        fn recoverable(&self, block: BlockNumber, sync_stage: usize) -> bool {
+            match self {
+                Self::Fatal(_) => false,
+                Self::Recoverable(triggers) => {
+                    let at = &triggers[sync_stage];
+                    at.compare_exchange(
+                        block.get(),
+                        ERROR_CONSUMED,
+                        Ordering::Relaxed,
+                        Ordering::Relaxed,
+                    )
+                    .is_ok()
+                }
+            }
+        }
+
+        fn all_errors_triggered(&self) -> bool {
+            match self {
+                Self::Fatal(at) => at.load(Ordering::Relaxed) == ERROR_CONSUMED,
+                Self::Recoverable(triggers) => triggers
+                    .iter()
+                    .all(|at| at.load(Ordering::Relaxed) == ERROR_CONSUMED),
+            }
+        }
+    }
@@ -436,9 +668,9 @@ mod tests {
         let mut blocks = self
             .blocks
             .into_iter()
-            .take_while(move |b| {
+            .filter_map(move |b| {
                 let n = b.header.header.number;
-                n >= start && n <= stop
+                (n >= start && n <= stop).then_some(b)
             })
             .collect::<Vec<_>>();
@@ -457,8 +689,20 @@ mod tests {
             stop: BlockNumber,
             reverse: bool,
         ) -> impl Stream<Item = PeerData<SignedBlockHeader>> + Send {
-            stream::iter(self.blocks(start, stop, reverse, |block| {
-                PeerData::for_tests(block.header)
+            let error_trigger = self.error_trigger.clone();
+
+            stream::iter(self.blocks(start, stop, reverse, |mut b| {
+                let block = b.header.header.number;
+
+                if error_trigger.recoverable(block, 0) || error_trigger.recoverable(block, 5) {
+                    tracing::debug!(%block,
+                        "FakeP2PClient::header_stream triggering recoverable error",
+                    );
+                    // This will cause a discontinuity in the header chain
+                    b.header.header = Faker.fake();
+                }
+
+                PeerData::for_tests(b.header)
             }))
         }
     }
@@ -468,19 +712,35 @@ mod tests {
             self,
             start: BlockNumber,
            stop: BlockNumber,
-            // transaction_count_stream: impl Stream<Item = anyhow::Result<usize>> + Send +
-            // 'static,
            _: impl Stream<Item = anyhow::Result<usize>> + Send + 'static,
        ) -> impl Stream<Item = anyhow::Result<PeerData<(Vec<(Transaction, P2PReceipt)>, BlockNumber)>>> + Send {
-            stream::iter(self.blocks(start, stop, false, |block| {
+            let error_trigger = self.error_trigger.clone();
+
+            stream::iter(self.blocks(start, stop, false, |mut b| {
+                let block = b.header.header.number;
+
+                if error_trigger.recoverable(block, 1) {
+                    tracing::debug!(%block,
+                        "FakeP2PClient::transaction_stream triggering recoverable error",
+                    );
+                    // This will cause a transaction commitment mismatch
+                    b.transaction_data.pop();
+                }
+
+                if error_trigger.fatal(block) {
+                    tracing::debug!(%block,
+                        "FakeP2PClient::transaction_stream triggering fatal error",
+                    );
+                    return Err(anyhow::anyhow!("Fatal error at block {block}"));
+                }
+
                 Ok(PeerData::for_tests((
-                    block
-                        .transaction_data
+                    b.transaction_data
                         .into_iter()
                         .map(|(t, r, _)| (t, r.into()))
                         .collect(),
-                    block.header.header.number,
+                    b.header.header.number,
                 )))
             }))
         }
@@ -491,14 +751,28 @@ mod tests {
            self,
            start: BlockNumber,
            stop: BlockNumber,
-            // state_diff_length_stream: impl Stream<Item = anyhow::Result<usize>> + Send +
-            // 'static,
            _: impl Stream<Item = anyhow::Result<usize>> + Send + 'static,
        ) -> impl Stream<Item = anyhow::Result<PeerData<(StateUpdateData, BlockNumber)>>> + Send {
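+            // Stages 2..=4 below are tripped the same way: corrupt one piece
+            // of the faked block data so the corresponding commitment or class
+            // check fails and the stage reports a recoverable error.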
-            stream::iter(self.blocks(start, stop, false, |block| {
+            let error_trigger = self.error_trigger.clone();
+
+            stream::iter(self.blocks(start, stop, false, |mut b| {
+                let block = b.header.header.number;
+
+                if error_trigger.recoverable(block, 2) {
+                    tracing::debug!(%block,
+                        "FakeP2PClient::state_diff_stream triggering recoverable error",
+                    );
+                    // This will cause a state diff commitment mismatch
+                    b.state_update
+                        .as_mut()
+                        .unwrap()
+                        .contract_updates
+                        .insert(Faker.fake(), Faker.fake());
+                }
+
                 Ok(PeerData::for_tests((
-                    block.state_update.unwrap().into(),
-                    block.header.header.number,
+                    b.state_update.unwrap().into(),
+                    b.header.header.number,
                 )))
             }))
         }
@@ -509,15 +783,24 @@ mod tests {
            self,
            start: BlockNumber,
            stop: BlockNumber,
-            // declared_class_count_stream: impl Stream<Item = anyhow::Result<usize>> + Send +
-            // 'static,
            _: impl Stream<Item = anyhow::Result<usize>> + Send + 'static,
        ) -> impl Stream<Item = anyhow::Result<PeerData<ClassDefinition>>> + Send {
+            let error_trigger = self.error_trigger.clone();
+
             stream::iter(
-                self.blocks(start, stop, false, |block| {
-                    let block_number = block.header.header.number;
-                    block
-                        .cairo_defs
+                self.blocks(start, stop, false, |mut b| {
+                    let block = b.header.header.number;
+
+                    if error_trigger.recoverable(block, 3) {
+                        tracing::debug!(%block,
+                            "FakeP2PClient::class_stream triggering recoverable error",
+                        );
+                        // This will trigger an unexpected class error
+                        b.cairo_defs.push((Faker.fake(), Faker.fake()));
+                    }
+
+                    let block_number = b.header.header.number;
+                    b.cairo_defs
                         .into_iter()
                         .map(move |(hash, definition)| {
                             Ok(PeerData::for_tests(ClassDefinition::Cairo {
@@ -526,7 +809,7 @@ mod tests {
                                 hash,
                             }))
                         })
-                        .chain(block.sierra_defs.into_iter().map(
+                        .chain(b.sierra_defs.into_iter().map(
                             move |(hash, sierra_definition, _)| {
                                 Ok(PeerData::for_tests(ClassDefinition::Sierra {
                                     block_number,
                                     sierra_definition,
@@ -547,14 +830,24 @@ mod tests {
            self,
            start: BlockNumber,
            stop: BlockNumber,
-            // event_count_stream: impl Stream<Item = anyhow::Result<usize>> + Send + 'static,
            _: impl Stream<Item = anyhow::Result<usize>> + Send + 'static,
        ) -> impl Stream<Item = anyhow::Result<PeerData<(BlockNumber, Vec<(TransactionHash, Vec<Event>)>)>>> {
-            stream::iter(self.blocks(start, stop, false, |block| {
+            let error_trigger = self.error_trigger.clone();
+
+            stream::iter(self.blocks(start, stop, false, |mut b| {
+                let block = b.header.header.number;
+
+                if error_trigger.recoverable(block, 4) {
+                    tracing::debug!(%block,
+                        "FakeP2PClient::event_stream triggering recoverable error",
+                    );
+                    // This will trigger an event commitment mismatch
+                    b.transaction_data.last_mut().unwrap().2.push(Faker.fake());
+                }
+
                 Ok(PeerData::for_tests((
-                    block.header.header.number,
-                    block
-                        .transaction_data
+                    b.header.header.number,
+                    b.transaction_data
                         .into_iter()
                         .map(|(t, _, e)| (t.hash, e))
                        .collect(),
                 )))
             }))
         }
@@ -571,7 +864,7 @@ mod tests {
             PeerId,
             impl Stream<Item = anyhow::Result<(Transaction, P2PReceipt)>> + Send,
         )> {
-            let tr = self
+            let mut tr = self
                 .blocks
                 .iter()
                 .find(|b| b.header.header.number == block)
@@ -581,6 +874,31 @@ mod tests {
                 .map(|(t, r, e)| Ok((t.clone(), P2PReceipt::from(r.clone()))))
                 .collect::<Vec<anyhow::Result<(Transaction, P2PReceipt)>>>();
 
+            if self.error_trigger.recoverable(block, 6) {
+                tracing::debug!(%block,
+                    "FakeP2PClient::transactions_for_block triggering recoverable error",
+                );
+                // This will cause a transaction hash mismatch
+                tr.last_mut().unwrap().as_mut().unwrap().0.variant = Faker.fake();
+            }
+
+            if self.error_trigger.fatal(block) {
+                tracing::debug!(%block,
+                    "FakeP2PClient::transactions_for_block triggering fatal error",
+                );
+                // Returning an error from the "for_block" apis does not trigger
+                // a fatal error so instead we insert a fake header for this
+                // very block to trigger an insertion conflict when track is
+                // about to store the entire block
+                let mut db = self.storage.connection().unwrap();
+                let db = db.transaction().unwrap();
+                let header = BlockHeader {
+                    number: block,
+                    ..Default::default()
+                };
+                db.insert_block_header(&header).unwrap();
+                db.commit().unwrap();
+            }
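+            // The conflicting header later makes storing the fully synced
+            // block fail with a database error, which track treats as fatal.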
+
             Some((PeerId::random(), stream::iter(tr)))
         }
@@ -589,7 +907,7 @@ mod tests {
             block: BlockNumber,
             state_diff_length: u64,
         ) -> Result<Option<(PeerId, StateUpdateData)>, StateDiffsError> {
-            let sd: StateUpdateData = self
+            let mut sd: StateUpdateData = self
                 .blocks
                 .iter()
                 .find(|b| b.header.header.number == block)
@@ -601,6 +919,14 @@ mod tests {
 
             assert_eq!(sd.state_diff_length() as u64, state_diff_length);
 
+            if self.error_trigger.recoverable(block, 7) {
+                tracing::debug!(%block,
+                    "FakeP2PClient::state_diff_for_block triggering recoverable error",
+                );
+                // This will cause a state diff commitment mismatch
+                sd.contract_updates.insert(Faker.fake(), Faker.fake());
+            }
+
             Ok(Some((PeerId::random(), sd)))
         }
@@ -614,7 +940,7 @@ mod tests {
                 .iter()
                 .find(|b| b.header.header.number == block)
                 .unwrap();
-            let defs = b
+            let mut defs = b
                 .cairo_defs
                 .iter()
                 .map(|(h, x)| ClassDefinition::Cairo {
@@ -633,6 +959,14 @@ mod tests {
                 )
                 .collect::<Vec<_>>();
 
+            if self.error_trigger.recoverable(block, 8) {
+                tracing::debug!(%block,
+                    "FakeP2PClient::class_definitions_for_block triggering recoverable error",
+                );
+                // This will cause an unexpected class error
+                defs.push(Faker.fake());
+            }
+
             Ok(Some((PeerId::random(), defs)))
         }
@@ -643,7 +977,7 @@ mod tests {
             PeerId,
             impl Stream<Item = anyhow::Result<(TransactionHash, Event)>> + Send,
         )> {
-            let e = self
+            let mut e = self
                 .blocks
                 .iter()
                 .find(|b| b.header.header.number == block)
@@ -654,6 +988,14 @@ mod tests {
                 .map(Ok)
                 .collect::<Vec<_>>();
 
+            if self.error_trigger.recoverable(block, 9) {
+                tracing::debug!(%block,
+                    "FakeP2PClient::events_for_block triggering recoverable error",
+                );
+                // This will trigger an event commitment mismatch
+                e.push(Ok(Faker.fake()));
+            }
+
             Some((PeerId::random(), stream::iter(e)))
         }
     }
diff --git a/crates/pathfinder/src/sync/checkpoint.rs b/crates/pathfinder/src/sync/checkpoint.rs
index a3f66ca54..e42702836 100644
--- a/crates/pathfinder/src/sync/checkpoint.rs
+++ b/crates/pathfinder/src/sync/checkpoint.rs
@@ -56,7 +56,6 @@ pub struct Sync<P, G> {
     pub eth_client: pathfinder_ethereum::EthereumClient,
     pub eth_address: H160,
     pub fgw_client: G,
-    pub chain: Chain,
     pub chain_id: ChainId,
     pub public_key: PublicKey,
     pub verify_tree_hashes: bool,
@@ -81,11 +80,11 @@ where
         p2p: P,
         ethereum: (pathfinder_ethereum::EthereumClient, H160),
         fgw_client: G,
-        chain: Chain,
         chain_id: ChainId,
         public_key: PublicKey,
         l1_anchor_override: Option<EthereumStateUpdate>,
         verify_tree_hashes: bool,
+        block_hash_db: Option<pathfinder_block_hashes::BlockHashDb>,
     ) -> Self {
         Self {
             storage,
@@ -93,11 +92,10 @@ where
             eth_client: ethereum.0,
             eth_address: ethereum.1,
             fgw_client,
-            chain,
             chain_id,
             public_key,
             verify_tree_hashes,
-            block_hash_db: Some(pathfinder_block_hashes::BlockHashDb::new(chain)),
+            block_hash_db,
         }
     }
@@ -175,7 +173,6 @@ where
             handle_header_stream(
                 self.p2p.clone().header_stream(gap.tail, gap.head, true),
                 gap.head(),
-                self.chain,
                 self.chain_id,
                 self.public_key,
                 self.block_hash_db.clone(),
@@ -309,7 +306,6 @@ where
 async fn handle_header_stream(
     stream: impl Stream<Item = PeerData<SignedBlockHeader>> + Send + 'static,
     head: (BlockNumber, BlockHash),
-    chain: Chain,
     chain_id: ChainId,
     public_key: PublicKey,
     block_hash_db: Option<pathfinder_block_hashes::BlockHashDb>,
@@ -319,7 +315,7 @@ async fn handle_header_stream(
         .spawn()
         .pipe(headers::BackwardContinuity::new(head.0, head.1), 10)
         .pipe(
-            headers::VerifyHashAndSignature::new(chain, chain_id, public_key, block_hash_db),
+            headers::VerifyHashAndSignature::new(chain_id, public_key, block_hash_db),
             10,
         )
         .try_chunks(1000, 10)
@@ -875,7 +871,6 @@ mod tests {
         handle_header_stream(
             stream::iter(streamed_headers),
             head,
-            Chain::SepoliaTestnet,
             ChainId::SEPOLIA_TESTNET,
             public_key,
             block_hash_db,
@@ -920,7 +915,6 @@ mod tests {
         handle_header_stream(
             stream::iter(streamed_headers),
             head,
-            Chain::SepoliaTestnet,
             ChainId::SEPOLIA_TESTNET,
             public_key,
             Some(pathfinder_block_hashes::BlockHashDb::new(
@@ -948,7 +942,6 @@ mod tests {
             stream::iter(streamed_headers),
             head,
             // Causes mismatches for all block hashes because setup assumes Sepolia
-            Chain::Mainnet,
             ChainId::MAINNET,
             public_key,
             None,
@@ -973,7 +966,6 @@ mod tests {
         handle_header_stream(
             stream::iter(streamed_headers),
             head,
-            Chain::SepoliaTestnet,
             ChainId::SEPOLIA_TESTNET,
             PublicKey::ZERO, // Invalid public key
             block_hash_db,
@@ -1007,7 +999,6 @@ mod tests {
         handle_header_stream(
             stream::iter(streamed_headers),
             head,
-            Chain::SepoliaTestnet,
             ChainId::SEPOLIA_TESTNET,
             public_key,
             Some(pathfinder_block_hashes::BlockHashDb::new(
diff --git a/crates/pathfinder/src/sync/headers.rs b/crates/pathfinder/src/sync/headers.rs
index 29488e5f0..9211afd5b 100644
--- a/crates/pathfinder/src/sync/headers.rs
+++ b/crates/pathfinder/src/sync/headers.rs
@@ -159,7 +159,6 @@ pub struct BackwardContinuity {
 
 /// Ensures that the block hash and signature are correct.
 pub struct VerifyHashAndSignature {
-    chain: Chain,
     chain_id: ChainId,
     public_key: PublicKey,
     block_hash_db: Option<pathfinder_block_hashes::BlockHashDb>,
@@ -251,13 +250,11 @@ impl ProcessStage for VerifyHashAndSignature {
 
 impl VerifyHashAndSignature {
     pub fn new(
-        chain: Chain,
         chain_id: ChainId,
         public_key: PublicKey,
         block_hash_db: Option<pathfinder_block_hashes::BlockHashDb>,
     ) -> Self {
         Self {
-            chain,
             chain_id,
             public_key,
             block_hash_db,
diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs
index bb1b26825..62d4dd80e 100644
--- a/crates/pathfinder/src/sync/track.rs
+++ b/crates/pathfinder/src/sync/track.rs
@@ -56,7 +56,6 @@ pub struct Sync<L, P> {
     pub latest: L,
     pub p2p: P,
     pub storage: Storage,
-    pub chain: Chain,
     pub chain_id: ChainId,
     pub public_key: PublicKey,
     pub block_hash_db: Option<pathfinder_block_hashes::BlockHashDb>,
@@ -89,7 +88,6 @@
             .pipe(headers::ForwardContinuity::new(*next, *parent_hash), 100)
             .pipe(
                 headers::VerifyHashAndSignature::new(
-                    self.chain,
                     self.chain_id,
                     self.public_key,
                     self.block_hash_db,
@@ -922,7 +920,6 @@ mod tests {
             latest: futures::stream::iter(vec![latest]),
             p2p,
             storage: storage.clone(),
-            chain: Chain::SepoliaTestnet,
             chain_id: ChainId::SEPOLIA_TESTNET,
             public_key,
             block_hash_db: None,
diff --git a/crates/pathfinder/src/sync/transactions.rs b/crates/pathfinder/src/sync/transactions.rs
index 7c4499bc6..03eb09ef6 100644
--- a/crates/pathfinder/src/sync/transactions.rs
+++ b/crates/pathfinder/src/sync/transactions.rs
@@ -87,7 +87,11 @@ impl ProcessStage for CalculateHashes {
 
     fn map(&mut self, peer: &PeerId, input: Self::Input) -> Result<Self::Output, SyncError> {
         use rayon::prelude::*;
+
         let (transactions, block_number, version, expected_commitment) = input;
+
+        tracing::error!(%block_number, "Transactions::Hashes");
+
         let transactions = transactions
             .into_par_iter()
             .map(|(tx, r)| {
@@ -173,6 +177,9 @@ impl ProcessStage for VerifyCommitment {
             version,
             block_number,
         } = transactions;
+
+        tracing::error!(%block_number, "Transactions::Verify");
+
         let txs: Vec<_> = transactions.iter().map(|(t, _)| t.clone()).collect();
         // This computation can only fail in case of internal trie error which is always
         // a fatal error