Skip to content

Commit

Permalink
add header sync tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hansieodendaal committed Oct 17, 2023
1 parent 244055f commit 495f44a
Show file tree
Hide file tree
Showing 16 changed files with 437 additions and 54 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions base_layer/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ tokio = { version = "1.23", features = ["time", "sync", "macros"] }
tracing = "0.1.26"
uint = { version = "0.9", default-features = false }
zeroize = "1"
tower = "0.4.11"

[dev-dependencies]
tari_p2p = { path = "../../base_layer/p2p", features = ["test-mocks"] }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,15 @@ impl<B: BlockchainBackend + 'static> BaseNodeStateMachine<B> {
db.set_disable_add_block_flag();
HeaderSync(HeaderSyncState::new(sync_peers, local_metadata))
},
(HeaderSync(s), HeaderSyncFailed) => {
(HeaderSync(s), HeaderSyncFailed(_err)) => {
db.clear_disable_add_block_flag();
Waiting(s.into())
},
(HeaderSync(s), Continue | NetworkSilence) => {
db.clear_disable_add_block_flag();
Listening(s.into())
},
(HeaderSync(s), HeadersSynchronized(_)) => DecideNextSync(s.into()),
(HeaderSync(s), HeadersSynchronized(..)) => DecideNextSync(s.into()),

(DecideNextSync(_), ProceedToHorizonSync(peers)) => HorizonStateSync(peers.into()),
(DecideNextSync(s), Continue) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::base_node::{
Starting,
Waiting,
},
sync::{HorizonSyncInfo, SyncPeer},
sync::{AttemptSyncResult, HorizonSyncInfo, SyncPeer},
};

