diff --git a/state-sync/state-sync-driver/src/driver.rs b/state-sync/state-sync-driver/src/driver.rs index 68481cdb354bf..2ed74268af4cf 100644 --- a/state-sync/state-sync-driver/src/driver.rs +++ b/state-sync/state-sync-driver/src/driver.rs @@ -362,27 +362,23 @@ impl< metrics::DRIVER_CONSENSUS_COMMIT_NOTIFICATION, ); - // Update the synced versions - let operations = [ - metrics::StorageSynchronizerOperations::ExecutedTransactions, - metrics::StorageSynchronizerOperations::Synced, - ]; - for operation in operations { - metrics::increment_gauge( - &metrics::STORAGE_SYNCHRONIZER_OPERATIONS, - operation.get_label(), - consensus_commit_notification.get_transactions().len() as u64, - ); - } + // Update the number of executed transactions + let num_synced_transactions = consensus_commit_notification.get_transactions().len(); + metrics::increment_gauge( + &metrics::STORAGE_SYNCHRONIZER_OPERATIONS, + metrics::StorageSynchronizerOperations::ExecutedTransactions.get_label(), + num_synced_transactions as u64, + ); - // Update the synced epoch - if consensus_commit_notification + // Update the synced version metrics + utils::update_new_synced_metrics(self.storage.clone(), num_synced_transactions); + + // Update the synced epoch metrics + let reconfiguration_occurred = consensus_commit_notification .get_subscribable_events() .iter() - .any(ContractEvent::is_new_epoch_event) - { - utils::update_new_epoch_metrics(); - } + .any(ContractEvent::is_new_epoch_event); + utils::update_new_epoch_metrics(self.storage.clone(), reconfiguration_occurred); } /// Handles a consensus or consensus observer request to sync for a specified duration diff --git a/state-sync/state-sync-driver/src/metrics.rs b/state-sync/state-sync-driver/src/metrics.rs index 8d27f2da10cd7..dae906ff5016a 100644 --- a/state-sync/state-sync-driver/src/metrics.rs +++ b/state-sync/state-sync-driver/src/metrics.rs @@ -60,13 +60,16 @@ impl ExecutingComponent { } } -/// An enum of storage synchronizer operations performed by state sync +/// An enum of storage synchronizer operations performed by +/// state sync. Each of these is a metric label to track. pub enum StorageSynchronizerOperations { - AppliedTransactionOutputs, // Applied a chunk of transactions outputs. - ExecutedTransactions, // Executed a chunk of transactions. - Synced, // Wrote a chunk of transactions and outputs to storage. - SyncedStates, // Wrote a chunk of state values to storage. - SyncedEpoch, // Wrote a chunk of transactions and outputs to storage that resulted in a new epoch. + AppliedTransactionOutputs, // The total number of applied transaction outputs + ExecutedTransactions, // The total number of executed transactions + Synced, // The latest synced version (as read from storage) + SyncedIncremental, // The latest synced version (calculated as the sum of all processed transactions) + SyncedStates, // The total number of synced states + SyncedEpoch, // The latest synced epoch (as read from storage) + SyncedEpochIncremental, // The latest synced epoch (calculated as the sum of all processed epochs) } impl StorageSynchronizerOperations { @@ -77,8 +80,10 @@ impl StorageSynchronizerOperations { }, StorageSynchronizerOperations::ExecutedTransactions => "executed_transactions", StorageSynchronizerOperations::Synced => "synced", - StorageSynchronizerOperations::SyncedEpoch => "synced_epoch", + StorageSynchronizerOperations::SyncedIncremental => "synced_incremental", StorageSynchronizerOperations::SyncedStates => "synced_states", + StorageSynchronizerOperations::SyncedEpoch => "synced_epoch", + StorageSynchronizerOperations::SyncedEpochIncremental => "synced_epoch_incremental", } } } diff --git a/state-sync/state-sync-driver/src/storage_synchronizer.rs b/state-sync/state-sync-driver/src/storage_synchronizer.rs index 82aa8e4750d1e..7b22e39dd997c 100644 --- a/state-sync/state-sync-driver/src/storage_synchronizer.rs +++ b/state-sync/state-sync-driver/src/storage_synchronizer.rs @@ -259,6 +259,7 @@ impl< commit_post_processor_notifier, pending_data_chunks.clone(), runtime.clone(), + storage.reader.clone(), ); // Spawn the commit post-processor that handles commit notifications @@ -694,6 +695,7 @@ fn spawn_committer( mut commit_post_processor_notifier: mpsc::Sender, pending_data_chunks: Arc, runtime: Option, + storage: Arc, ) -> JoinHandle<()> { // Create a committer let committer = async move { @@ -720,15 +722,15 @@ fn spawn_committer( )) ); - // Update the metrics for the newly committed data - metrics::increment_gauge( - &metrics::STORAGE_SYNCHRONIZER_OPERATIONS, - metrics::StorageSynchronizerOperations::Synced.get_label(), - notification.committed_transactions.len() as u64, + // Update the synced version metrics + utils::update_new_synced_metrics( + storage.clone(), + notification.committed_transactions.len(), ); - if notification.reconfiguration_occurred { - utils::update_new_epoch_metrics(); - } + + // Update the synced epoch metrics + let reconfiguration_occurred = notification.reconfiguration_occurred; + utils::update_new_epoch_metrics(storage.clone(), reconfiguration_occurred); // Update the metrics for the data notification commit post-process latency metrics::observe_duration( diff --git a/state-sync/state-sync-driver/src/utils.rs b/state-sync/state-sync-driver/src/utils.rs index 24ca021cca373..fa64ceccf388d 100644 --- a/state-sync/state-sync-driver/src/utils.rs +++ b/state-sync/state-sync-driver/src/utils.rs @@ -293,6 +293,7 @@ pub fn initialize_sync_gauges(storage: Arc) -> Result<(), Error> { metrics::StorageSynchronizerOperations::AppliedTransactionOutputs, metrics::StorageSynchronizerOperations::ExecutedTransactions, metrics::StorageSynchronizerOperations::Synced, + metrics::StorageSynchronizerOperations::SyncedIncremental, ]; for metric in metrics { metrics::set_gauge( @@ -302,13 +303,19 @@ pub fn initialize_sync_gauges(storage: Arc) -> Result<(), Error> { ); } - // Update the latest synced epoch + // Update the latest synced epochs let highest_synced_epoch = fetch_latest_epoch_state(storage)?.epoch; - metrics::set_gauge( - &metrics::STORAGE_SYNCHRONIZER_OPERATIONS, - metrics::StorageSynchronizerOperations::SyncedEpoch.get_label(), - highest_synced_epoch, - ); + let metrics = [ + metrics::StorageSynchronizerOperations::SyncedEpoch, + metrics::StorageSynchronizerOperations::SyncedEpochIncremental, + ]; + for metric in metrics { + metrics::set_gauge( + &metrics::STORAGE_SYNCHRONIZER_OPERATIONS, + metric.get_label(), + highest_synced_epoch, + ); + } Ok(()) } @@ -365,12 +372,58 @@ pub async fn handle_committed_transactions< } /// Updates the metrics to handle an epoch change event -pub fn update_new_epoch_metrics() { - // Increment the epoch - metrics::increment_gauge( +pub fn update_new_epoch_metrics(storage: Arc, reconfiguration_occurred: bool) { + // Update the epoch metric (by reading directly from storage) + let highest_synced_epoch = match fetch_latest_epoch_state(storage.clone()) { + Ok(epoch_state) => epoch_state.epoch, + Err(error) => { + error!(LogSchema::new(LogEntry::Driver).message(&format!( + "Failed to fetch the latest epoch state from storage! Error: {:?}", + error + ))); + return; + }, + }; + metrics::set_gauge( &metrics::STORAGE_SYNCHRONIZER_OPERATIONS, metrics::StorageSynchronizerOperations::SyncedEpoch.get_label(), - 1, + highest_synced_epoch, + ); + + // Update the incremental epoch metric (by incrementing the current value) + if reconfiguration_occurred { + metrics::increment_gauge( + &metrics::STORAGE_SYNCHRONIZER_OPERATIONS, + metrics::StorageSynchronizerOperations::SyncedEpochIncremental.get_label(), + 1, + ); + } +} + +/// Updates the metrics to handle newly synced transactions +pub fn update_new_synced_metrics(storage: Arc, num_synced_transactions: usize) { + // Update the version metric (by reading directly from storage) + let highest_synced_version = match fetch_pre_committed_version(storage.clone()) { + Ok(highest_synced_version) => highest_synced_version, + Err(error) => { + error!(LogSchema::new(LogEntry::Driver).message(&format!( + "Failed to fetch the pre committed version from storage! Error: {:?}", + error + ))); + return; + }, + }; + metrics::set_gauge( + &metrics::STORAGE_SYNCHRONIZER_OPERATIONS, + metrics::StorageSynchronizerOperations::Synced.get_label(), + highest_synced_version, + ); + + // Update the incremental version metric (by incrementing the current value) + metrics::increment_gauge( + &metrics::STORAGE_SYNCHRONIZER_OPERATIONS, + metrics::StorageSynchronizerOperations::SyncedIncremental.get_label(), + num_synced_transactions as u64, ); }