Skip to content

Commit

Permalink
[State Sync] Update synced and synced epoch metrics.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Oct 16, 2024
1 parent 184ece9 commit f3a3c4b
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 37 deletions.
26 changes: 12 additions & 14 deletions state-sync/state-sync-driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,26 +350,24 @@ impl<
metrics::DRIVER_CONSENSUS_COMMIT_NOTIFICATION,
);

// Update the synced versions
let operations = [
metrics::StorageSynchronizerOperations::ExecutedTransactions,
metrics::StorageSynchronizerOperations::Synced,
];
for operation in operations {
metrics::increment_gauge(
&metrics::STORAGE_SYNCHRONIZER_OPERATIONS,
operation.get_label(),
consensus_commit_notification.transactions.len() as u64,
);
}
// Update the number of executed transactions
let num_synced_transactions = consensus_commit_notification.transactions.len();
metrics::increment_gauge(
&metrics::STORAGE_SYNCHRONIZER_OPERATIONS,
metrics::StorageSynchronizerOperations::ExecutedTransactions.get_label(),
num_synced_transactions as u64,
);

// Update the synced version metrics
utils::update_new_synced_metrics(self.storage.clone(), num_synced_transactions);

// Update the synced epoch
// Update the synced epoch metrics
if consensus_commit_notification
.subscribable_events
.iter()
.any(ContractEvent::is_new_epoch_event)
{
utils::update_new_epoch_metrics();
utils::update_new_epoch_metrics(self.storage.clone());
}
}

Expand Down
19 changes: 12 additions & 7 deletions state-sync/state-sync-driver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,16 @@ impl ExecutingComponent {
}
}

/// An enum of storage synchronizer operations performed by state sync
/// An enum of storage synchronizer operations performed by
/// state sync. Each of these is a metric label to track.
pub enum StorageSynchronizerOperations {
AppliedTransactionOutputs, // Applied a chunk of transactions outputs.
ExecutedTransactions, // Executed a chunk of transactions.
Synced, // Wrote a chunk of transactions and outputs to storage.
SyncedStates, // Wrote a chunk of state values to storage.
SyncedEpoch, // Wrote a chunk of transactions and outputs to storage that resulted in a new epoch.
AppliedTransactionOutputs, // The total number of applied transaction outputs
ExecutedTransactions, // The total number of executed transactions
Synced, // The latest synced version (as read from storage)
SyncedIncremental, // The latest synced version (calculated as the sum of all processed transactions)
SyncedStates, // The total number of synced states
SyncedEpoch, // The latest synced epoch (as read from storage)
SyncedEpochIncremental, // The latest synced epoch (calculated as the sum of all processed epochs)
}

impl StorageSynchronizerOperations {
Expand All @@ -74,8 +77,10 @@ impl StorageSynchronizerOperations {
},
StorageSynchronizerOperations::ExecutedTransactions => "executed_transactions",
StorageSynchronizerOperations::Synced => "synced",
StorageSynchronizerOperations::SyncedEpoch => "synced_epoch",
StorageSynchronizerOperations::SyncedIncremental => "synced_incremental",
StorageSynchronizerOperations::SyncedStates => "synced_states",
StorageSynchronizerOperations::SyncedEpoch => "synced_epoch",
StorageSynchronizerOperations::SyncedEpochIncremental => "synced_epoch_incremental",
}
}
}
Expand Down
13 changes: 6 additions & 7 deletions state-sync/state-sync-driver/src/storage_synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ impl<
commit_post_processor_notifier,
pending_data_chunks.clone(),
runtime.clone(),
storage.reader.clone(),
);

