Skip to content

Commit

Permalink
[Consensus Observer] Add startup period before fallback.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Oct 16, 2024
1 parent 85f5e60 commit 1761a2b
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 9 deletions.
3 changes: 3 additions & 0 deletions config/src/config/consensus_observer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ pub struct ConsensusObserverConfig {

/// Duration (in milliseconds) to require state sync to synchronize when in fallback mode
pub observer_fallback_duration_ms: u64,
/// Duration (in milliseconds) we'll wait on startup before considering fallback mode
pub observer_fallback_startup_period_ms: u64,
/// Duration (in milliseconds) we'll wait for syncing progress before entering fallback mode
pub observer_fallback_sync_threshold_ms: u64,
}
Expand All @@ -70,6 +72,7 @@ impl Default for ConsensusObserverConfig {
subscription_peer_change_interval_ms: 60_000, // 1 minute
subscription_refresh_interval_ms: 300_000, // 5 minutes
observer_fallback_duration_ms: 600_000, // 10 minutes
observer_fallback_startup_period_ms: 90_000, // 90 seconds
observer_fallback_sync_threshold_ms: 30_000, // 30 seconds
}
}
Expand Down
121 changes: 112 additions & 9 deletions consensus/src/consensus_observer/observer/fallback_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ pub struct ObserverFallbackManager {
// The highest synced version we've seen from storage, along with the time at which it was seen
highest_synced_version_and_time: (u64, Instant),

// The time at which the fallback manager started running
start_time: Instant,

// The time service (used to check the storage update time)
time_service: TimeService,
}
Expand All @@ -40,15 +43,25 @@ impl ObserverFallbackManager {
consensus_observer_config,
db_reader,
highest_synced_version_and_time: (0, time_now),
start_time: time_now,
time_service,
}
}

/// Verifies that the DB is continuing to sync and commit new data.
/// If not, an error is returned, indicating that we should enter fallback mode.
pub fn check_syncing_progress(&mut self) -> Result<(), Error> {
// Get the current time and synced version from storage
// If we're still within the startup period, we don't need to verify progress
let time_now = self.time_service.now();
let startup_period = Duration::from_millis(
self.consensus_observer_config
.observer_fallback_startup_period_ms,
);
if time_now.duration_since(self.start_time) < startup_period {
return Ok(()); // We're still in the startup period
}

// Otherwise, fetch the synced version from storage
let current_synced_version =
self.db_reader
.get_latest_ledger_info_version()
Expand All @@ -70,18 +83,18 @@ impl ObserverFallbackManager {
.observer_fallback_sync_threshold_ms,
);
if duration_since_highest_seen > fallback_threshold {
return Err(Error::ObserverProgressStopped(format!(
Err(Error::ObserverProgressStopped(format!(
"Consensus observer is not making progress! Highest synced version: {}, elapsed: {:?}",
highest_synced_version, duration_since_highest_seen
)));
)))
} else {
Ok(()) // We haven't passed the fallback threshold yet
}
return Ok(()); // We haven't passed the fallback threshold yet
} else {
// The synced version has increased. Update the highest synced version and time.
self.highest_synced_version_and_time = (current_synced_version, time_now);
Ok(())
}

// Update the highest synced version and time
self.highest_synced_version_and_time = (current_synced_version, time_now);

Ok(())
}

/// Resets the syncing progress to the latest synced ledger info and current time
Expand Down Expand Up @@ -119,7 +132,9 @@ mod test {
fn test_check_syncing_progress() {
// Create a consensus observer config
let observer_fallback_sync_threshold_ms = 10_000;
let observer_fallback_startup_period_ms = 0; // Disable the startup period
let consensus_observer_config = ConsensusObserverConfig {
observer_fallback_startup_period_ms,
observer_fallback_sync_threshold_ms,
..ConsensusObserverConfig::default()
};
Expand Down Expand Up @@ -193,6 +208,94 @@ mod test {
);
}

#[test]
fn test_check_syncing_progress_startup_period() {
// Create a consensus observer config
let observer_fallback_sync_threshold_ms = 10_000;
let observer_fallback_startup_period_ms = 90_0000;
let consensus_observer_config = ConsensusObserverConfig {
observer_fallback_startup_period_ms,
observer_fallback_sync_threshold_ms,
..ConsensusObserverConfig::default()
};

// Create a mock DB reader with expectations
let initial_version = 0;
let first_synced_version = 1;
let second_synced_version = 2;
let mut mock_db_reader = MockDatabaseReader::new();
mock_db_reader
.expect_get_latest_ledger_info_version()
.returning(move || Ok(first_synced_version))
.times(1); // Only allow one call for the first version
mock_db_reader
.expect_get_latest_ledger_info_version()
.returning(move || Ok(second_synced_version)); // Allow multiple calls for the second version

// Create a new fallback manager
let time_service = TimeService::mock();
let mut fallback_manager = ObserverFallbackManager::new(
consensus_observer_config,
Arc::new(mock_db_reader),
time_service.clone(),
);

// Verify that syncing progress is not checked during the startup period
let mock_time_service = time_service.into_mock();
let time_now = mock_time_service.now();
for _ in 0..5 {
// Elapse enough time to bypass the fallback threshold
mock_time_service.advance(Duration::from_millis(
observer_fallback_sync_threshold_ms + 1,
));

// Verify that the DB is still making sync progress (we're still in the startup period)
assert!(fallback_manager.check_syncing_progress().is_ok());
assert_eq!(
fallback_manager.highest_synced_version_and_time,
(initial_version, time_now)
);
}

// Elapse enough time to bypass the startup period
mock_time_service.advance(Duration::from_millis(observer_fallback_startup_period_ms + 1));

// Verify that the DB is making sync progress and that the highest synced version is updated
assert!(fallback_manager.check_syncing_progress().is_ok());
assert_eq!(
fallback_manager.highest_synced_version_and_time,
(first_synced_version, mock_time_service.now())
);

// Elapse enough time to bypass the fallback threshold
mock_time_service.advance(Duration::from_millis(
observer_fallback_sync_threshold_ms + 1,
));

// Verify that the DB is still making sync progress (the next DB version is higher)
let time_now = mock_time_service.now();
assert!(fallback_manager.check_syncing_progress().is_ok());
assert_eq!(
fallback_manager.highest_synced_version_and_time,
(second_synced_version, time_now)
);

// Elapse enough time to bypass the fallback threshold
mock_time_service.advance(Duration::from_millis(
observer_fallback_sync_threshold_ms + 1,
));

// Verify that the DB is not making sync progress and that fallback mode should be entered
assert_matches!(
fallback_manager.check_syncing_progress(),
Err(Error::ObserverProgressStopped(_))
);
assert_eq!(
fallback_manager.highest_synced_version_and_time,
(second_synced_version, time_now)
);
}

#[test]
fn test_reset_syncing_progress() {
// Create a new fallback manager
Expand Down

0 comments on commit 1761a2b

Please sign in to comment.