#[derive(Debug)]
Expand All @@ -57,8 +57,8 @@ pub enum BaseNodeState {
#[derive(Debug, Clone, PartialEq)]
pub enum StateEvent {
Initialized,
HeadersSynchronized(SyncPeer),
HeaderSyncFailed,
HeadersSynchronized(SyncPeer, AttemptSyncResult),
HeaderSyncFailed(String),
ProceedToHorizonSync(Vec<SyncPeer>),
ProceedToBlockSync(Vec<SyncPeer>),
HorizonStateSynchronized,
Expand Down Expand Up @@ -145,8 +145,8 @@ impl Display for StateEvent {
match self {
Initialized => write!(f, "Initialized"),
BlocksSynchronized => write!(f, "Synchronised Blocks"),
HeadersSynchronized(peer) => write!(f, "Headers Synchronized from peer `{}`", peer),
HeaderSyncFailed => write!(f, "Header Synchronization Failed"),
HeadersSynchronized(peer, result) => write!(f, "Headers Synchronized from peer `{}` ({:?})", peer, result),
HeaderSyncFailed(err) => write!(f, "Header Synchronization Failed ({})", err),
ProceedToHorizonSync(_) => write!(f, "Proceed to horizon sync"),
ProceedToBlockSync(_) => write!(f, "Proceed to block sync"),
HorizonStateSynchronized => write!(f, "Horizon State Synchronized"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use crate::{
},
chain_storage::BlockchainBackend,
};

const LOG_TARGET: &str = "c::bn::header_sync";

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -78,6 +77,7 @@ impl HeaderSyncState {
}

// converting u64 to i64 is okay as the future time limit is the hundreds so way below u32 even
#[allow(clippy::too_many_lines)]
#[allow(clippy::cast_possible_wrap)]
pub async fn next_event<B: BlockchainBackend + 'static>(
&mut self,
Expand Down Expand Up @@ -158,7 +158,7 @@ impl HeaderSyncState {
let mut mdc = vec![];
log_mdc::iter(|k, v| mdc.push((k.to_owned(), v.to_owned())));
match synchronizer.synchronize().await {
Ok(sync_peer) => {
Ok((sync_peer, sync_result)) => {
log_mdc::extend(mdc);
info!(
target: LOG_TARGET,
Expand All @@ -174,9 +174,10 @@ impl HeaderSyncState {
}
}
self.is_synced = true;
StateEvent::HeadersSynchronized(sync_peer)
StateEvent::HeadersSynchronized(sync_peer, sync_result)
},
Err(err) => {
println!("HeaderSyncState::next_event - {}", err);
let _ignore = shared.status_event_sender.send(StatusInfo {
bootstrapped,
state_info: StateInfo::SyncFailed("HeaderSyncFailed".to_string()),
Expand All @@ -193,7 +194,7 @@ impl HeaderSyncState {
_ => {
log_mdc::extend(mdc);
debug!(target: LOG_TARGET, "Header sync failed: {}", err);
StateEvent::HeaderSyncFailed
StateEvent::HeaderSyncFailed(err.to_string())
},
}
},
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/sync/header_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ pub use error::BlockHeaderSyncError;
mod validator;

mod synchronizer;
pub use synchronizer::HeaderSynchronizer;
pub use synchronizer::{AttemptSyncResult, HeaderSyncStatus, HeaderSynchronizer};
68 changes: 49 additions & 19 deletions base_layer/core/src/base_node/sync/header_sync/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
self.hooks.add_on_rewind_hook(hook);
}

pub async fn synchronize(&mut self) -> Result<SyncPeer, BlockHeaderSyncError> {
pub async fn synchronize(&mut self) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
debug!(target: LOG_TARGET, "Starting header sync.",);

info!(
Expand All @@ -118,7 +118,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
let mut latency_increases_counter = 0;
loop {
match self.try_sync_from_all_peers(max_latency).await {
Ok(sync_peer) => break Ok(sync_peer),
Ok((peer, sync_result)) => break Ok((peer, sync_result)),
Err(err @ BlockHeaderSyncError::AllSyncPeersExceedLatency) => {
// If we have few sync peers, throw this out to be retried later
if self.sync_peers.len() < 2 {
Expand All @@ -136,7 +136,10 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
}

#[allow(clippy::too_many_lines)]
pub async fn try_sync_from_all_peers(&mut self, max_latency: Duration) -> Result<SyncPeer, BlockHeaderSyncError> {
pub async fn try_sync_from_all_peers(
&mut self,
max_latency: Duration,
) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
let sync_peer_node_ids = self.sync_peers.iter().map(|p| p.node_id()).cloned().collect::<Vec<_>>();
info!(
target: LOG_TARGET,
Expand All @@ -146,7 +149,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
let mut latency_counter = 0usize;
for node_id in sync_peer_node_ids {
match self.connect_and_attempt_sync(&node_id, max_latency).await {
Ok(peer) => return Ok(peer),
Ok((peer, sync_result)) => return Ok((peer, sync_result)),
Err(err) => {
let ban_reason = BlockHeaderSyncError::get_ban_reason(
&err,
Expand Down Expand Up @@ -181,7 +184,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
&mut self,
node_id: &NodeId,
max_latency: Duration,
) -> Result<SyncPeer, BlockHeaderSyncError> {
) -> Result<(SyncPeer, AttemptSyncResult), BlockHeaderSyncError> {
let peer_index = self
.get_sync_peer_index(node_id)
.ok_or(BlockHeaderSyncError::PeerNotFound)?;
Expand Down Expand Up @@ -215,8 +218,8 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {

debug!(target: LOG_TARGET, "Sync peer latency is {:.2?}", latency);
let sync_peer = self.sync_peers[peer_index].clone();
self.attempt_sync(&sync_peer, client, max_latency).await?;
Ok(sync_peer)
let sync_result = self.attempt_sync(&sync_peer, client, max_latency).await?;
Ok((sync_peer, sync_result))
}

async fn dial_sync_peer(&self, node_id: &NodeId) -> Result<PeerConnection, BlockHeaderSyncError> {
Expand All @@ -237,7 +240,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
sync_peer: &SyncPeer,
mut client: rpc::BaseNodeSyncRpcClient,
max_latency: Duration,
) -> Result<(), BlockHeaderSyncError> {
) -> Result<AttemptSyncResult, BlockHeaderSyncError> {
let latency = client.get_last_request_latency();
debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -268,10 +271,10 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
.find_chain_split(sync_peer.node_id(), &mut client, NUM_INITIAL_HEADERS_TO_REQUEST as u64)
.await?;
let header_sync_status = self
.determine_sync_status(&sync_peer.to_string(), best_header, best_block, peer_response)
.determine_sync_status(&sync_peer.to_string(), best_header, best_block, peer_response.clone())
.await?;

match header_sync_status {
match header_sync_status.clone() {
HeaderSyncStatus::InSyncOrAhead => {
if best_block_height < best_header_height {
debug!(
Expand All @@ -281,7 +284,11 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
best_block_height
);

Ok(())
Ok(AttemptSyncResult {
headers_returned: peer_response.headers.len() as u64,
fork_hash_index: peer_response.fork_hash_index,
header_sync_status,
})
} else {
// We will only attempt sync if the our accumulated difficulty is less than the peer's claimed
// accumulated difficulty, thus this is adverse behaviour form the peer.
Expand Down Expand Up @@ -313,7 +320,11 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
);
self.synchronize_headers(sync_peer.clone(), &mut client, *split_info, max_latency)
.await?;
Ok(())
Ok(AttemptSyncResult {
headers_returned: peer_response.headers.len() as u64,
fork_hash_index: peer_response.fork_hash_index,
header_sync_status,
})
},
}
}
Expand Down Expand Up @@ -448,7 +459,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
.map(BlockHeader::try_from)
.collect::<Result<Vec<_>, _>>()
.map_err(BlockHeaderSyncError::ReceivedInvalidHeader)?;
let num_new_headers = headers.len();
let num_new_headers = headers.len(); // Required fro later use, no 'Copy' trait on 'BlockHeader'

// NOTE: We can trust that the header associated with this hash exists because `block_hashes` was supplied by
// this node. Bounds checking for fork_hash_index has been done above.
Expand Down Expand Up @@ -786,6 +797,7 @@ impl<'a, B: BlockchainBackend + 'static> HeaderSynchronizer<'a, B> {
}
}

#[derive(Debug, Clone)]
struct PeerChainSplitResponse {
block_hashes: Vec<HashOutput>,
reorg_steps_back: u64,
Expand All @@ -794,14 +806,32 @@ struct PeerChainSplitResponse {
remote_tip_height: u64,
}

struct ChainSplitInfo {
best_block: ChainHeader,
remote_tip_height: u64,
reorg_steps_back: u64,
chain_split_hash: HashOutput,
/// Information about the chain split from the remote node.
#[derive(Debug, Clone, PartialEq)]
pub struct ChainSplitInfo {
/// The best block on the local chain.
pub best_block: ChainHeader,
/// The height of the remote node's tip.
pub remote_tip_height: u64,
/// The number of blocks to reorg back to the fork.
pub reorg_steps_back: u64,
/// The hash of the block at the fork.
pub chain_split_hash: HashOutput,
}

/// The result of an attempt to synchronize headers with a peer.
#[derive(Debug, Clone, PartialEq)]
pub struct AttemptSyncResult {
/// The number of headers that were returned.
pub headers_returned: u64,
/// The fork hash index of the remote peer.
pub fork_hash_index: u64,
/// The header sync status.
pub header_sync_status: HeaderSyncStatus,
}

enum HeaderSyncStatus {
#[derive(Debug, Clone, PartialEq)]
pub enum HeaderSyncStatus {
/// Local and remote node are in sync or ahead
InSyncOrAhead,
/// Local node is lagging behind remote node
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub use block_sync::{BlockSyncError, BlockSynchronizer};
#[cfg(feature = "base_node")]
mod header_sync;
#[cfg(feature = "base_node")]
pub use header_sync::{BlockHeaderSyncError, HeaderSynchronizer};
pub use header_sync::{AttemptSyncResult, BlockHeaderSyncError, HeaderSyncStatus, HeaderSynchronizer};

#[cfg(feature = "base_node")]
mod horizon_state_sync;
Expand Down
Loading

0 comments on commit 495f44a

Please sign in to comment.