Skip to content

Commit

Permalink
record last executed timestamp age v.s. the timestamp itself (#11801) (
Browse files Browse the repository at this point in the history
…#11883)

## Description 

Cherry pick #11801 

## Test Plan 

How did you test the new or updated feature?

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitates either a data wipe or a data migration

### Release notes
  • Loading branch information
longbowlu authored May 11, 2023
1 parent e4fe9f5 commit f2c6bd3
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 59 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 6 additions & 7 deletions crates/sui-core/src/checkpoints/checkpoint_executor/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ use std::sync::Arc;
/// Prometheus metrics handles used by the checkpoint executor.
///
/// NOTE(review): this listing comes from a rendered diff, so it shows both the
/// `last_executed_checkpoint_timestamp_ms` gauge and the
/// `last_executed_checkpoint_age_ms` histogram side by side — confirm against the
/// actual file which of the two is present.
pub struct CheckpointExecutorMetrics {
pub checkpoint_exec_sync_tps: IntGauge,
/// Set to the sequence number of the checkpoint most recently processed as executed.
pub last_executed_checkpoint: IntGauge,
/// Registered with the help text "Last executed checkpoint timestamp ms".
pub last_executed_checkpoint_timestamp_ms: IntGauge,
/// Registered with the help text "Checkpoint execution errors count".
pub checkpoint_exec_errors: IntCounter,
pub checkpoint_exec_epoch: IntGauge,
pub checkpoint_exec_inflight: IntGauge,
pub checkpoint_exec_latency_us: Histogram,
pub checkpoint_prepare_latency_us: Histogram,
pub checkpoint_transaction_count: Histogram,
/// Age of checkpoints when they arrive for execution (reported when a checkpoint
/// summary is received from state sync).
pub checkpoint_contents_age_ms: Histogram,
/// Age of the last executed checkpoint (reported after the highest executed
/// checkpoint is updated).
pub last_executed_checkpoint_age_ms: Histogram,
/// 1 if the accumulated live object set differs from the StateAccumulator root state
/// hash for the previous epoch.
pub accumulator_inconsistent_state: IntGauge,
}

Expand All @@ -37,12 +37,6 @@ impl CheckpointExecutorMetrics {
registry
)
.unwrap(),
last_executed_checkpoint_timestamp_ms: register_int_gauge_with_registry!(
"last_executed_checkpoint_timestamp_ms",
"Last executed checkpoint timestamp ms",
registry
)
.unwrap(),
checkpoint_exec_errors: register_int_counter_with_registry!(
"checkpoint_exec_errors",
"Checkpoint execution errors count",
Expand Down Expand Up @@ -81,6 +75,11 @@ impl CheckpointExecutorMetrics {
"Age of checkpoints when they arrive for execution",
registry,
),
last_executed_checkpoint_age_ms: Histogram::new_in_registry(
"last_executed_checkpoint_age_ms",
"Age of the last executed checkpoint",
registry
),
accumulator_inconsistent_state: register_int_gauge_with_registry!(
"accumulator_inconsistent_state",
"1 if accumulated live object set differs from StateAccumulator root state hash for the previous epoch",
Expand Down
17 changes: 5 additions & 12 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, Instant, SystemTime},
time::{Duration, Instant},
};

use futures::stream::FuturesOrdered;
Expand All @@ -39,7 +39,7 @@ use sui_types::{
messages_checkpoint::{CheckpointSequenceNumber, VerifiedCheckpoint},
};
use sui_types::{error::SuiResult, messages::TransactionDataAPI};
use tap::{TapFallible, TapOptional};
use tap::TapOptional;
use tokio::{
sync::broadcast::{self, error::RecvError},
task::JoinHandle,
Expand Down Expand Up @@ -202,12 +202,7 @@ impl CheckpointExecutor {
sequence_number = ?checkpoint.sequence_number,
"received checkpoint summary from state sync"
);
SystemTime::now().duration_since(checkpoint.timestamp())
.map(|latency|
self.metrics.checkpoint_contents_age_ms.report(latency.as_millis() as u64)
)
.tap_err(|err| warn!("unable to compute checkpoint age: {}", err))
.ok();
checkpoint.report_checkpoint_age_ms(&self.metrics.checkpoint_contents_age_ms);
},
// In this case, messages in the mailbox have been overwritten
// as a result of lagging too far behind.
Expand Down Expand Up @@ -236,7 +231,6 @@ impl CheckpointExecutor {
fn process_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
// Ensure that we are not skipping checkpoints at any point
let seq = *checkpoint.sequence_number();
let timestamp_ms = checkpoint.timestamp_ms;
if let Some(prev_highest) = self
.checkpoint_store
.get_highest_executed_checkpoint_seq_number()
Expand Down Expand Up @@ -275,9 +269,8 @@ impl CheckpointExecutor {
.update_highest_executed_checkpoint(checkpoint)
.unwrap();
self.metrics.last_executed_checkpoint.set(seq as i64);
self.metrics
.last_executed_checkpoint_timestamp_ms
.set(timestamp_ms as i64);

checkpoint.report_checkpoint_age_ms(&self.metrics.last_executed_checkpoint_age_ms);
}

async fn schedule_synced_checkpoints(
Expand Down
12 changes: 12 additions & 0 deletions crates/sui-core/src/checkpoints/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub struct CheckpointMetrics {
pub last_sent_checkpoint_signature: IntGauge,
pub highest_accumulated_epoch: IntGauge,
pub checkpoint_creation_latency_ms: Histogram,
pub last_created_checkpoint_age_ms: Histogram,
pub last_certified_checkpoint_age_ms: Histogram,
}

impl CheckpointMetrics {
Expand All @@ -39,6 +41,16 @@ impl CheckpointMetrics {
registry
)
.unwrap(),
last_created_checkpoint_age_ms: Histogram::new_in_registry(
"last_created_checkpoint_age_ms",
"Age of the last created checkpoint",
registry
),
last_certified_checkpoint_age_ms: Histogram::new_in_registry(
"last_certified_checkpoint_age_ms",
"Age of the last certified checkpoint",
registry
),
checkpoint_errors: register_int_counter_with_registry!(
"checkpoint_errors",
"Checkpoints errors count",
Expand Down
62 changes: 35 additions & 27 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,10 @@ impl CheckpointBuilder {
let mut last = self.epoch_store.last_built_checkpoint_commit_height();
for (height, pending) in self.epoch_store.get_pending_checkpoints(last) {
last = Some(height);
debug!("Making checkpoint at commit height {height}");
debug!(
checkpoint_commit_height = height,
"Making checkpoint at commit height"
);
if let Err(e) = self.make_checkpoint(height, pending).await {
error!("Error while making checkpoint, will retry in 1s: {:?}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
Expand Down Expand Up @@ -608,8 +611,9 @@ impl CheckpointBuilder {
let mut batch = self.tables.checkpoint_content.batch();
for (summary, contents) in &new_checkpoint {
debug!(
"Created checkpoint from commit height {height} with sequence {}",
summary.sequence_number
checkpoint_commit_height = height,
checkpoint_seq = summary.sequence_number,
"Created checkpoint",
);
self.output
.checkpoint_created(summary, contents, &self.epoch_store)
Expand Down Expand Up @@ -874,9 +878,10 @@ impl CheckpointBuilder {
end_of_epoch_data,
timestamp_ms,
);
summary.report_checkpoint_age_ms(&self.metrics.last_created_checkpoint_age_ms);
if last_checkpoint_of_epoch {
info!(
?sequence_number,
checkpoint_seq = sequence_number,
"creating last checkpoint of epoch {}", epoch
);
if let Some(stats) = self.tables.get_epoch_stats(epoch, &summary) {
Expand Down Expand Up @@ -1090,15 +1095,15 @@ impl CheckpointAggregator {
for ((seq, index), data) in iter {
if seq != current.summary.sequence_number {
debug!(
"Not enough checkpoint signatures on height {}",
current.summary.sequence_number
checkpoint_seq =? current.summary.sequence_number,
"Not enough checkpoint signatures",
);
// No more signatures (yet) for this checkpoint
return Ok(result);
}
debug!(
"Processing signature for checkpoint {} (digest: {:?}) from {:?}",
current.summary.sequence_number,
checkpoint_seq = current.summary.sequence_number,
"Processing signature for checkpoint (digest: {:?}) from {:?}",
current.summary.digest(),
data.summary.auth_sig().authority.concise()
);
Expand All @@ -1121,6 +1126,9 @@ impl CheckpointAggregator {
self.metrics
.last_certified_checkpoint
.set(current.summary.sequence_number as i64);
current
.summary
.report_checkpoint_age_ms(&self.metrics.last_certified_checkpoint_age_ms);
result.push(summary.into_inner());
self.current = None;
continue 'outer;
Expand Down Expand Up @@ -1159,16 +1167,16 @@ impl CheckpointSignatureAggregator {
self.failures.insert_generic(author, signature)
{
panic!("Checkpoint fork detected - f+1 validators submitted checkpoint digest at seq {} different from our digest {}. Validators with different digests: {:?}",
self.summary.sequence_number,
self.digest,
data.keys()
self.summary.sequence_number,
self.digest,
data.keys()
);
}
warn!(
"Validator {:?} has mismatching checkpoint digest {} at seq {}, we have digest {}",
checkpoint_seq = self.summary.sequence_number,
"Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
author.concise(),
their_digest,
self.summary.sequence_number,
self.digest
);
return Err(());
Expand All @@ -1179,9 +1187,9 @@ impl CheckpointSignatureAggregator {
match self.signatures.insert(envelope) {
InsertResult::Failed { error } => {
warn!(
"Failed to aggregate new signature from validator {:?} for checkpoint {}: {:?}",
checkpoint_seq = self.summary.sequence_number,
"Failed to aggregate new signature from validator {:?}: {:?}",
author.concise(),
self.summary.sequence_number,
error
);
Err(())
Expand Down Expand Up @@ -1299,15 +1307,15 @@ impl CheckpointServiceNotify for CheckpointService {
{
if sequence <= last_certified {
debug!(
"Ignore signature for checkpoint sequence {} from {} - already certified",
info.summary.sequence_number, signer,
checkpoint_seq = sequence,
"Ignore checkpoint signature from {} - already certified", signer,
);
return Ok(());
}
}
debug!(
"Received signature for checkpoint sequence {}, digest {} from {}",
sequence,
checkpoint_seq = sequence,
"Received checkpoint signature, digest {} from {}",
info.summary.digest(),
signer,
);
Expand All @@ -1334,25 +1342,25 @@ impl CheckpointServiceNotify for CheckpointService {
panic!("Received checkpoint at index {} that contradicts previously stored checkpoint. Old digests: {:?}, new digests: {:?}", checkpoint.height(), pending.roots, checkpoint.roots);
}
debug!(
"Ignoring duplicate checkpoint notification at height {}",
checkpoint.height()
checkpoint_commit_height = checkpoint.height(),
"Ignoring duplicate checkpoint notification",
);
return Ok(());
}
debug!(
"Pending checkpoint at height {} has {} roots",
checkpoint.height(),
checkpoint_commit_height = checkpoint.height(),
"Pending checkpoint has {} roots",
checkpoint.roots.len(),
);
trace!(
"Transaction roots for pending checkpoint at height {}: {:?}",
checkpoint.height(),
checkpoint_commit_height = checkpoint.height(),
"Transaction roots for pending checkpoint: {:?}",
checkpoint.roots
);
epoch_store.insert_pending_checkpoint(&checkpoint.height(), &checkpoint)?;
debug!(
"Notifying builder about checkpoint at {}",
checkpoint.height()
checkpoint_commit_height = checkpoint.height(),
"Notifying builder about checkpoint",
);
self.notify_builder.notify_one();
Ok(())
Expand Down
8 changes: 3 additions & 5 deletions crates/sui-network/src/state_sync/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use mysten_metrics::histogram::Histogram;
use prometheus::{register_int_gauge_with_registry, IntGauge, Registry};
use std::sync::Arc;
use std::time::Duration;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
use tap::Pipe;

Expand Down Expand Up @@ -46,12 +45,11 @@ impl Metrics {
}
}

pub fn report_checkpoint_summary_age(&self, age: Duration) {
pub fn checkpoint_summary_age_metric(&self) -> Option<&Histogram> {
if let Some(inner) = &self.0 {
inner
.checkpoint_summary_age_ms
.report(age.as_millis() as u64);
return Some(&inner.checkpoint_summary_age_ms);
}
None
}
}

Expand Down
14 changes: 6 additions & 8 deletions crates/sui-network/src/state_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use futures::{stream::FuturesOrdered, FutureExt, StreamExt};
use std::{
collections::HashMap,
sync::{Arc, RwLock},
time::{Duration, SystemTime},
time::Duration,
};
use sui_config::p2p::StateSyncConfig;
use sui_types::{
Expand All @@ -69,7 +69,7 @@ use tokio::{
sync::{broadcast, mpsc, watch},
task::{AbortHandle, JoinSet},
};
use tracing::{debug, info, trace, warn};
use tracing::{debug, info, trace};

mod generated {
include!(concat!(env!("OUT_DIR"), "/sui.StateSync.rs"));
Expand Down Expand Up @@ -906,12 +906,10 @@ where
}
};

debug!(sequence_number = ?checkpoint.sequence_number(), "verified checkpoint summary");
SystemTime::now()
.duration_since(checkpoint.timestamp())
.map(|latency| metrics.report_checkpoint_summary_age(latency))
.tap_err(|err| warn!("unable to compute checkpoint age: {}", err))
.ok();
debug!(checkpoint_seq = ?checkpoint.sequence_number(), "verified checkpoint summary");
if let Some(checkpoint_summary_age_metric) = metrics.checkpoint_summary_age_metric() {
checkpoint.report_checkpoint_age_ms(checkpoint_summary_age_metric);
}

current = checkpoint.clone();
// Insert the newly verified checkpoint into our store, which will bump our highest
Expand Down
1 change: 1 addition & 0 deletions crates/sui-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ sui-cost-tables = { path = "../sui-cost-tables"}
sui-protocol-config = { path = "../sui-protocol-config" }
shared-crypto = { path = "../shared-crypto" }
mysten-network = { path = "../mysten-network" }
mysten-metrics = { path = "../mysten-metrics" }
sui-macros = { path = "../sui-macros" }

fastcrypto = { workspace = true, features = ["copy_key"] }
Expand Down
17 changes: 17 additions & 0 deletions crates/sui-types/src/messages_checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@ use shared_crypto::intent::IntentScope;
use std::fmt::{Debug, Display, Formatter};
use std::slice::Iter;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tap::TapFallible;
use tracing::warn;

pub use crate::digests::CheckpointContentsDigest;
pub use crate::digests::CheckpointDigest;

pub type CheckpointSequenceNumber = u64;
pub type CheckpointTimestamp = u64;

use mysten_metrics::histogram::Histogram;

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CheckpointRequest {
/// if a sequence number is specified, return the checkpoint with that sequence number;
Expand Down Expand Up @@ -204,6 +208,19 @@ impl CheckpointSummary {
.as_ref()
.map(|e| e.next_epoch_committee.as_slice())
}

pub fn report_checkpoint_age_ms(&self, metrics: &Histogram) {
SystemTime::now()
.duration_since(self.timestamp())
.map(|latency| metrics.report(latency.as_millis() as u64))
.tap_err(|err| {
warn!(
checkpoint_seq = self.sequence_number,
"unable to compute checkpoint age: {}", err
)
})
.ok();
}
}

impl Display for CheckpointSummary {
Expand Down

0 comments on commit f2c6bd3

Please sign in to comment.