test(sync): wip
CHr15F0x committed Nov 21, 2024
1 parent fbb66a0 commit 50978f7
Showing 5 changed files with 182 additions and 34 deletions.
4 changes: 3 additions & 1 deletion crates/pathfinder/src/bin/pathfinder/main.rs
@@ -628,17 +628,19 @@ fn start_p2p_sync(
l1_checkpoint_override: Option<pathfinder_ethereum::EthereumStateUpdate>,
verify_tree_hashes: bool,
) -> tokio::task::JoinHandle<anyhow::Result<()>> {
use pathfinder_block_hashes::BlockHashDb;

let sync = pathfinder_lib::sync::Sync {
storage,
p2p: p2p_client,
eth_client: ethereum_client,
eth_address: pathfinder_context.l1_core_address,
fgw_client: pathfinder_context.gateway,
chain_id: pathfinder_context.network_id,
chain: pathfinder_context.network,
public_key: gateway_public_key,
l1_checkpoint_override,
verify_tree_hashes,
block_hash_db: Some(BlockHashDb::new(pathfinder_context.network)),
};
tokio::spawn(sync.run())
}
191 changes: 176 additions & 15 deletions crates/pathfinder/src/sync.rs
@@ -17,6 +17,7 @@ use p2p::client::peer_agnostic::traits::{
TransactionStream,
};
use p2p::PeerData;
use pathfinder_block_hashes::BlockHashDb;
use pathfinder_common::error::AnyhowExt;
use pathfinder_common::{
block_hash,
@@ -56,11 +57,11 @@ pub struct Sync<P, G> {
pub eth_client: pathfinder_ethereum::EthereumClient,
pub eth_address: H160,
pub fgw_client: G,
pub chain: Chain,
pub chain_id: ChainId,
pub public_key: PublicKey,
pub l1_checkpoint_override: Option<EthereumStateUpdate>,
pub verify_tree_hashes: bool,
pub block_hash_db: Option<BlockHashDb>,
}

impl<P, G> Sync<P, G>
@@ -83,6 +84,20 @@ where
self.track_sync(next, parent_hash).await
}

pub async fn run2(self) -> anyhow::Result<()> {
self.run_inner(true).await.map_err(|e| match e {
SyncError::Fatal(mut e) => e.take_or_deep_clone(),
_ => unreachable!("Non-fatal errors should always be retried"),
})
}

async fn run_inner(self, retry_on_error: bool) -> Result<(), SyncError> {
let (next, parent_hash) = self.checkpoint_sync_inner(retry_on_error).await?;

self.track_sync_inner(next, parent_hash, retry_on_error)
.await
}
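// Hedged usage note (assumption, not part of this diff): with
// `retry_on_error == false` both sync phases return on the first recoverable
// error instead of looping forever, so a test can bound a whole sync pass:
//
//     tokio::time::timeout(Duration::from_secs(10), sync.run_inner(false))
//         .await
//         .expect("sync pass should finish within the timeout")
//         .expect("sync pass should succeed");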

async fn handle_recoverable_error(&self, err: &error::SyncError) {
// TODO
tracing::debug!(%err, "Log and punish as appropriate");
@@ -134,11 +149,10 @@
eth_client: self.eth_client.clone(),
eth_address: self.eth_address,
fgw_client: self.fgw_client.clone(),
chain: self.chain,
chain_id: self.chain_id,
public_key: self.public_key,
verify_tree_hashes: self.verify_tree_hashes,
block_hash_db: Some(pathfinder_block_hashes::BlockHashDb::new(self.chain)),
block_hash_db: self.block_hash_db.clone(),
}
.run(checkpoint)
.await;
@@ -176,6 +190,68 @@
}
}

async fn checkpoint_sync_inner(
&self,
retry_on_error: bool,
) -> Result<(BlockNumber, BlockHash), SyncError> {
let mut checkpoint = self.get_checkpoint().await;
let from = (checkpoint.block_number, checkpoint.block_hash);

tracing::info!(?from, "Checkpoint sync started");

loop {
let result = checkpoint::Sync {
storage: self.storage.clone(),
p2p: self.p2p.clone(),
eth_client: self.eth_client.clone(),
eth_address: self.eth_address,
fgw_client: self.fgw_client.clone(),
chain_id: self.chain_id,
public_key: self.public_key,
verify_tree_hashes: self.verify_tree_hashes,
block_hash_db: self.block_hash_db.clone(),
}
.run(checkpoint)
.await;

// Handle the error
let continue_from = match result {
Ok(continue_from) => {
tracing::debug!(?continue_from, "Checkpoint sync complete");
continue_from
}
Err(error @ SyncError::Fatal(_)) => {
tracing::error!(%error, "Stopping checkpoint sync");
return Err(error);
}
Err(error) => {
tracing::debug!(%error, "Restarting checkpoint sync");
self.handle_recoverable_error(&error).await;

if retry_on_error {
continue;
} else {
return Err(error);
}
}
};

// Initial sync might take so long that the latest checkpoint is actually far
// ahead again. Repeat until we are within some margin of L1.
let latest_checkpoint = self.get_checkpoint().await;
if checkpoint.block_number + CHECKPOINT_MARGIN < latest_checkpoint.block_number {
tracing::debug!(
local_checkpoint=%checkpoint.block_number, latest_checkpoint=%latest_checkpoint.block_number,
"Restarting checkpoint sync: L1 checkpoint has advanced"
);
checkpoint = latest_checkpoint;
continue;
}
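// Worked example of the margin check above (illustrative numbers only;
// CHECKPOINT_MARGIN is defined elsewhere in this module): with a margin of 10
// and a local checkpoint at block 1_000, the checkpoint phase only restarts
// once the fresh L1 checkpoint reaches block 1_011 or later; smaller gaps are
// left for the subsequent track sync to close.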

break Ok(continue_from);
}
}

/// Run the track sync forever, requires the number and parent hash of the
/// first block to sync.
///
@@ -193,14 +269,13 @@ where

loop {
let mut result = track::Sync {
latest: LatestStream::spawn(self.fgw_client.clone(), Duration::from_secs(2)),
latest: LatestStream::spawn(self.fgw_client.clone(), Duration::from_secs(2), false),
p2p: self.p2p.clone(),
storage: self.storage.clone(),
chain: self.chain,
chain_id: self.chain_id,
public_key: self.public_key,
block_hash_db: Some(pathfinder_block_hashes::BlockHashDb::new(self.chain)),
verify_tree_hashes: self.verify_tree_hashes,
block_hash_db: self.block_hash_db.clone(),
}
.run(&mut next, &mut parent_hash, self.fgw_client.clone())
.await;
@@ -219,6 +294,54 @@
}
}
}

async fn track_sync_inner(
&self,
mut next: BlockNumber,
mut parent_hash: BlockHash,
retry_on_error: bool,
) -> Result<(), SyncError> {
tracing::info!(next_block=%next, "Track sync started");

loop {
let mut result = track::Sync {
latest: LatestStream::spawn(
self.fgw_client.clone(),
Duration::from_secs(2),
!retry_on_error,
),
p2p: self.p2p.clone(),
storage: self.storage.clone(),
chain_id: self.chain_id,
public_key: self.public_key,
verify_tree_hashes: self.verify_tree_hashes,
block_hash_db: self.block_hash_db.clone(),
}
.run(&mut next, &mut parent_hash, self.fgw_client.clone())
.await;

match result {
Ok(_) => {
tracing::debug!("Restarting track sync: unexpected end of Block stream");
if !retry_on_error {
return Ok(());
}
}
Err(error @ SyncError::Fatal(_)) => {
tracing::error!(%error, "Stopping track sync");
return Err(error);
}
Err(error) => {
tracing::debug!(%error, "Restarting track sync");
self.handle_recoverable_error(&error).await;

if !retry_on_error {
return Err(error);
}
}
}
}
}
}

struct LatestStream {
@@ -251,7 +374,7 @@ impl Stream for LatestStream {
}

impl LatestStream {
fn spawn<G>(fgw: G, head_poll_interval: Duration) -> Self
fn spawn<G>(fgw: G, head_poll_interval: Duration, single_shot: bool) -> Self
where
G: GatewayApi + Clone + Send + 'static,
{
@@ -281,7 +404,6 @@ impl LatestStream {
}

tx.send_if_modified(|current| {
// TODO: handle reorgs correctly
if *current != latest {
tracing::info!(?latest, "LatestStream");
*current = latest;
Expand All @@ -290,6 +412,10 @@ impl LatestStream {
false
}
});

if single_shot {
return;
}
}
});
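// Hedged note (assumption, not part of this diff): with `single_shot == true`
// the spawned task polls the gateway once, publishes at most one head, and
// exits, as wired up by the non-retrying track sync above, e.g.:
//
//     let latest = LatestStream::spawn(fgw_client.clone(), Duration::from_secs(2), true);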

@@ -302,6 +428,9 @@

#[cfg(test)]
mod tests {
use std::collections::VecDeque;
use std::sync::Mutex;

use fake::{Fake, Faker};
use futures::stream;
use p2p::client::types::{
@@ -363,9 +492,12 @@

#[test_log::test(tokio::test)]
async fn checkpoint_restarts_after_recoverable_error() {
let (public_key, blocks) = generate_fake_blocks(20);
const CHECKPOINT_BLOCKS: usize = 10;
// const TRACK_BLOCKS: usize = CHECKPOINT_MARGIN as usize - 1;
const TRACK_BLOCKS: usize = 1;
let (public_key, blocks) = generate_fake_blocks(CHECKPOINT_BLOCKS + TRACK_BLOCKS);
let last_header = &blocks.last().unwrap().header.header;
let mid_header = &blocks[9].header.header;
let mid_header = &blocks[CHECKPOINT_BLOCKS - 1].header.header;
let sync = Sync {
storage: StorageBuilder::in_tempdir().unwrap(),
p2p: FakeP2PClient {
@@ -377,18 +509,21 @@
fgw_client: FakeFgw {
head: (last_header.number, last_header.hash),
},
chain: Chain::SepoliaTestnet,
chain_id: ChainId::SEPOLIA_TESTNET,
public_key: PublicKey::ZERO, // TODO
public_key,
l1_checkpoint_override: Some(EthereumStateUpdate {
state_root: mid_header.state_commitment,
block_number: mid_header.number,
block_hash: mid_header.hash,
}),
verify_tree_hashes: true,
block_hash_db: None,
};

sync.run().await.unwrap();
tokio::time::timeout(Duration::from_secs(10), sync.run_inner(false))
.await
.expect("test timed out")
.expect("No sync errors");

// TODO
// 2 cases here:
@@ -436,9 +571,9 @@
let mut blocks = self
.blocks
.into_iter()
.take_while(move |b| {
.filter_map(move |b| {
let n = b.header.header.number;
n >= start && n <= stop
(n >= start && n <= stop).then_some(b)
})
.collect::<Vec<_>>();

@@ -457,6 +592,8 @@
stop: BlockNumber,
reverse: bool,
) -> impl Stream<Item = PeerData<SignedBlockHeader>> + Send {
tracing::error!(%start, %stop, %reverse, "FakeP2PClient::header_stream");

stream::iter(self.blocks(start, stop, reverse, |block| {
PeerData::for_tests(block.header)
}))
@@ -677,4 +814,28 @@
Ok(self.head)
}
}

#[derive(Clone)]
struct FakeFgw2 {
heads: Arc<Mutex<VecDeque<(BlockNumber, BlockHash)>>>,
}

#[async_trait::async_trait]
impl GatewayApi for FakeFgw2 {
async fn pending_casm_by_hash(&self, _: ClassHash) -> Result<bytes::Bytes, SequencerError> {
Ok(bytes::Bytes::from_static(b"I'm from the fgw!"))
}

async fn block_header(
&self,
block: BlockId,
) -> Result<(BlockNumber, BlockHash), SequencerError> {
assert_eq!(block, BlockId::Latest);

let mut heads = self.heads.lock().unwrap();
let head = heads.pop_front().unwrap();

Ok(head)
}
}
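// Hedged sketch (assumption, not part of this diff): seeding FakeFgw2 so that
// successive polls of the latest head walk forward through a generated fake
// chain, one block per poll. Only names already defined in this module are used;
// note that `block_header` panics once the queue is exhausted.
//
//     let (_public_key, blocks) = generate_fake_blocks(3);
//     let heads = blocks
//         .iter()
//         .map(|b| (b.header.header.number, b.header.header.hash))
//         .collect::<VecDeque<_>>();
//     let fgw = FakeFgw2 {
//         heads: Arc::new(Mutex::new(heads)),
//     };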
}