// Spawn the commit post-processor that handles commit notifications
Expand Down Expand Up @@ -694,6 +695,7 @@ fn spawn_committer<ChunkExecutor: ChunkExecutorTrait + 'static>(
mut commit_post_processor_notifier: mpsc::Sender<ChunkCommitNotification>,
pending_data_chunks: Arc<AtomicU64>,
runtime: Option<Handle>,
storage: Arc<dyn DbReader>,
) -> JoinHandle<()> {
// Create a committer
let committer = async move {
Expand Down Expand Up @@ -721,14 +723,11 @@ fn spawn_committer<ChunkExecutor: ChunkExecutorTrait + 'static>(
);

// Update the metrics for the newly committed data
metrics::increment_gauge(
&metrics::STORAGE_SYNCHRONIZER_OPERATIONS,
metrics::StorageSynchronizerOperations::Synced.get_label(),
notification.committed_transactions.len() as u64,
utils::update_new_synced_metrics(
storage.clone(),
notification.committed_transactions.len(),
);
if notification.reconfiguration_occurred {
utils::update_new_epoch_metrics();
}
utils::update_new_epoch_metrics(storage.clone());

// Update the metrics for the data notification commit post-process latency
metrics::observe_duration(
Expand Down
69 changes: 60 additions & 9 deletions state-sync/state-sync-driver/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ pub fn initialize_sync_gauges(storage: Arc<dyn DbReader>) -> Result<(), Error> {
metrics::StorageSynchronizerOperations::AppliedTransactionOutputs,
metrics::StorageSynchronizerOperations::ExecutedTransactions,
metrics::StorageSynchronizerOperations::Synced,
metrics::StorageSynchronizerOperations::SyncedIncremental,
];
for metric in metrics {
metrics::set_gauge(
Expand All @@ -302,13 +303,19 @@ pub fn initialize_sync_gauges(storage: Arc<dyn DbReader>) -> Result<(), Error> {
);
}

// Update the latest synced epoch
// Update the latest synced epochs
let highest_synced_epoch = fetch_latest_epoch_state(storage)?.epoch;
metrics::set_gauge(
&metrics::STORAGE_SYNCHRONIZER_OPERATIONS,
metrics::StorageSynchronizerOperations::SyncedEpoch.get_label(),
highest_synced_epoch,
);
let metrics = [
metrics::StorageSynchronizerOperations::SyncedEpoch,
metrics::StorageSynchronizerOperations::SyncedEpochIncremental,
];
for metric in metrics {
metrics::set_gauge(
&metrics::STORAGE_SYNCHRONIZER_OPERATIONS,
metric.get_label(),
highest_synced_epoch,
);
}

Ok(())
}
Expand Down Expand Up @@ -365,15 +372,59 @@ pub async fn handle_committed_transactions<
}

/// Updates the metrics to handle an epoch change event
pub fn update_new_epoch_metrics() {
// Increment the epoch
metrics::increment_gauge(
pub fn update_new_epoch_metrics(storage: Arc<dyn DbReader>) {
// Update the epoch metric (by reading directly from storage)
let highest_synced_epoch = match fetch_latest_epoch_state(storage.clone()) {
Ok(epoch_state) => epoch_state.epoch,
Err(error) => {
error!(LogSchema::new(LogEntry::Driver).message(&format!(
"Failed to fetch the latest epoch state from storage! Error: {:?}",
error
)));
return;
},
};
metrics::set_gauge(
&metrics::STORAGE_SYNCHRONIZER_OPERATIONS,
metrics::StorageSynchronizerOperations::SyncedEpoch.get_label(),
highest_synced_epoch,
);

// Update the incremental epoch metric (by incrementing the current value)
metrics::increment_gauge(
&metrics::STORAGE_SYNCHRONIZER_OPERATIONS,
metrics::StorageSynchronizerOperations::SyncedEpochIncremental.get_label(),
1,
);
}

/// Updates the metrics to handle newly synced transactions
pub fn update_new_synced_metrics(storage: Arc<dyn DbReader>, num_synced_transactions: usize) {
// Update the version metric (by reading directly from storage)
let highest_synced_version = match fetch_pre_committed_version(storage.clone()) {
Ok(highest_synced_version) => highest_synced_version,
Err(error) => {
error!(LogSchema::new(LogEntry::Driver).message(&format!(
"Failed to fetch the pre committed version from storage! Error: {:?}",
error
)));
return;
},
};
metrics::set_gauge(
&metrics::STORAGE_SYNCHRONIZER_OPERATIONS,
metrics::StorageSynchronizerOperations::Synced.get_label(),
highest_synced_version,
);

// Update the incremental version metric (by incrementing the current value)
metrics::increment_gauge(
&metrics::STORAGE_SYNCHRONIZER_OPERATIONS,
metrics::StorageSynchronizerOperations::SyncedIncremental.get_label(),
num_synced_transactions as u64,
);
}

/// Executes the given list of transactions and
/// returns the number of transactions in the list.
pub async fn execute_transactions<StorageSyncer: StorageSynchronizerInterface>(
Expand Down

0 comments on commit f3a3c4b

Please sign in to comment.