diff --git a/state-sync/state-sync-driver/src/driver.rs b/state-sync/state-sync-driver/src/driver.rs index 832fe54e6b7fe6..03ffaa75acf7de 100644 --- a/state-sync/state-sync-driver/src/driver.rs +++ b/state-sync/state-sync-driver/src/driver.rs @@ -350,26 +350,24 @@ impl< metrics::DRIVER_CONSENSUS_COMMIT_NOTIFICATION, ); - // Update the synced versions - let operations = [ - metrics::StorageSynchronizerOperations::ExecutedTransactions, - metrics::StorageSynchronizerOperations::Synced, - ]; - for operation in operations { - metrics::increment_gauge( - &metrics::STORAGE_SYNCHRONIZER_OPERATIONS, - operation.get_label(), - consensus_commit_notification.transactions.len() as u64, - ); - } + // Update the number of executed transactions + let num_synced_transactions = consensus_commit_notification.transactions.len(); + metrics::increment_gauge( + &metrics::STORAGE_SYNCHRONIZER_OPERATIONS, + metrics::StorageSynchronizerOperations::ExecutedTransactions.get_label(), + num_synced_transactions as u64, + ); + + // Update the synced version metrics + utils::update_new_synced_metrics(self.storage.clone(), num_synced_transactions); - // Update the synced epoch + // Update the synced epoch metrics if consensus_commit_notification .subscribable_events .iter() .any(ContractEvent::is_new_epoch_event) { - utils::update_new_epoch_metrics(); + utils::update_new_epoch_metrics(self.storage.clone()); } } diff --git a/state-sync/state-sync-driver/src/metrics.rs b/state-sync/state-sync-driver/src/metrics.rs index b87b4f910536bb..0838749ced3fce 100644 --- a/state-sync/state-sync-driver/src/metrics.rs +++ b/state-sync/state-sync-driver/src/metrics.rs @@ -57,13 +57,16 @@ impl ExecutingComponent { } } -/// An enum of storage synchronizer operations performed by state sync +/// An enum of storage synchronizer operations performed by +/// state sync. Each of these is a metric label to track. pub enum StorageSynchronizerOperations { - AppliedTransactionOutputs, // Applied a chunk of transactions outputs. - ExecutedTransactions, // Executed a chunk of transactions. - Synced, // Wrote a chunk of transactions and outputs to storage. - SyncedStates, // Wrote a chunk of state values to storage. - SyncedEpoch, // Wrote a chunk of transactions and outputs to storage that resulted in a new epoch. + AppliedTransactionOutputs, // The total number of applied transaction outputs + ExecutedTransactions, // The total number of executed transactions + Synced, // The latest synced version (as read from storage) + SyncedIncremental, // The latest synced version (calculated as the sum of all processed transactions) + SyncedStates, // The total number of synced states + SyncedEpoch, // The latest synced epoch (as read from storage) + SyncedEpochIncremental, // The latest synced epoch (calculated as the sum of all processed epochs) } impl StorageSynchronizerOperations { @@ -74,8 +77,10 @@ impl StorageSynchronizerOperations { }, StorageSynchronizerOperations::ExecutedTransactions => "executed_transactions", StorageSynchronizerOperations::Synced => "synced", - StorageSynchronizerOperations::SyncedEpoch => "synced_epoch", + StorageSynchronizerOperations::SyncedIncremental => "synced_incremental", StorageSynchronizerOperations::SyncedStates => "synced_states", + StorageSynchronizerOperations::SyncedEpoch => "synced_epoch", + StorageSynchronizerOperations::SyncedEpochIncremental => "synced_epoch_incremental", } } } diff --git a/state-sync/state-sync-driver/src/storage_synchronizer.rs b/state-sync/state-sync-driver/src/storage_synchronizer.rs index 82aa8e4750d1e1..3b1b3b9b6f15c1 100644 --- a/state-sync/state-sync-driver/src/storage_synchronizer.rs +++ b/state-sync/state-sync-driver/src/storage_synchronizer.rs @@ -259,6 +259,7 @@ impl< commit_post_processor_notifier, pending_data_chunks.clone(), runtime.clone(), + storage.reader.clone(), ); // Spawn the commit post-processor that handles commit notifications @@ -694,6 +695,7 @@ fn spawn_committer( mut commit_post_processor_notifier: mpsc::Sender, pending_data_chunks: Arc, runtime: Option, + storage: Arc, ) -> JoinHandle<()> { // Create a committer let committer = async move { @@ -721,14 +723,11 @@ fn spawn_committer( ); // Update the metrics for the newly committed data - metrics::increment_gauge( - &metrics::STORAGE_SYNCHRONIZER_OPERATIONS, - metrics::StorageSynchronizerOperations::Synced.get_label(), - notification.committed_transactions.len() as u64, + utils::update_new_synced_metrics( + storage.clone(), + notification.committed_transactions.len(), ); - if notification.reconfiguration_occurred { - utils::update_new_epoch_metrics(); - } + utils::update_new_epoch_metrics(storage.clone()); // Update the metrics for the data notification commit post-process latency metrics::observe_duration( diff --git a/state-sync/state-sync-driver/src/utils.rs b/state-sync/state-sync-driver/src/utils.rs index 24ca021cca3738..2d8a9023f5fc27 100644 --- a/state-sync/state-sync-driver/src/utils.rs +++ b/state-sync/state-sync-driver/src/utils.rs @@ -293,6 +293,7 @@ pub fn initialize_sync_gauges(storage: Arc) -> Result<(), Error> { metrics::StorageSynchronizerOperations::AppliedTransactionOutputs, metrics::StorageSynchronizerOperations::ExecutedTransactions, metrics::StorageSynchronizerOperations::Synced, + metrics::StorageSynchronizerOperations::SyncedIncremental, ]; for metric in metrics { metrics::set_gauge( @@ -302,13 +303,19 @@ pub fn initialize_sync_gauges(storage: Arc) -> Result<(), Error> { ); } - // Update the latest synced epoch + // Update the latest synced epochs let highest_synced_epoch = fetch_latest_epoch_state(storage)?.epoch; - metrics::set_gauge( - &metrics::STORAGE_SYNCHRONIZER_OPERATIONS, - metrics::StorageSynchronizerOperations::SyncedEpoch.get_label(), - highest_synced_epoch, - ); + let metrics = [ + metrics::StorageSynchronizerOperations::SyncedEpoch, + metrics::StorageSynchronizerOperations::SyncedEpochIncremental, + ]; + for metric in metrics { + metrics::set_gauge( + &metrics::STORAGE_SYNCHRONIZER_OPERATIONS, + metric.get_label(), + highest_synced_epoch, + ); + } Ok(()) } @@ -365,15 +372,59 @@ pub async fn handle_committed_transactions< } /// Updates the metrics to handle an epoch change event -pub fn update_new_epoch_metrics() { - // Increment the epoch - metrics::increment_gauge( +pub fn update_new_epoch_metrics(storage: Arc) { + // Update the epoch metric (by reading directly from storage) + let highest_synced_epoch = match fetch_latest_epoch_state(storage.clone()) { + Ok(epoch_state) => epoch_state.epoch, + Err(error) => { + error!(LogSchema::new(LogEntry::Driver).message(&format!( + "Failed to fetch the latest epoch state from storage! Error: {:?}", + error + ))); + return; + }, + }; + metrics::set_gauge( &metrics::STORAGE_SYNCHRONIZER_OPERATIONS, metrics::StorageSynchronizerOperations::SyncedEpoch.get_label(), + highest_synced_epoch, + ); + + // Update the incremental epoch metric (by incrementing the current value) + metrics::increment_gauge( + &metrics::STORAGE_SYNCHRONIZER_OPERATIONS, + metrics::StorageSynchronizerOperations::SyncedEpochIncremental.get_label(), 1, ); } +/// Updates the metrics to handle newly synced transactions +pub fn update_new_synced_metrics(storage: Arc, num_synced_transactions: usize) { + // Update the version metric (by reading directly from storage) + let highest_synced_version = match fetch_pre_committed_version(storage.clone()) { + Ok(highest_synced_version) => highest_synced_version, + Err(error) => { + error!(LogSchema::new(LogEntry::Driver).message(&format!( + "Failed to fetch the pre committed version from storage! Error: {:?}", + error + ))); + return; + }, + }; + metrics::set_gauge( + &metrics::STORAGE_SYNCHRONIZER_OPERATIONS, + metrics::StorageSynchronizerOperations::Synced.get_label(), + highest_synced_version, + ); + + // Update the incremental version metric (by incrementing the current value) + metrics::increment_gauge( + &metrics::STORAGE_SYNCHRONIZER_OPERATIONS, + metrics::StorageSynchronizerOperations::SyncedIncremental.get_label(), + num_synced_transactions as u64, + ); +} + /// Executes the given list of transactions and /// returns the number of transactions in the list. pub async fn execute_transactions(