Skip to content

Commit

Permalink
[Consensus Observer] Add sync lag based fallback.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Nov 20, 2024
1 parent 47f0bf3 commit e524bde
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 24 deletions.
7 changes: 5 additions & 2 deletions config/src/config/consensus_observer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ pub struct ConsensusObserverConfig {
/// Duration (in milliseconds) we'll wait on startup before considering fallback mode
pub observer_fallback_startup_period_ms: u64,
/// Duration (in milliseconds) we'll wait for syncing progress before entering fallback mode
pub observer_fallback_sync_threshold_ms: u64,
pub observer_fallback_progress_threshold_ms: u64,
/// Duration (in milliseconds) of acceptable sync lag before entering fallback mode
pub observer_fallback_sync_lag_threshold_ms: u64,
}

impl Default for ConsensusObserverConfig {
Expand All @@ -76,7 +78,8 @@ impl Default for ConsensusObserverConfig {
subscription_refresh_interval_ms: 600_000, // 10 minutes
observer_fallback_duration_ms: 600_000, // 10 minutes
observer_fallback_startup_period_ms: 60_000, // 60 seconds
observer_fallback_sync_threshold_ms: 30_000, // 30 seconds
observer_fallback_progress_threshold_ms: 10_000, // 10 seconds
observer_fallback_sync_lag_threshold_ms: 15_000, // 15 seconds
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions consensus/src/consensus_observer/common/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub enum Error {
#[error("Network error: {0}")]
NetworkError(String),

#[error("Consensus observer falling behind: {0}")]
ObserverFallingBehind(String),

#[error("Consensus observer progress stopped: {0}")]
ObserverProgressStopped(String),

Expand Down Expand Up @@ -43,6 +46,7 @@ impl Error {
match self {
Self::InvalidMessageError(_) => "invalid_message_error",
Self::NetworkError(_) => "network_error",
Self::ObserverFallingBehind(_) => "observer_falling_behind",
Self::ObserverProgressStopped(_) => "observer_progress_stopped",
Self::RpcError(_) => "rpc_error",
Self::SubscriptionDisconnected(_) => "subscription_disconnected",
Expand Down
214 changes: 192 additions & 22 deletions consensus/src/consensus_observer/observer/fallback_manager.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::consensus_observer::common::error::Error;
use crate::consensus_observer::common::{
error::Error,
logging::{LogEntry, LogSchema},
};
use aptos_config::config::ConsensusObserverConfig;
use aptos_logger::warn;
use aptos_storage_interface::DbReader;
use aptos_time_service::{TimeService, TimeServiceTrait};
use aptos_types::ledger_info::LedgerInfoWithSignatures;
use aptos_types::{ledger_info::LedgerInfoWithSignatures, transaction::Version};
use std::{
sync::Arc,
time::{Duration, Instant},
Expand Down Expand Up @@ -48,7 +52,8 @@ impl ObserverFallbackManager {
}
}

/// Verifies that the DB is continuing to sync and commit new data.
/// Verifies that the DB is continuing to sync and commit new data, and that
/// the node has not fallen too far behind the rest of the network.
/// If not, an error is returned, indicating that we should enter fallback mode.
pub fn check_syncing_progress(&mut self) -> Result<(), Error> {
// If we're still within the startup period, we don't need to verify progress
Expand All @@ -61,8 +66,8 @@ impl ObserverFallbackManager {
return Ok(()); // We're still in the startup period
}

// Otherwise, fetch the synced version from storage
let current_synced_version =
// Fetch the synced ledger info version from storage
let latest_ledger_info_version =
self.db_reader
.get_latest_ledger_info_version()
.map_err(|error| {
Expand All @@ -72,15 +77,29 @@ impl ObserverFallbackManager {
))
})?;

// Verify that the synced version is increasing appropriately
self.verify_increasing_sync_versions(latest_ledger_info_version, time_now)?;

// Verify that the sync lag is within acceptable limits
self.verify_sync_lag_health(latest_ledger_info_version)
}

/// Verifies that the synced version is increasing appropriately. If not
/// (i.e., too much time has passed without an increase), an error is returned.
fn verify_increasing_sync_versions(
&mut self,
latest_ledger_info_version: Version,
time_now: Instant,
) -> Result<(), Error> {
// Verify that the synced version is increasing appropriately
let (highest_synced_version, highest_version_timestamp) =
self.highest_synced_version_and_time;
if current_synced_version <= highest_synced_version {
if latest_ledger_info_version <= highest_synced_version {
// The synced version hasn't increased. Check if we should enter fallback mode.
let duration_since_highest_seen = time_now.duration_since(highest_version_timestamp);
let fallback_threshold = Duration::from_millis(
self.consensus_observer_config
.observer_fallback_sync_threshold_ms,
.observer_fallback_progress_threshold_ms,
);
if duration_since_highest_seen > fallback_threshold {
Err(Error::ObserverProgressStopped(format!(
Expand All @@ -92,11 +111,48 @@ impl ObserverFallbackManager {
}
} else {
// The synced version has increased. Update the highest synced version and time.
self.highest_synced_version_and_time = (current_synced_version, time_now);
self.highest_synced_version_and_time = (latest_ledger_info_version, time_now);
Ok(())
}
}

/// Verifies that the sync lag is within acceptable limits. If not, an error is returned.
fn verify_sync_lag_health(&self, latest_ledger_info_version: Version) -> Result<(), Error> {
    // Fetch the timestamp of the latest synced block from storage. If the
    // read fails, log a warning and treat the check as passing — a storage
    // hiccup alone shouldn't force the observer into fallback mode.
    let block_timestamp_usecs = match self
        .db_reader
        .get_block_timestamp(latest_ledger_info_version)
    {
        Ok(timestamp_usecs) => timestamp_usecs,
        Err(error) => {
            warn!(LogSchema::new(LogEntry::ConsensusObserver)
                .message(&format!("Failed to read block timestamp: {:?}", error)));
            return Ok(());
        },
    };

    // Compute how far the latest block lags behind the current time
    // (in microseconds, saturating at zero if the clock is behind).
    let now_usecs = self.time_service.now_unix_time().as_micros() as u64;
    let sync_lag = Duration::from_micros(now_usecs.saturating_sub(block_timestamp_usecs));

    // If the lag exceeds the configured threshold, signal fallback mode
    let lag_threshold = Duration::from_millis(
        self.consensus_observer_config
            .observer_fallback_sync_lag_threshold_ms,
    );
    if sync_lag > lag_threshold {
        return Err(Error::ObserverFallingBehind(format!(
            "Consensus observer is falling behind! Highest synced version: {}, sync lag: {:?}",
            latest_ledger_info_version, sync_lag
        )));
    }

    Ok(())
}

/// Resets the syncing progress to the latest synced ledger info and current time
pub fn reset_syncing_progress(&mut self, latest_synced_ledger_info: &LedgerInfoWithSignatures) {
// Get the current time and highest synced version
Expand Down Expand Up @@ -124,18 +180,19 @@ mod test {
// Mockall-generated mock of the storage `DbReader` trait, exposing only
// the two methods the fallback manager reads during these tests.
mock! {
pub DatabaseReader {}
impl DbReader for DatabaseReader {
fn get_block_timestamp(&self, version: Version) -> Result<u64>;

fn get_latest_ledger_info_version(&self) -> Result<Version>;
}
}

#[test]
fn test_check_syncing_progress() {
fn test_verify_increasing_sync_versions() {
// Create a consensus observer config
let observer_fallback_sync_threshold_ms = 10_000;
let observer_fallback_startup_period_ms = 0; // Disable the startup period
let observer_fallback_progress_threshold_ms = 10_000;
let consensus_observer_config = ConsensusObserverConfig {
observer_fallback_startup_period_ms,
observer_fallback_sync_threshold_ms,
observer_fallback_startup_period_ms: 0, // Disable the startup period
observer_fallback_progress_threshold_ms,
..ConsensusObserverConfig::default()
};

Expand All @@ -150,6 +207,9 @@ mod test {
mock_db_reader
.expect_get_latest_ledger_info_version()
.returning(move || Ok(second_synced_version)); // Allow multiple calls for the second version
mock_db_reader
.expect_get_block_timestamp()
.returning(move |_| Ok(u64::MAX)); // Return a dummy block timestamp

// Create a new fallback manager
let time_service = TimeService::mock();
Expand All @@ -169,7 +229,7 @@ mod test {

// Elapse enough time to bypass the fallback threshold
mock_time_service.advance(Duration::from_millis(
observer_fallback_sync_threshold_ms + 1,
observer_fallback_progress_threshold_ms + 1,
));

// Verify that the DB is still making sync progress (the next DB version is higher)
Expand All @@ -182,7 +242,7 @@ mod test {

// Elapse some amount of time (but not enough to bypass the fallback threshold)
mock_time_service.advance(Duration::from_millis(
observer_fallback_sync_threshold_ms - 1,
observer_fallback_progress_threshold_ms - 1,
));

// Verify that the DB is still making sync progress (the threshold hasn't been reached)
Expand All @@ -194,7 +254,7 @@ mod test {

// Elapse enough time to bypass the fallback threshold
mock_time_service.advance(Duration::from_millis(
observer_fallback_sync_threshold_ms + 1,
observer_fallback_progress_threshold_ms + 1,
));

// Verify that the DB is not making sync progress and that fallback mode should be entered
Expand All @@ -209,13 +269,13 @@ mod test {
}

#[test]
fn test_check_syncing_progress_startup_period() {
fn test_verify_increasing_sync_versions_startup_period() {
// Create a consensus observer config
let observer_fallback_sync_threshold_ms = 10_000;
let observer_fallback_progress_threshold_ms = 10_000;
let observer_fallback_startup_period_ms = 90_0000;
let consensus_observer_config = ConsensusObserverConfig {
observer_fallback_startup_period_ms,
observer_fallback_sync_threshold_ms,
observer_fallback_progress_threshold_ms,
..ConsensusObserverConfig::default()
};

Expand All @@ -231,6 +291,9 @@ mod test {
mock_db_reader
.expect_get_latest_ledger_info_version()
.returning(move || Ok(second_synced_version)); // Allow multiple calls for the second version
mock_db_reader
.expect_get_block_timestamp()
.returning(move |_| Ok(u64::MAX)); // Return a dummy block timestamp

// Create a new fallback manager
let time_service = TimeService::mock();
Expand All @@ -246,7 +309,7 @@ mod test {
for _ in 0..5 {
// Elapse enough time to bypass the fallback threshold
mock_time_service.advance(Duration::from_millis(
observer_fallback_sync_threshold_ms + 1,
observer_fallback_progress_threshold_ms + 1,
));

// Verify that the DB is still making sync progress (we're still in the startup period)
Expand All @@ -271,7 +334,7 @@ mod test {

// Elapse enough time to bypass the fallback threshold
mock_time_service.advance(Duration::from_millis(
observer_fallback_sync_threshold_ms + 1,
observer_fallback_progress_threshold_ms + 1,
));

// Verify that the DB is still making sync progress (the next DB version is higher)
Expand All @@ -284,7 +347,7 @@ mod test {

// Elapse enough time to bypass the fallback threshold
mock_time_service.advance(Duration::from_millis(
observer_fallback_sync_threshold_ms + 1,
observer_fallback_progress_threshold_ms + 1,
));

// Verify that the DB is not making sync progress and that fallback mode should be entered
Expand All @@ -298,6 +361,113 @@ mod test {
);
}

#[test]
fn test_verify_sync_lag_health() {
    // Build a config where only the sync lag check can trigger fallback
    let observer_fallback_sync_lag_threshold_ms = 10_000;
    let config = ConsensusObserverConfig {
        observer_fallback_startup_period_ms: 0, // Disable the startup period
        observer_fallback_progress_threshold_ms: 999_999_999, // Disable the progress check
        observer_fallback_sync_lag_threshold_ms,
        ..ConsensusObserverConfig::default()
    };

    // Set up a mock DB reader that always reports version 1 and a fixed
    // block timestamp equal to the current (mock) time
    let time_service = TimeService::mock();
    let block_timestamp_usecs = time_service.now_unix_time().as_micros() as u64;
    let mut db_reader = MockDatabaseReader::new();
    db_reader
        .expect_get_latest_ledger_info_version()
        .returning(move || Ok(1));
    db_reader
        .expect_get_block_timestamp()
        .returning(move |_| Ok(block_timestamp_usecs));

    // Create the fallback manager under test
    let mut manager =
        ObserverFallbackManager::new(config, Arc::new(db_reader), time_service.clone());

    // With no elapsed time the sync lag is zero, so the check passes
    assert!(manager.check_syncing_progress().is_ok());

    // Advance time to just under the lag threshold; the check still passes
    let mock_time = time_service.into_mock();
    mock_time.advance(Duration::from_millis(
        observer_fallback_sync_lag_threshold_ms - 1,
    ));
    assert!(manager.check_syncing_progress().is_ok());

    // Advance beyond the lag threshold; fallback mode should now be signalled
    mock_time.advance(Duration::from_millis(
        observer_fallback_sync_lag_threshold_ms + 1,
    ));
    assert_matches!(
        manager.check_syncing_progress(),
        Err(Error::ObserverFallingBehind(_))
    );
}

#[test]
fn test_verify_sync_lag_health_startup_period() {
    // Build a config with a long startup period and only the sync lag check active
    let observer_fallback_sync_lag_threshold_ms = 10_000;
    let observer_fallback_startup_period_ms = 900_000;
    let config = ConsensusObserverConfig {
        observer_fallback_startup_period_ms,
        observer_fallback_progress_threshold_ms: 999_999_999, // Disable the progress check
        observer_fallback_sync_lag_threshold_ms,
        ..ConsensusObserverConfig::default()
    };

    // Set up a mock DB reader that always reports version 1 and a fixed
    // block timestamp equal to the current (mock) time
    let time_service = TimeService::mock();
    let block_timestamp_usecs = time_service.now_unix_time().as_micros() as u64;
    let mut db_reader = MockDatabaseReader::new();
    db_reader
        .expect_get_latest_ledger_info_version()
        .returning(move || Ok(1));
    db_reader
        .expect_get_block_timestamp()
        .returning(move |_| Ok(block_timestamp_usecs));

    // Create the fallback manager under test
    let mut manager =
        ObserverFallbackManager::new(config, Arc::new(db_reader), time_service.clone());

    // Repeatedly exceed the lag threshold while still inside the startup
    // period; the check must keep passing throughout
    let mock_time = time_service.into_mock();
    for _ in 0..5 {
        mock_time.advance(Duration::from_millis(
            observer_fallback_sync_lag_threshold_ms + 1,
        ));
        assert!(manager.check_syncing_progress().is_ok());
    }

    // Advance past the startup period; the accumulated lag should now
    // trigger fallback mode
    mock_time.advance(Duration::from_millis(
        observer_fallback_startup_period_ms + 1,
    ));
    assert_matches!(
        manager.check_syncing_progress(),
        Err(Error::ObserverFallingBehind(_))
    );
}

#[test]
fn test_reset_syncing_progress() {
// Create a new fallback manager
Expand Down

0 comments on commit e524bde

Please sign in to comment.