From daa4616cd0b551edc01c47866842447f6890bcde Mon Sep 17 00:00:00 2001 From: Josh Lind Date: Wed, 16 Oct 2024 13:03:16 -0400 Subject: [PATCH] [Consensus Observer] Add startup period before fallback. --- .../src/config/consensus_observer_config.rs | 3 + .../observer/fallback_manager.rs | 123 ++++++++++++++++-- 2 files changed, 117 insertions(+), 9 deletions(-) diff --git a/config/src/config/consensus_observer_config.rs b/config/src/config/consensus_observer_config.rs index e55f82bf052d5..bddc66843d614 100644 --- a/config/src/config/consensus_observer_config.rs +++ b/config/src/config/consensus_observer_config.rs @@ -49,6 +49,8 @@ pub struct ConsensusObserverConfig { /// Duration (in milliseconds) to require state sync to synchronize when in fallback mode pub observer_fallback_duration_ms: u64, + /// Duration (in milliseconds) we'll wait on startup before considering fallback mode + pub observer_fallback_startup_period_ms: u64, /// Duration (in milliseconds) we'll wait for syncing progress before entering fallback mode pub observer_fallback_sync_threshold_ms: u64, } @@ -70,6 +72,7 @@ impl Default for ConsensusObserverConfig { subscription_peer_change_interval_ms: 60_000, // 1 minute subscription_refresh_interval_ms: 300_000, // 5 minutes observer_fallback_duration_ms: 600_000, // 10 minutes + observer_fallback_startup_period_ms: 60_000, // 60 seconds observer_fallback_sync_threshold_ms: 30_000, // 30 seconds } } diff --git a/consensus/src/consensus_observer/observer/fallback_manager.rs b/consensus/src/consensus_observer/observer/fallback_manager.rs index 2b7bee64b424e..dcdab0dd42392 100644 --- a/consensus/src/consensus_observer/observer/fallback_manager.rs +++ b/consensus/src/consensus_observer/observer/fallback_manager.rs @@ -22,6 +22,9 @@ pub struct ObserverFallbackManager { // The highest synced version we've seen from storage, along with the time at which it was seen highest_synced_version_and_time: (u64, Instant), + // The time at which the fallback manager 
started running + start_time: Instant, + // The time service (used to check the storage update time) time_service: TimeService, } @@ -40,6 +43,7 @@ impl ObserverFallbackManager { consensus_observer_config, db_reader, highest_synced_version_and_time: (0, time_now), + start_time: time_now, time_service, } } @@ -47,8 +51,17 @@ impl ObserverFallbackManager { /// Verifies that the DB is continuing to sync and commit new data. /// If not, an error is returned, indicating that we should enter fallback mode. pub fn check_syncing_progress(&mut self) -> Result<(), Error> { - // Get the current time and synced version from storage + // If we're still within the startup period, we don't need to verify progress let time_now = self.time_service.now(); + let startup_period = Duration::from_millis( + self.consensus_observer_config + .observer_fallback_startup_period_ms, + ); + if time_now.duration_since(self.start_time) < startup_period { + return Ok(()); // We're still in the startup period + } + + // Otherwise, fetch the synced version from storage let current_synced_version = self.db_reader .get_latest_ledger_info_version() @@ -70,18 +83,18 @@ impl ObserverFallbackManager { .observer_fallback_sync_threshold_ms, ); if duration_since_highest_seen > fallback_threshold { - return Err(Error::ObserverProgressStopped(format!( + Err(Error::ObserverProgressStopped(format!( "Consensus observer is not making progress! Highest synced version: {}, elapsed: {:?}", highest_synced_version, duration_since_highest_seen - ))); + ))) + } else { + Ok(()) // We haven't passed the fallback threshold yet } - return Ok(()); // We haven't passed the fallback threshold yet + } else { + // The synced version has increased. Update the highest synced version and time. 
+ self.highest_synced_version_and_time = (current_synced_version, time_now); + Ok(()) } - - // Update the highest synced version and time - self.highest_synced_version_and_time = (current_synced_version, time_now); - - Ok(()) } /// Resets the syncing progress to the latest synced ledger info and current time @@ -119,7 +132,9 @@ mod test { fn test_check_syncing_progress() { // Create a consensus observer config let observer_fallback_sync_threshold_ms = 10_000; + let observer_fallback_startup_period_ms = 0; // Disable the startup period let consensus_observer_config = ConsensusObserverConfig { + observer_fallback_startup_period_ms, observer_fallback_sync_threshold_ms, ..ConsensusObserverConfig::default() }; @@ -193,6 +208,96 @@ mod test { ); } + #[test] + fn test_check_syncing_progress_startup_period() { + // Create a consensus observer config + let observer_fallback_sync_threshold_ms = 10_000; + let observer_fallback_startup_period_ms = 900_000; + let consensus_observer_config = ConsensusObserverConfig { + observer_fallback_startup_period_ms, + observer_fallback_sync_threshold_ms, + ..ConsensusObserverConfig::default() + }; + + // Create a mock DB reader with expectations + let initial_version = 0; + let first_synced_version = 1; + let second_synced_version = 2; + let mut mock_db_reader = MockDatabaseReader::new(); + mock_db_reader + .expect_get_latest_ledger_info_version() + .returning(move || Ok(first_synced_version)) + .times(1); // Only allow one call for the first version + mock_db_reader + .expect_get_latest_ledger_info_version() + .returning(move || Ok(second_synced_version)); // Allow multiple calls for the second version + + // Create a new fallback manager + let time_service = TimeService::mock(); + let mut fallback_manager = ObserverFallbackManager::new( + consensus_observer_config, + Arc::new(mock_db_reader), + time_service.clone(), + ); + + // Verify that syncing progress is not checked during the startup period + let mock_time_service = 
time_service.into_mock(); + let time_now = mock_time_service.now(); + for _ in 0..5 { + // Elapse enough time to bypass the fallback threshold + mock_time_service.advance(Duration::from_millis( + observer_fallback_sync_threshold_ms + 1, + )); + + // Verify that the DB is still making sync progress (we're still in the startup period) + assert!(fallback_manager.check_syncing_progress().is_ok()); + assert_eq!( + fallback_manager.highest_synced_version_and_time, + (initial_version, time_now) + ); + } + + // Elapse enough time to bypass the startup period + mock_time_service.advance(Duration::from_millis( + observer_fallback_startup_period_ms + 1, + )); + + // Verify that the DB is making sync progress and that the highest synced version is updated + assert!(fallback_manager.check_syncing_progress().is_ok()); + assert_eq!( + fallback_manager.highest_synced_version_and_time, + (first_synced_version, mock_time_service.now()) + ); + + // Elapse enough time to bypass the fallback threshold + mock_time_service.advance(Duration::from_millis( + observer_fallback_sync_threshold_ms + 1, + )); + + // Verify that the DB is still making sync progress (the next DB version is higher) + let time_now = mock_time_service.now(); + assert!(fallback_manager.check_syncing_progress().is_ok()); + assert_eq!( + fallback_manager.highest_synced_version_and_time, + (second_synced_version, time_now) + ); + + // Elapse enough time to bypass the fallback threshold + mock_time_service.advance(Duration::from_millis( + observer_fallback_sync_threshold_ms + 1, + )); + + // Verify that the DB is not making sync progress and that fallback mode should be entered + assert_matches!( + fallback_manager.check_syncing_progress(), + Err(Error::ObserverProgressStopped(_)) + ); + assert_eq!( + fallback_manager.highest_synced_version_and_time, + (second_synced_version, time_now) + ); + } + #[test] fn test_reset_syncing_progress() { // Create a new fallback manager