diff --git a/crates/pathfinder/src/bin/pathfinder/main.rs b/crates/pathfinder/src/bin/pathfinder/main.rs index f6b24d7b40..0b8aed3c82 100644 --- a/crates/pathfinder/src/bin/pathfinder/main.rs +++ b/crates/pathfinder/src/bin/pathfinder/main.rs @@ -628,6 +628,8 @@ fn start_p2p_sync( l1_checkpoint_override: Option, verify_tree_hashes: bool, ) -> tokio::task::JoinHandle> { + use pathfinder_block_hashes::BlockHashDb; + let sync = pathfinder_lib::sync::Sync { storage, p2p: p2p_client, @@ -635,10 +637,10 @@ fn start_p2p_sync( eth_address: pathfinder_context.l1_core_address, fgw_client: pathfinder_context.gateway, chain_id: pathfinder_context.network_id, - chain: pathfinder_context.network, public_key: gateway_public_key, l1_checkpoint_override, verify_tree_hashes, + block_hash_db: Some(BlockHashDb::new(pathfinder_context.network)), }; tokio::spawn(sync.run()) } diff --git a/crates/pathfinder/src/sync.rs b/crates/pathfinder/src/sync.rs index 0439911429..e395621e7f 100644 --- a/crates/pathfinder/src/sync.rs +++ b/crates/pathfinder/src/sync.rs @@ -17,6 +17,7 @@ use p2p::client::peer_agnostic::traits::{ TransactionStream, }; use p2p::PeerData; +use pathfinder_block_hashes::BlockHashDb; use pathfinder_common::error::AnyhowExt; use pathfinder_common::{ block_hash, @@ -56,11 +57,11 @@ pub struct Sync { pub eth_client: pathfinder_ethereum::EthereumClient, pub eth_address: H160, pub fgw_client: G, - pub chain: Chain, pub chain_id: ChainId, pub public_key: PublicKey, pub l1_checkpoint_override: Option, pub verify_tree_hashes: bool, + pub block_hash_db: Option, } impl Sync @@ -83,6 +84,20 @@ where self.track_sync(next, parent_hash).await } + pub async fn run2(self) -> anyhow::Result<()> { + self.run_inner(true).await.map_err(|e| match e { + SyncError::Fatal(mut e) => e.take_or_deep_clone(), + _ => unreachable!("Non fatal errors should always be retried upon"), + }) + } + + async fn run_inner(self, retry_on_error: bool) -> Result<(), SyncError> { + let (next, parent_hash) = self.checkpoint_sync_inner(retry_on_error).await?; + + self.track_sync_inner(next, parent_hash, retry_on_error) + .await + } + async fn handle_recoverable_error(&self, err: &error::SyncError) { // TODO tracing::debug!(%err, "Log and punish as appropriate"); @@ -134,11 +149,10 @@ where eth_client: self.eth_client.clone(), eth_address: self.eth_address, fgw_client: self.fgw_client.clone(), - chain: self.chain, chain_id: self.chain_id, public_key: self.public_key, verify_tree_hashes: self.verify_tree_hashes, - block_hash_db: Some(pathfinder_block_hashes::BlockHashDb::new(self.chain)), + block_hash_db: self.block_hash_db.clone(), } .run(checkpoint) .await; @@ -176,6 +190,68 @@ where } } + async fn checkpoint_sync_inner( + &self, + retry_on_error: bool, + ) -> Result<(BlockNumber, BlockHash), SyncError> { + let mut checkpoint = self.get_checkpoint().await; + let from = (checkpoint.block_number, checkpoint.block_hash); + + tracing::info!(?from, "Checkpoint sync started"); + + loop { + let result = checkpoint::Sync { + storage: self.storage.clone(), + p2p: self.p2p.clone(), + eth_client: self.eth_client.clone(), + eth_address: self.eth_address, + fgw_client: self.fgw_client.clone(), + chain_id: self.chain_id, + public_key: self.public_key, + verify_tree_hashes: self.verify_tree_hashes, + block_hash_db: self.block_hash_db.clone(), + } + .run(checkpoint) + .await; + + // Handle the error + let continue_from = match result { + Ok(continue_from) => { + tracing::debug!(?continue_from, "Checkpoint sync complete"); + continue_from + } + Err(error @ SyncError::Fatal(_)) => { + tracing::error!(%error, "Stopping checkpoint sync"); + return Err(error); + } + Err(error) => { + tracing::debug!(%error, "Restarting checkpoint sync"); + self.handle_recoverable_error(&error).await; + + if retry_on_error { + continue; + } else { + return Err(error); + } + } + }; + + // Initial sync might take so long that the latest checkpoint is actually far + // ahead again. Repeat until we are within some margin of L1. + let latest_checkpoint = self.get_checkpoint().await; + if checkpoint.block_number + CHECKPOINT_MARGIN < latest_checkpoint.block_number { + checkpoint = latest_checkpoint; + tracing::debug!( + local_checkpoint=%checkpoint.block_number, latest_checkpoint=%latest_checkpoint.block_number, + "Restarting checkpoint sync: L1 checkpoint has advanced" + ); + continue; + } + + break Ok(continue_from); + } + } + /// Run the track sync forever, requires the number and parent hash of the /// first block to sync. /// @@ -193,14 +269,13 @@ where loop { let mut result = track::Sync { - latest: LatestStream::spawn(self.fgw_client.clone(), Duration::from_secs(2)), + latest: LatestStream::spawn(self.fgw_client.clone(), Duration::from_secs(2), false), p2p: self.p2p.clone(), storage: self.storage.clone(), - chain: self.chain, chain_id: self.chain_id, public_key: self.public_key, - block_hash_db: Some(pathfinder_block_hashes::BlockHashDb::new(self.chain)), verify_tree_hashes: self.verify_tree_hashes, + block_hash_db: self.block_hash_db.clone(), } .run(&mut next, &mut parent_hash, self.fgw_client.clone()) .await; @@ -219,6 +294,54 @@ where } } } + + async fn track_sync_inner( + &self, + mut next: BlockNumber, + mut parent_hash: BlockHash, + retry_on_error: bool, + ) -> Result<(), SyncError> { + tracing::info!(next_block=%next, "Track sync started"); + + loop { + let mut result = track::Sync { + latest: LatestStream::spawn( + self.fgw_client.clone(), + Duration::from_secs(2), + !retry_on_error, + ), + p2p: self.p2p.clone(), + storage: self.storage.clone(), + chain_id: self.chain_id, + public_key: self.public_key, + verify_tree_hashes: self.verify_tree_hashes, + block_hash_db: self.block_hash_db.clone(), + } + .run(&mut next, &mut parent_hash, self.fgw_client.clone()) + .await; + + match result { + Ok(_) => { + tracing::debug!("Restarting track sync: unexpected end of Block stream"); + if !retry_on_error { + return Ok(()); + } + } + Err(error @ SyncError::Fatal(_)) => { + tracing::error!(%error, "Stopping track sync"); + return Err(error); + } + Err(error) => { + tracing::debug!(%error, "Restarting track sync"); + self.handle_recoverable_error(&error).await; + + if !retry_on_error { + return Err(error); + } + } + } + } + } } struct LatestStream { @@ -251,7 +374,7 @@ impl Stream for LatestStream { } impl LatestStream { - fn spawn(fgw: G, head_poll_interval: Duration) -> Self + fn spawn(fgw: G, head_poll_interval: Duration, single_shot: bool) -> Self where G: GatewayApi + Clone + Send + 'static, { @@ -281,7 +404,6 @@ impl LatestStream { } tx.send_if_modified(|current| { - // TODO: handle reorgs correctly if *current != latest { tracing::info!(?latest, "LatestStream"); *current = latest; @@ -290,6 +412,10 @@ impl LatestStream { false } }); + + if single_shot { + return; + } } }); @@ -302,6 +428,9 @@ impl LatestStream { #[cfg(test)] mod tests { + use std::collections::VecDeque; + use std::sync::Mutex; + use fake::{Fake, Faker}; use futures::stream; use p2p::client::types::{ @@ -363,9 +492,12 @@ mod tests { #[test_log::test(tokio::test)] async fn checkpoint_restarts_after_recoverable_error() { - let (public_key, blocks) = generate_fake_blocks(20); + const CHECKPOINT_BLOCKS: usize = 10; + // const TRACK_BLOCKS: usize = CHECKPOINT_MARGIN as usize - 1; + const TRACK_BLOCKS: usize = 1; + let (public_key, blocks) = generate_fake_blocks(CHECKPOINT_BLOCKS + TRACK_BLOCKS); let last_header = &blocks.last().unwrap().header.header; - let mid_header = &blocks[9].header.header; + let mid_header = &blocks[CHECKPOINT_BLOCKS - 1].header.header; let sync = Sync { storage: StorageBuilder::in_tempdir().unwrap(), p2p: FakeP2PClient { @@ -377,18 +509,21 @@ mod tests { fgw_client: FakeFgw { head: (last_header.number, last_header.hash), }, - chain: Chain::SepoliaTestnet, chain_id: ChainId::SEPOLIA_TESTNET, - public_key: PublicKey::ZERO, // TODO + public_key, l1_checkpoint_override: Some(EthereumStateUpdate { state_root: mid_header.state_commitment, block_number: mid_header.number, block_hash: mid_header.hash, }), verify_tree_hashes: true, + block_hash_db: None, }; - sync.run().await.unwrap(); + tokio::time::timeout(Duration::from_secs(10), sync.run_inner(false)) + .await + .expect("test timed out") + .expect("No sync errors"); // TODO // 2 cases here: @@ -436,9 +571,9 @@ mod tests { let mut blocks = self .blocks .into_iter() - .take_while(move |b| { + .filter_map(move |b| { let n = b.header.header.number; - n >= start && n <= stop + (n >= start && n <= stop).then_some(b) }) .collect::>(); @@ -457,6 +592,8 @@ mod tests { stop: BlockNumber, reverse: bool, ) -> impl Stream> + Send { + tracing::error!(%start, %stop, %reverse, "FakeP2PClient::header_stream"); + stream::iter(self.blocks(start, stop, reverse, |block| { PeerData::for_tests(block.header) })) @@ -677,4 +814,28 @@ mod tests { Ok(self.head) } } + + #[derive(Clone)] + struct FakeFgw2 { + heads: Arc>>, + } + + #[async_trait::async_trait] + impl GatewayApi for FakeFgw2 { + async fn pending_casm_by_hash(&self, _: ClassHash) -> Result { + Ok(bytes::Bytes::from_static(b"I'm from the fgw!")) + } + + async fn block_header( + &self, + block: BlockId, + ) -> Result<(BlockNumber, BlockHash), SequencerError> { + assert_eq!(block, BlockId::Latest); + + let mut heads = self.heads.lock().unwrap(); + let head = heads.pop_front().unwrap(); + + Ok(head) + } + } } diff --git a/crates/pathfinder/src/sync/checkpoint.rs b/crates/pathfinder/src/sync/checkpoint.rs index a3f66ca545..e42702836d 100644 --- a/crates/pathfinder/src/sync/checkpoint.rs +++ b/crates/pathfinder/src/sync/checkpoint.rs @@ -56,7 +56,6 @@ pub struct Sync { pub eth_client: pathfinder_ethereum::EthereumClient, pub eth_address: H160, pub fgw_client: G, - pub chain: Chain, pub chain_id: ChainId, pub public_key: PublicKey, pub verify_tree_hashes: bool, @@ -81,11 +80,11 @@ where p2p: P, ethereum: (pathfinder_ethereum::EthereumClient, H160), fgw_client: G, - chain: Chain, chain_id: ChainId, public_key: PublicKey, l1_anchor_override: Option, verify_tree_hashes: bool, + block_hash_db: Option, ) -> Self { Self { storage, @@ -93,11 +92,10 @@ where eth_client: ethereum.0, eth_address: ethereum.1, fgw_client, - chain, chain_id, public_key, verify_tree_hashes, - block_hash_db: Some(pathfinder_block_hashes::BlockHashDb::new(chain)), + block_hash_db, } } @@ -175,7 +173,6 @@ where handle_header_stream( self.p2p.clone().header_stream(gap.tail, gap.head, true), gap.head(), - self.chain, self.chain_id, self.public_key, self.block_hash_db.clone(), @@ -309,7 +306,6 @@ where async fn handle_header_stream( stream: impl Stream> + Send + 'static, head: (BlockNumber, BlockHash), - chain: Chain, chain_id: ChainId, public_key: PublicKey, block_hash_db: Option, @@ -319,7 +315,7 @@ async fn handle_header_stream( .spawn() .pipe(headers::BackwardContinuity::new(head.0, head.1), 10) .pipe( - headers::VerifyHashAndSignature::new(chain, chain_id, public_key, block_hash_db), + headers::VerifyHashAndSignature::new(chain_id, public_key, block_hash_db), 10, ) .try_chunks(1000, 10) @@ -875,7 +871,6 @@ mod tests { handle_header_stream( stream::iter(streamed_headers), head, - Chain::SepoliaTestnet, ChainId::SEPOLIA_TESTNET, public_key, block_hash_db, @@ -920,7 +915,6 @@ mod tests { handle_header_stream( stream::iter(streamed_headers), head, - Chain::SepoliaTestnet, ChainId::SEPOLIA_TESTNET, public_key, Some(pathfinder_block_hashes::BlockHashDb::new( @@ -948,7 +942,6 @@ mod tests { stream::iter(streamed_headers), head, // Causes mismatches for all block hashes because setup assumes Sepolia - Chain::Mainnet, ChainId::MAINNET, public_key, None, @@ -973,7 +966,6 @@ mod tests { handle_header_stream( stream::iter(streamed_headers), head, - Chain::SepoliaTestnet, ChainId::SEPOLIA_TESTNET, PublicKey::ZERO, // Invalid public key block_hash_db, @@ -1007,7 +999,6 @@ mod tests { handle_header_stream( stream::iter(streamed_headers), head, - Chain::SepoliaTestnet, ChainId::SEPOLIA_TESTNET, public_key, Some(pathfinder_block_hashes::BlockHashDb::new( diff --git a/crates/pathfinder/src/sync/headers.rs b/crates/pathfinder/src/sync/headers.rs index 29488e5f0e..9211afd5bf 100644 --- a/crates/pathfinder/src/sync/headers.rs +++ b/crates/pathfinder/src/sync/headers.rs @@ -159,7 +159,6 @@ pub struct BackwardContinuity { /// Ensures that the block hash and signature are correct. pub struct VerifyHashAndSignature { - chain: Chain, chain_id: ChainId, public_key: PublicKey, block_hash_db: Option, @@ -251,13 +250,11 @@ impl ProcessStage for VerifyHashAndSignature { impl VerifyHashAndSignature { pub fn new( - chain: Chain, chain_id: ChainId, public_key: PublicKey, block_hash_db: Option, ) -> Self { Self { - chain, chain_id, public_key, block_hash_db, diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs index 01dbebb1c4..b4894ec138 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -56,7 +56,6 @@ pub struct Sync { pub latest: L, pub p2p: P, pub storage: Storage, - pub chain: Chain, pub chain_id: ChainId, pub public_key: PublicKey, pub block_hash_db: Option, @@ -89,7 +88,6 @@ impl Sync { .pipe(headers::ForwardContinuity::new(*next, *parent_hash), 100) .pipe( headers::VerifyHashAndSignature::new( - self.chain, self.chain_id, self.public_key, self.block_hash_db, @@ -922,7 +920,6 @@ mod tests { latest: futures::stream::iter(vec![latest]), p2p, storage: storage.clone(), - chain: Chain::SepoliaTestnet, chain_id: ChainId::SEPOLIA_TESTNET, public_key, block_hash_db: None,