diff --git a/config/src/config/consensus_observer_config.rs b/config/src/config/consensus_observer_config.rs index 384513b2ceabd..602542ba55ac1 100644 --- a/config/src/config/consensus_observer_config.rs +++ b/config/src/config/consensus_observer_config.rs @@ -54,7 +54,9 @@ pub struct ConsensusObserverConfig { /// Duration (in milliseconds) we'll wait on startup before considering fallback mode pub observer_fallback_startup_period_ms: u64, /// Duration (in milliseconds) we'll wait for syncing progress before entering fallback mode - pub observer_fallback_sync_threshold_ms: u64, + pub observer_fallback_progress_threshold_ms: u64, + /// Duration (in milliseconds) of acceptable sync lag before entering fallback mode + pub observer_fallback_sync_lag_threshold_ms: u64, } impl Default for ConsensusObserverConfig { @@ -76,7 +78,8 @@ impl Default for ConsensusObserverConfig { subscription_refresh_interval_ms: 600_000, // 10 minutes observer_fallback_duration_ms: 600_000, // 10 minutes observer_fallback_startup_period_ms: 60_000, // 60 seconds - observer_fallback_sync_threshold_ms: 30_000, // 30 seconds + observer_fallback_progress_threshold_ms: 10_000, // 10 seconds + observer_fallback_sync_lag_threshold_ms: 15_000, // 15 seconds } } } diff --git a/consensus/src/consensus_observer/common/error.rs b/consensus/src/consensus_observer/common/error.rs index 6201e66fc4313..6bc1a063e7fb8 100644 --- a/consensus/src/consensus_observer/common/error.rs +++ b/consensus/src/consensus_observer/common/error.rs @@ -12,6 +12,9 @@ pub enum Error { #[error("Network error: {0}")] NetworkError(String), + #[error("Consensus observer falling behind: {0}")] + ObserverFallingBehind(String), + #[error("Consensus observer progress stopped: {0}")] ObserverProgressStopped(String), @@ -43,6 +46,7 @@ impl Error { match self { Self::InvalidMessageError(_) => "invalid_message_error", Self::NetworkError(_) => "network_error", + Self::ObserverFallingBehind(_) => "observer_falling_behind", Self::ObserverProgressStopped(_) => "observer_progress_stopped", Self::RpcError(_) => "rpc_error", Self::SubscriptionDisconnected(_) => "subscription_disconnected", diff --git a/consensus/src/consensus_observer/observer/fallback_manager.rs b/consensus/src/consensus_observer/observer/fallback_manager.rs index dcdab0dd42392..0c048f860fe15 100644 --- a/consensus/src/consensus_observer/observer/fallback_manager.rs +++ b/consensus/src/consensus_observer/observer/fallback_manager.rs @@ -1,11 +1,15 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::consensus_observer::common::error::Error; +use crate::consensus_observer::common::{ + error::Error, + logging::{LogEntry, LogSchema}, +}; use aptos_config::config::ConsensusObserverConfig; +use aptos_logger::warn; use aptos_storage_interface::DbReader; use aptos_time_service::{TimeService, TimeServiceTrait}; -use aptos_types::ledger_info::LedgerInfoWithSignatures; +use aptos_types::{ledger_info::LedgerInfoWithSignatures, transaction::Version}; use std::{ sync::Arc, time::{Duration, Instant}, @@ -48,7 +52,8 @@ impl ObserverFallbackManager { } } - /// Verifies that the DB is continuing to sync and commit new data. + /// Verifies that the DB is continuing to sync and commit new data, and that + /// the node has not fallen too far behind the rest of the network. /// If not, an error is returned, indicating that we should enter fallback mode. pub fn check_syncing_progress(&mut self) -> Result<(), Error> { // If we're still within the startup period, we don't need to verify progress @@ -61,8 +66,8 @@ impl ObserverFallbackManager { return Ok(()); // We're still in the startup period } - // Otherwise, fetch the synced version from storage - let current_synced_version = + // Fetch the synced ledger info version from storage + let latest_ledger_info_version = self.db_reader .get_latest_ledger_info_version() .map_err(|error| { @@ -72,15 +77,29 @@ impl ObserverFallbackManager { )) })?; + // Verify that the synced version is increasing appropriately + self.verify_increasing_sync_versions(latest_ledger_info_version, time_now)?; + + // Verify that the sync lag is within acceptable limits + self.verify_sync_lag_health(latest_ledger_info_version) + } + + /// Verifies that the synced version is increasing appropriately. If not + /// (i.e., too much time has passed without an increase), an error is returned. + fn verify_increasing_sync_versions( + &mut self, + latest_ledger_info_version: Version, + time_now: Instant, + ) -> Result<(), Error> { // Verify that the synced version is increasing appropriately let (highest_synced_version, highest_version_timestamp) = self.highest_synced_version_and_time; - if current_synced_version <= highest_synced_version { + if latest_ledger_info_version <= highest_synced_version { // The synced version hasn't increased. Check if we should enter fallback mode. let duration_since_highest_seen = time_now.duration_since(highest_version_timestamp); let fallback_threshold = Duration::from_millis( self.consensus_observer_config - .observer_fallback_sync_threshold_ms, + .observer_fallback_progress_threshold_ms, ); if duration_since_highest_seen > fallback_threshold { Err(Error::ObserverProgressStopped(format!( @@ -92,11 +111,48 @@ impl ObserverFallbackManager { } } else { // The synced version has increased. Update the highest synced version and time. - self.highest_synced_version_and_time = (current_synced_version, time_now); + self.highest_synced_version_and_time = (latest_ledger_info_version, time_now); Ok(()) } } + /// Verifies that the sync lag is within acceptable limits. If not, an error is returned. + fn verify_sync_lag_health(&self, latest_ledger_info_version: Version) -> Result<(), Error> { + // Get the latest block timestamp from storage + let latest_block_timestamp_usecs = match self + .db_reader + .get_block_timestamp(latest_ledger_info_version) + { + Ok(block_timestamp_usecs) => block_timestamp_usecs, + Err(error) => { + // Log a warning and return without entering fallback mode + warn!(LogSchema::new(LogEntry::ConsensusObserver) + .message(&format!("Failed to read block timestamp: {:?}", error))); + return Ok(()); + }, + }; + + // Get the current time (in microseconds) + let timestamp_now_usecs = self.time_service.now_unix_time().as_micros() as u64; + + // Calculate the block timestamp lag (saturating at 0) + let timestamp_lag_usecs = timestamp_now_usecs.saturating_sub(latest_block_timestamp_usecs); + let timestamp_lag_duration = Duration::from_micros(timestamp_lag_usecs); + + // Check if the sync lag is within acceptable limits + let sync_lag_threshold_ms = self + .consensus_observer_config + .observer_fallback_sync_lag_threshold_ms; + if timestamp_lag_duration > Duration::from_millis(sync_lag_threshold_ms) { + return Err(Error::ObserverFallingBehind(format!( + "Consensus observer is falling behind! Highest synced version: {}, sync lag: {:?}", + latest_ledger_info_version, timestamp_lag_duration + ))); + } + + Ok(()) + } + /// Resets the syncing progress to the latest synced ledger info and current time pub fn reset_syncing_progress(&mut self, latest_synced_ledger_info: &LedgerInfoWithSignatures) { // Get the current time and highest synced version @@ -124,18 +180,19 @@ mod test { mock! { pub DatabaseReader {} impl DbReader for DatabaseReader { + fn get_block_timestamp(&self, version: Version) -> Result; + fn get_latest_ledger_info_version(&self) -> Result; } } #[test] - fn test_check_syncing_progress() { + fn test_verify_increasing_sync_versions() { // Create a consensus observer config - let observer_fallback_sync_threshold_ms = 10_000; - let observer_fallback_startup_period_ms = 0; // Disable the startup period + let observer_fallback_progress_threshold_ms = 10_000; let consensus_observer_config = ConsensusObserverConfig { - observer_fallback_startup_period_ms, - observer_fallback_sync_threshold_ms, + observer_fallback_startup_period_ms: 0, // Disable the startup period + observer_fallback_progress_threshold_ms, ..ConsensusObserverConfig::default() }; @@ -150,6 +207,9 @@ mod test { mock_db_reader .expect_get_latest_ledger_info_version() .returning(move || Ok(second_synced_version)); // Allow multiple calls for the second version + mock_db_reader + .expect_get_block_timestamp() + .returning(move |_| Ok(u64::MAX)); // Return a dummy block timestamp // Create a new fallback manager let time_service = TimeService::mock(); @@ -169,7 +229,7 @@ mod test { // Elapse enough time to bypass the fallback threshold mock_time_service.advance(Duration::from_millis( - observer_fallback_sync_threshold_ms + 1, + observer_fallback_progress_threshold_ms + 1, )); // Verify that the DB is still making sync progress (the next DB version is higher) @@ -182,7 +242,7 @@ mod test { // Elapse some amount of time (but not enough to bypass the fallback threshold) mock_time_service.advance(Duration::from_millis( - observer_fallback_sync_threshold_ms - 1, + observer_fallback_progress_threshold_ms - 1, )); // Verify that the DB is still making sync progress (the threshold hasn't been reached) @@ -194,7 +254,7 @@ mod test { // Elapse enough time to bypass the fallback threshold mock_time_service.advance(Duration::from_millis( - observer_fallback_sync_threshold_ms + 1, + observer_fallback_progress_threshold_ms + 1, )); // Verify that the DB is not making sync progress and that fallback mode should be entered @@ -209,13 +269,13 @@ mod test { } #[test] - fn test_check_syncing_progress_startup_period() { + fn test_verify_increasing_sync_versions_startup_period() { // Create a consensus observer config - let observer_fallback_sync_threshold_ms = 10_000; + let observer_fallback_progress_threshold_ms = 10_000; let observer_fallback_startup_period_ms = 90_0000; let consensus_observer_config = ConsensusObserverConfig { observer_fallback_startup_period_ms, - observer_fallback_sync_threshold_ms, + observer_fallback_progress_threshold_ms, ..ConsensusObserverConfig::default() }; @@ -231,6 +291,9 @@ mod test { mock_db_reader .expect_get_latest_ledger_info_version() .returning(move || Ok(second_synced_version)); // Allow multiple calls for the second version + mock_db_reader + .expect_get_block_timestamp() + .returning(move |_| Ok(u64::MAX)); // Return a dummy block timestamp // Create a new fallback manager let time_service = TimeService::mock(); @@ -246,7 +309,7 @@ mod test { for _ in 0..5 { // Elapse enough time to bypass the fallback threshold mock_time_service.advance(Duration::from_millis( - observer_fallback_sync_threshold_ms + 1, + observer_fallback_progress_threshold_ms + 1, )); // Verify that the DB is still making sync progress (we're still in the startup period) @@ -271,7 +334,7 @@ mod test { // Elapse enough time to bypass the fallback threshold mock_time_service.advance(Duration::from_millis( - observer_fallback_sync_threshold_ms + 1, + observer_fallback_progress_threshold_ms + 1, )); // Verify that the DB is still making sync progress (the next DB version is higher) @@ -284,7 +347,7 @@ mod test { // Elapse enough time to bypass the fallback threshold mock_time_service.advance(Duration::from_millis( - observer_fallback_sync_threshold_ms + 1, + observer_fallback_progress_threshold_ms + 1, )); // Verify that the DB is not making sync progress and that fallback mode should be entered @@ -298,6 +361,113 @@ mod test { ); } + #[test] + fn test_verify_sync_lag_health() { + // Create a consensus observer config + let observer_fallback_sync_lag_threshold_ms = 10_000; + let consensus_observer_config = ConsensusObserverConfig { + observer_fallback_startup_period_ms: 0, // Disable the startup period + observer_fallback_progress_threshold_ms: 999_999_999, // Disable the progress check + observer_fallback_sync_lag_threshold_ms, + ..ConsensusObserverConfig::default() + }; + + // Create a mock DB reader with expectations + let time_service = TimeService::mock(); + let latest_block_timestamp = time_service.now_unix_time().as_micros() as u64; + let mut mock_db_reader = MockDatabaseReader::new(); + mock_db_reader + .expect_get_latest_ledger_info_version() + .returning(move || Ok(1)); + mock_db_reader + .expect_get_block_timestamp() + .returning(move |_| Ok(latest_block_timestamp)); + + // Create a new fallback manager + let mut fallback_manager = ObserverFallbackManager::new( + consensus_observer_config, + Arc::new(mock_db_reader), + time_service.clone(), + ); + + // Verify that the DB is making sync progress and that the sync lag is acceptable + assert!(fallback_manager.check_syncing_progress().is_ok()); + + // Elapse some amount of time (but not enough to bypass the sync lag threshold) + let mock_time_service = time_service.into_mock(); + mock_time_service.advance(Duration::from_millis( + observer_fallback_sync_lag_threshold_ms - 1, + )); + + // Verify that the DB is making sync progress and that the sync lag is acceptable + assert!(fallback_manager.check_syncing_progress().is_ok()); + + // Elapse enough time to bypass the sync lag threshold + mock_time_service.advance(Duration::from_millis( + observer_fallback_sync_lag_threshold_ms + 1, + )); + + // Verify that the sync lag is too high and that fallback mode should be entered + assert_matches!( + fallback_manager.check_syncing_progress(), + Err(Error::ObserverFallingBehind(_)) + ); + } + + #[test] + fn test_verify_sync_lag_health_startup_period() { + // Create a consensus observer config + let observer_fallback_sync_lag_threshold_ms = 10_000; + let observer_fallback_startup_period_ms = 90_0000; + let consensus_observer_config = ConsensusObserverConfig { + observer_fallback_startup_period_ms, + observer_fallback_progress_threshold_ms: 999_999_999, // Disable the progress check + observer_fallback_sync_lag_threshold_ms, + ..ConsensusObserverConfig::default() + }; + + // Create a mock DB reader with expectations + let time_service = TimeService::mock(); + let latest_block_timestamp = time_service.now_unix_time().as_micros() as u64; + let mut mock_db_reader = MockDatabaseReader::new(); + mock_db_reader + .expect_get_latest_ledger_info_version() + .returning(move || Ok(1)); + mock_db_reader + .expect_get_block_timestamp() + .returning(move |_| Ok(latest_block_timestamp)); + + // Create a new fallback manager + let mut fallback_manager = ObserverFallbackManager::new( + consensus_observer_config, + Arc::new(mock_db_reader), + time_service.clone(), + ); + + // Verify that the DB is making sync progress and that we're still in the startup period + let mock_time_service = time_service.into_mock(); + for _ in 0..5 { + // Elapse enough time to bypass the sync lag threshold + mock_time_service.advance(Duration::from_millis( + observer_fallback_sync_lag_threshold_ms + 1, + )); + + // Verify that the DB is still making sync progress (we're still in the startup period) + assert!(fallback_manager.check_syncing_progress().is_ok()); + } + + // Elapse enough time to bypass the startup period + mock_time_service.advance(Duration::from_millis( + observer_fallback_startup_period_ms + 1, + )); + + // Verify that the sync lag is too high and that fallback mode should be entered + assert_matches!( + fallback_manager.check_syncing_progress(), + Err(Error::ObserverFallingBehind(_)) + ); + } + #[test] fn test_reset_syncing_progress() { // Create a new fallback manager