Investigate block sync timing issues
hansieodendaal committed May 17, 2024
1 parent 69bc94a commit 484d6cf
Showing 13 changed files with 353 additions and 360 deletions.
67 changes: 54 additions & 13 deletions base_layer/core/src/base_node/sync/block_sync/synchronizer.rs
@@ -268,6 +268,8 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
let mut last_sync_timer = Instant::now();
let mut avg_latency = RollingAverageTime::new(20);
while let Some(block_result) = block_stream.next().await {
let timer = Instant::now();
let total_timer = Instant::now();
let latency = last_sync_timer.elapsed();
avg_latency.add_sample(latency);
let block_body_response = block_result?;
@@ -310,6 +312,18 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
body.to_counts_string(),
latency
);
trace!(
target: LOG_TARGET,
"[block sync timings] 1.1 #{} Latency in {:.2?}",
current_height,
latency,
);
trace!(
target: LOG_TARGET,
"[block sync timings] 1.2 #{} Initial in {:.2?}",
current_height,
timer.elapsed(),
);

let timer = Instant::now();
let (header, header_accum_data) = header.into_parts();
@@ -351,10 +365,11 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
.map(Arc::new)
.ok_or(BlockSyncError::FailedToConstructChainBlock)?;

let validation_time = timer.elapsed();
debug!(
target: LOG_TARGET,
"Validated in {:.0?}. Storing block body #{} (PoW = {}, {})",
timer.elapsed(),
validation_time,
block.header().height,
block.header().pow_algo(),
block.block().body.to_counts_string(),
Expand All @@ -363,6 +378,12 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
target: LOG_TARGET,
"{}",block
);
trace!(
target: LOG_TARGET,
"[block sync timings] 1.3 #{} Validation in {:.2?}",
current_height,
validation_time,
);

let timer = Instant::now();
self.db
Expand All @@ -378,6 +399,26 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
)
.commit()
.await?;
let db_write_time = timer.elapsed();
debug!(
target: LOG_TARGET,
"Block body #{} added in {:.2?}, Tot_acc_diff {}, Monero {}, SHA3 {}, latency: {:.2?}",
block.height(),
db_write_time,
block
.accumulated_data()
.total_accumulated_difficulty,
block.accumulated_data().accumulated_randomx_difficulty,
block.accumulated_data().accumulated_sha3x_difficulty,
latency
);
trace!(
target: LOG_TARGET,
"[block sync timings] 1.4 #{} Db write in {:.2?}",
current_height,
db_write_time,
);
let timer = Instant::now();

// Average time between receiving blocks from the peer - used to detect a slow sync peer
let last_avg_latency = avg_latency.calculate_average_with_min_samples(5);
Expand All @@ -389,18 +430,6 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
self.hooks
.call_on_progress_block_hooks(block.clone(), tip_height, &sync_peer);

debug!(
target: LOG_TARGET,
"Block body #{} added in {:.0?}, Tot_acc_diff {}, Monero {}, SHA3 {}, latency: {:.2?}",
block.height(),
timer.elapsed(),
block
.accumulated_data()
.total_accumulated_difficulty,
block.accumulated_data().accumulated_randomx_difficulty,
block.accumulated_data().accumulated_sha3x_difficulty,
latency
);
if let Some(avg_latency) = last_avg_latency {
if avg_latency > max_latency {
return Err(BlockSyncError::MaxLatencyExceeded {
Expand All @@ -410,6 +439,18 @@ impl<'a, B: BlockchainBackend + 'static> BlockSynchronizer<'a, B> {
});
}
}
trace!(
target: LOG_TARGET,
"[block sync timings] 1.5 #{} After write in {:.2?}",
current_height,
timer.elapsed(),
);
trace!(
target: LOG_TARGET,
"[block sync timings] 1.6 #{} Total time in {:.2?}",
current_height,
total_timer.elapsed(),
);

current_block = Some(block);
last_sync_timer = Instant::now();
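The synchronizer hunks above instrument each stage of the per-block loop by re-binding a fresh `timer` before every step, while a separate `total_timer` spans the whole iteration. A minimal sketch of that shadowed-timer pattern, with the sync stages reduced to comments (the function and log text here are illustrative, not the crate's API):

```rust
use std::time::Instant;

// Hypothetical stand-in for the instrumented sync loop; stage bodies elided.
fn sync_one_block(current_height: u64) {
    let total_timer = Instant::now(); // spans the whole iteration

    let timer = Instant::now();
    // ... receive and decode the block body ...
    println!("[block sync timings] 1.2 #{} Initial in {:.2?}", current_height, timer.elapsed());

    let timer = Instant::now(); // shadowing starts a fresh measurement per stage
    // ... validate the block ...
    println!("[block sync timings] 1.3 #{} Validation in {:.2?}", current_height, timer.elapsed());

    let timer = Instant::now();
    // ... write the block body to the database ...
    println!("[block sync timings] 1.4 #{} Db write in {:.2?}", current_height, timer.elapsed());

    println!("[block sync timings] 1.6 #{} Total time in {:.2?}", current_height, total_timer.elapsed());
}

fn main() {
    sync_one_block(42);
}
```

Shadowing keeps each stage's measurement independent; the cumulative alternative used in `swap_to_highest_pow_chain` further down snapshots a single timer and subtracts checkpoints instead.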
29 changes: 29 additions & 0 deletions base_layer/core/src/base_node/sync/rpc/service.rs
@@ -24,6 +24,7 @@ use std::{
cmp,
convert::{TryFrom, TryInto},
sync::{Arc, Weak},
time::Instant,
};

use log::*;
@@ -184,6 +185,7 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
// Move token into this task
let peer_node_id = session_token;
for (start, end) in iter {
let timer = Instant::now();
if tx.is_closed() {
break;
}
@@ -231,6 +233,10 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
break;
}

let num_inputs = blocks
    .as_ref()
    .map(|blocks| blocks.iter().map(|b| b.block().body.inputs().len()).sum::<usize>())
    .unwrap_or_default();
let num_outputs = blocks
    .as_ref()
    .map(|blocks| blocks.iter().map(|b| b.block().body.outputs().len()).sum::<usize>())
    .unwrap_or_default();
let num_kernels = blocks
    .as_ref()
    .map(|blocks| blocks.iter().map(|b| b.block().body.kernels().len()).sum::<usize>())
    .unwrap_or_default();

match blocks {
Ok(blocks) if blocks.is_empty() => {
break;
Expand All @@ -244,6 +250,18 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
})
});

trace!(
target: LOG_TARGET,
"[block sync timings] 2.1 #{} - #{}, inputs: {}, outputs: {}, kernels: {}, Fetch blocks in {:.2?}",
start,
end,
num_inputs,
num_outputs,
num_kernels,
timer.elapsed()
);
let timer = Instant::now();

// Ensure task stops if the peer prematurely stops their RPC session
if utils::mpsc::send_all(&tx, blocks).await.is_err() {
debug!(
Expand All @@ -252,6 +270,17 @@ impl<B: BlockchainBackend + 'static> BaseNodeSyncService for BaseNodeSyncRpcServ
);
break;
}

trace!(
target: LOG_TARGET,
"[block sync timings] 2.2 #{} - #{}, inputs: {}, outputs: {}, kernels: {}, Send blocks in {:.2?}",
start,
end,
num_inputs,
num_outputs,
num_kernels,
timer.elapsed()
);
},
Err(err) => {
let _result = tx.send(Err(err)).await;
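The aggregate counts above are taken through `blocks.as_ref()`, which borrows the `Result` so that it can still be moved into the `match` that follows. A reduced sketch of that borrow-then-match shape, using stand-in `Block`/`Body` types rather than the real chain types:

```rust
struct Body {
    inputs: Vec<()>,
    outputs: Vec<()>,
    kernels: Vec<()>,
}

struct Block {
    body: Body,
}

fn main() {
    let blocks: Result<Vec<Block>, String> = Ok(vec![]);

    // Borrow the Result to aggregate counts without consuming it ...
    let num_inputs = blocks
        .as_ref()
        .map(|blocks| blocks.iter().map(|b| b.body.inputs.len()).sum::<usize>())
        .unwrap_or_default();

    // ... so the Result itself can still be matched on afterwards.
    match blocks {
        Ok(blocks) if blocks.is_empty() => println!("no blocks ({} inputs)", num_inputs),
        Ok(blocks) => println!("{} blocks, {} inputs", blocks.len(), num_inputs),
        Err(e) => eprintln!("{e}"),
    }
}
```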
59 changes: 59 additions & 0 deletions base_layer/core/src/chain_storage/blockchain_database.rs
@@ -2049,12 +2049,16 @@ fn swap_to_highest_pow_chain<T: BlockchainBackend>(
consensus: &ConsensusManager,
smt: Arc<RwLock<OutputSmt>>,
) -> Result<BlockAddResult, ChainStorageError> {
let timer = Instant::now();
let metadata = db.fetch_chain_metadata()?;
let fetch_metadata_time_01 = timer.elapsed();
// let's clear out all remaining headers that don't have a matching block
// rewind to height will first delete the headers, then try to delete from blocks; if we call this to the current
// height it will only trim the extra headers with no blocks
rewind_to_height(db, metadata.best_block_height(), smt.clone())?;
let rewind_to_height_time_02 = timer.elapsed();
let strongest_orphan_tips = db.fetch_strongest_orphan_chain_tips()?;
let fetch_orphan_tips_time_03 = timer.elapsed();
if strongest_orphan_tips.is_empty() {
// we have no orphan chain tips, we have trimmed remaining headers, we are on the best tip we have, so let's
// return ok
Expand All @@ -2071,6 +2075,7 @@ fn swap_to_highest_pow_chain<T: BlockchainBackend>(
);
ChainStorageError::InvalidOperation("No chain tips found in orphan pool".to_string())
})?;
let best_fork_header_time_04 = timer.elapsed();
let tip_header = db.fetch_tip_header()?;
match chain_strength_comparer.compare(&best_fork_header, &tip_header) {
Ordering::Greater => {
Expand All @@ -2095,17 +2100,63 @@ fn swap_to_highest_pow_chain<T: BlockchainBackend>(
return Ok(BlockAddResult::OrphanBlock);
},
}
let chain_strength_comparer_time_05 = timer.elapsed();

let reorg_chain = get_orphan_link_main_chain(db, best_fork_header.hash())?;
let fork_hash = reorg_chain
.front()
.expect("The new orphan block should be in the queue")
.header()
.prev_hash;
let get_orphan_link_time_06 = timer.elapsed();

let num_added_blocks = reorg_chain.len();
let removed_blocks = reorganize_chain(db, block_validator, fork_hash, &reorg_chain, consensus, smt)?;
let num_removed_blocks = removed_blocks.len();
let reorganize_chain_time_07 = timer.elapsed();

trace!(
target: LOG_TARGET,
"[block sync timings] 5.1 #{} swap_to_highest fetch metadata: {:.2?}",
metadata.best_block_height(),
fetch_metadata_time_01,
);
trace!(
target: LOG_TARGET,
"[block sync timings] 5.2 #{} rewind to height: {:.2?}",
metadata.best_block_height(),
rewind_to_height_time_02 - fetch_metadata_time_01,
);
trace!(
target: LOG_TARGET,
"[block sync timings] 5.3 #{} fetch orphan tips: {:.2?}",
metadata.best_block_height(),
fetch_orphan_tips_time_03 - rewind_to_height_time_02,
);
trace!(
target: LOG_TARGET,
"[block sync timings] 5.4 #{} best fork header: {:.2?}",
metadata.best_block_height(),
best_fork_header_time_04 - fetch_orphan_tips_time_03,
);
trace!(
target: LOG_TARGET,
"[block sync timings] 5.5 #{} chain strength compare: {:.2?}",
metadata.best_block_height(),
chain_strength_comparer_time_05 - best_fork_header_time_04,
);
trace!(
target: LOG_TARGET,
"[block sync timings] 5.6 #{} get orphan link: {:.2?}",
metadata.best_block_height(),
get_orphan_link_time_06 - chain_strength_comparer_time_05,
);
trace!(
target: LOG_TARGET,
"[block sync timings] 5.7 #{} reorganize chain: {:.2?}",
metadata.best_block_height(),
reorganize_chain_time_07 - get_orphan_link_time_06,
);

// reorg is required when any blocks are removed or more than one are added
// see https://github.com/tari-project/tari/issues/2101
Expand All @@ -2117,6 +2168,14 @@ fn swap_to_highest_pow_chain<T: BlockchainBackend>(
error!(target: LOG_TARGET, "Failed to track reorg: {}", e);
}
}
let track_reorgs_time_08 = timer.elapsed();

trace!(
target: LOG_TARGET,
"[block sync timings] 5.8 #{} track reorgs: {:.2?}",
metadata.best_block_height(),
track_reorgs_time_08 - reorganize_chain_time_07,
);

log!(
target: LOG_TARGET,
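`swap_to_highest_pow_chain` uses the commit's second instrumentation style: a single `Instant`, a cumulative `elapsed()` snapshot after each stage, and per-stage cost recovered by subtracting consecutive checkpoints once the work is done. A minimal sketch, with the stages elided:

```rust
use std::time::Instant;

fn main() {
    let timer = Instant::now();

    // ... stage 1: e.g. fetch chain metadata ...
    let fetch_metadata_time = timer.elapsed();

    // ... stage 2: e.g. rewind to height ...
    let rewind_to_height_time = timer.elapsed();

    // The first stage is measured from zero; each later stage is the
    // difference between consecutive checkpoints (Duration implements Sub).
    println!("fetch metadata: {:.2?}", fetch_metadata_time);
    println!("rewind to height: {:.2?}", rewind_to_height_time - fetch_metadata_time);
}
```

Taking one timer and subtracting at the end keeps the stages themselves free of logging and groups all the trace output after the work, while early-return paths skip the trace block entirely.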
23 changes: 22 additions & 1 deletion base_layer/core/src/chain_storage/lmdb_db/lmdb_db.rs
@@ -419,6 +419,7 @@ impl LMDBDatabase {
expected_prev_best_block,
timestamp,
} => {
let timer = Instant::now();
// for security we check that the best block does exist, and we check the previous value
// we don't want to check this if the prev block has never been set; this means an empty hash of 32
// bytes.
@@ -456,6 +457,12 @@ impl LMDBDatabase {
MetadataKey::BestBlockTimestamp,
&MetadataValue::BestBlockTimestamp(*timestamp),
)?;
trace!(
target: LOG_TARGET,
"[block sync timings] #{} Set best block height in {:.2?}",
height,
timer.elapsed(),
);
},
SetPruningHorizonConfig(pruning_horizon) => {
self.set_metadata(
@@ -1085,6 +1092,7 @@ impl LMDBDatabase {
}

fn delete_orphan(&self, txn: &WriteTransaction<'_>, hash: &HashOutput) -> Result<(), ChainStorageError> {
let timer = Instant::now();
let orphan = match lmdb_get::<_, Block>(txn, &self.orphans_db, hash.as_slice())? {
Some(orphan) => orphan,
None => {
@@ -1161,6 +1169,12 @@ impl LMDBDatabase {
)?;
}
lmdb_delete(txn, &self.orphans_db, hash.as_slice(), "orphans_db")?;
trace!(
target: LOG_TARGET,
"[block sync timings] #{} Deleted orphan block in {:.2?}",
orphan.header.height,
timer.elapsed(),
);
Ok(())
}

Expand All @@ -1173,6 +1187,7 @@ impl LMDBDatabase {
body: AggregateBody,
smt: Arc<RwLock<OutputSmt>>,
) -> Result<(), ChainStorageError> {
let timer = Instant::now();
let mut output_smt = smt.write().map_err(|e| {
error!(
target: LOG_TARGET,
@@ -1341,6 +1356,12 @@ impl LMDBDatabase {
&BlockAccumulatedData::new(kernel_mmr.get_pruned_hash_set()?, total_kernel_sum),
)?;

trace!(
target: LOG_TARGET,
"[block sync timings] #{} Insert tip block body in {:.2?}",
header.height,
timer.elapsed(),
);
Ok(())
}

@@ -1769,7 +1790,7 @@ impl BlockchainBackend for LMDBDatabase {
Ok(_) => {
trace!(
target: LOG_TARGET,
"Database completed {} operation(s) in {:.0?}",
"Database completed {} operation(s) in {:.2?}",
num_operations,
mark.elapsed()
);
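The final hunk in this file only widens log precision: `Duration`'s `Debug` implementation honours a precision specifier, so `{:.0?}` rounds to whole units while `{:.2?}` keeps two decimal places. A quick demonstration:

```rust
use std::time::Duration;

fn main() {
    let d = Duration::from_micros(1_234_567);
    println!("{:.0?}", d); // prints "1s"
    println!("{:.2?}", d); // prints "1.23s"
}
```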
7 changes: 7 additions & 0 deletions base_layer/core/src/transactions/aggregated_body.rs
@@ -22,6 +22,7 @@
use std::{
cmp::max,
fmt::{Display, Error, Formatter},
time::Instant,
};

use borsh::{BorshDeserialize, BorshSerialize};
@@ -209,13 +210,19 @@ impl AggregateBody {
/// Verify the signatures in all kernels contained in this aggregate body. Clients must provide an offset that
/// will be added to the public key used in the signature verification.
pub fn verify_kernel_signatures(&self) -> Result<(), TransactionError> {
let timer = Instant::now();
trace!(target: LOG_TARGET, "Checking kernel signatures",);
for kernel in &self.kernels {
kernel.verify_signature().map_err(|e| {
warn!(target: LOG_TARGET, "Kernel ({}) signature failed {:?}.", kernel, e);
e
})?;
}
trace!(
target: LOG_TARGET,
"[block sync timings] 1.3.1 verify_kernel_signatures in {:.2?}",
timer.elapsed(),
);
Ok(())
}

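In `verify_kernel_signatures` the timer wraps the whole kernel loop rather than each signature check, so trace volume stays constant regardless of how many kernels a block carries. A reduced sketch of that choice, with a closure standing in for `kernel.verify_signature()`:

```rust
use std::time::Instant;

fn verify_all(kernels: &[u64], verify: impl Fn(u64) -> Result<(), String>) -> Result<(), String> {
    let timer = Instant::now();
    for &kernel in kernels {
        // One failure aborts the pass, matching the `?` in the original loop.
        verify(kernel)?;
    }
    // A single aggregate measurement for the whole pass.
    println!("verify_kernel_signatures in {:.2?}", timer.elapsed());
    Ok(())
}

fn main() {
    let result = verify_all(&[1, 2, 3], |k| if k > 0 { Ok(()) } else { Err("bad".into()) });
    assert!(result.is_ok());
}
```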