Skip to content

Commit

Permalink
[State Sync] Handle sync_for_duration() requests.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Oct 16, 2024
1 parent dc25f42 commit 16e31e6
Show file tree
Hide file tree
Showing 7 changed files with 562 additions and 139 deletions.
29 changes: 13 additions & 16 deletions state-sync/inter-component/consensus-notifications/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl ConsensusNotificationListener {
}

/// Respond to the commit notification
pub async fn respond_to_commit_notification(
pub fn respond_to_commit_notification(
&self,
consensus_commit_notification: ConsensusCommitNotification,
result: Result<(), Error>,
Expand All @@ -226,7 +226,7 @@ impl ConsensusNotificationListener {
}

/// Respond to the sync duration notification
pub async fn respond_to_sync_duration_notification(
pub fn respond_to_sync_duration_notification(
&self,
sync_duration_notification: ConsensusSyncDurationNotification,
result: Result<(), Error>,
Expand All @@ -235,7 +235,7 @@ impl ConsensusNotificationListener {
}

/// Respond to the sync target notification
pub async fn respond_to_sync_target_notification(
pub fn respond_to_sync_target_notification(
&self,
sync_target_notification: ConsensusSyncTargetNotification,
result: Result<(), Error>,
Expand Down Expand Up @@ -495,27 +495,24 @@ mod tests {
let _handler = std::thread::spawn(move || loop {
match consensus_listener.select_next_some().now_or_never() {
Some(ConsensusNotification::NotifyCommit(commit_notification)) => {
let _result = block_on(
consensus_listener
.respond_to_commit_notification(commit_notification, Ok(())),
);
let _result = consensus_listener
.respond_to_commit_notification(commit_notification, Ok(()));
},
Some(ConsensusNotification::SyncToTarget(sync_notification)) => {
let _result = block_on(consensus_listener.respond_to_sync_target_notification(
let _result = consensus_listener.respond_to_sync_target_notification(
sync_notification,
Err(Error::UnexpectedErrorEncountered(
"Oops! Sync to target failed!".into(),
)),
));
);
},
Some(ConsensusNotification::SyncForDuration(sync_notification)) => {
let _result =
block_on(consensus_listener.respond_to_sync_duration_notification(
sync_notification,
Err(Error::UnexpectedErrorEncountered(
"Oops! Sync for duration failed!".into(),
)),
));
let _result = consensus_listener.respond_to_sync_duration_notification(
sync_notification,
Err(Error::UnexpectedErrorEncountered(
"Oops! Sync for duration failed!".into(),
)),
);
},
_ => { /* Do nothing */ },
}
Expand Down
4 changes: 2 additions & 2 deletions state-sync/state-sync-driver/src/continuous_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl<
let sync_request_target = consensus_sync_request
.lock()
.as_ref()
.map(|sync_request| sync_request.get_sync_target());
.and_then(|sync_request| sync_request.get_sync_target());

// Initialize a new active data stream
let active_data_stream = match self.get_continuous_syncing_mode() {
Expand Down Expand Up @@ -432,7 +432,7 @@ impl<
let sync_request_target = consensus_sync_request
.lock()
.as_ref()
.map(|sync_request| sync_request.get_sync_target());
.and_then(|sync_request| sync_request.get_sync_target());
if let Some(sync_request_target) = sync_request_target {
let sync_request_version = sync_request_target.ledger_info().version();
let proof_version = ledger_info_with_signatures.ledger_info().version();
Expand Down
116 changes: 76 additions & 40 deletions state-sync/state-sync-driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use crate::{
};
use aptos_config::config::{ConsensusObserverConfig, RoleType, StateSyncDriverConfig};
use aptos_consensus_notifications::{
ConsensusCommitNotification, ConsensusNotification, ConsensusSyncTargetNotification,
ConsensusCommitNotification, ConsensusNotification, ConsensusSyncDurationNotification,
ConsensusSyncTargetNotification,
};
use aptos_data_client::interface::AptosDataClientInterface;
use aptos_data_streaming_service::streaming_client::{
Expand Down Expand Up @@ -264,19 +265,20 @@ impl<
ConsensusNotification::NotifyCommit(commit_notification) => {
let _ = self
.consensus_notification_handler
.respond_to_commit_notification(commit_notification, Err(error.clone()))
.await;
.respond_to_commit_notification(commit_notification, Err(error.clone()));
},
ConsensusNotification::SyncToTarget(sync_notification) => {
let _ = self
.consensus_notification_handler
.respond_to_sync_target_notification(sync_notification, Err(error.clone()))
.await;
.respond_to_sync_target_notification(sync_notification, Err(error.clone()));
},
ConsensusNotification::SyncForDuration(_) => {
warn!(LogSchema::new(LogEntry::ConsensusNotification)
.error(&error)
.message("Received an invalid sync for duration notification!"));
ConsensusNotification::SyncForDuration(sync_notification) => {
let _ = self
.consensus_notification_handler
.respond_to_sync_duration_notification(
sync_notification,
Err(error.clone()),
);
},
}
warn!(LogSchema::new(LogEntry::ConsensusNotification)
Expand All @@ -296,10 +298,8 @@ impl<
.await
},
ConsensusNotification::SyncForDuration(sync_notification) => {
Err(Error::UnexpectedError(format!(
"Received an unexpected sync for duration notification: {:?}",
sync_notification
)))
self.handle_consensus_sync_duration_notification(sync_notification)
.await
},
};

Expand Down Expand Up @@ -341,8 +341,7 @@ impl<

// Respond successfully
self.consensus_notification_handler
.respond_to_commit_notification(commit_notification, Ok(()))
.await?;
.respond_to_commit_notification(commit_notification, Ok(()))?;

// Check the progress of any sync requests. We need this here because
// consensus might issue a sync request and then commit (asynchronously).
Expand Down Expand Up @@ -384,11 +383,36 @@ impl<
}
}

/// Handles a consensus or consensus observer request to sync for a specified duration
async fn handle_consensus_sync_duration_notification(
&mut self,
sync_duration_notification: ConsensusSyncDurationNotification,
) -> Result<(), Error> {
// Update the sync duration notification metrics
let latest_synced_version = utils::fetch_pre_committed_version(self.storage.clone())?;
info!(
LogSchema::new(LogEntry::ConsensusNotification).message(&format!(
"Received a consensus sync duration notification! Duration: {:?}. Latest synced version: {:?}",
sync_duration_notification.get_duration(), latest_synced_version,
))
);
metrics::increment_counter(
&metrics::DRIVER_COUNTERS,
metrics::DRIVER_CONSENSUS_SYNC_DURATION_NOTIFICATION,
);

// Initialize a new sync request
self.consensus_notification_handler
.initialize_sync_duration_request(sync_duration_notification)
.await
}

/// Handles a consensus or consensus observer request to sync to a specified target
async fn handle_consensus_sync_target_notification(
&mut self,
sync_target_notification: ConsensusSyncTargetNotification,
) -> Result<(), Error> {
// Update the sync target notification metrics
let latest_synced_version = utils::fetch_pre_committed_version(self.storage.clone())?;
info!(
LogSchema::new(LogEntry::ConsensusNotification).message(&format!(
Expand All @@ -398,7 +422,7 @@ impl<
);
metrics::increment_counter(
&metrics::DRIVER_COUNTERS,
metrics::DRIVER_CONSENSUS_SYNC_NOTIFICATION,
metrics::DRIVER_CONSENSUS_SYNC_TARGET_NOTIFICATION,
);

// Initialize a new sync request
Expand Down Expand Up @@ -500,31 +524,27 @@ impl<
};
}

/// Checks if the node has successfully reached the sync target
/// Checks if the node has successfully reached the sync target or duration
async fn check_sync_request_progress(&mut self) -> Result<(), Error> {
if !self.active_sync_request() {
return Ok(()); // There's no pending sync request
}

// There's a sync request. Fetch it and check if we're still behind the target.
let sync_request = self.consensus_notification_handler.get_sync_request();
let sync_target_version = sync_request
.lock()
.as_ref()
.ok_or_else(|| {
Error::UnexpectedError(
"We've already verified there is an active sync request!".into(),
)
})?
.get_sync_target_version();
let latest_synced_ledger_info =
utils::fetch_latest_synced_ledger_info(self.storage.clone())?;
if latest_synced_ledger_info.ledger_info().version() < sync_target_version {
return Ok(());
// Check if the sync request has been satisfied
let consensus_sync_request = self.consensus_notification_handler.get_sync_request();
match consensus_sync_request.lock().as_ref() {
Some(consensus_sync_request) => {
let latest_synced_ledger_info =
utils::fetch_latest_synced_ledger_info(self.storage.clone())?;
if !consensus_sync_request
.sync_request_satisfied(&latest_synced_ledger_info, self.time_service.clone())
{
return Ok(()); // The sync request hasn't been satisfied yet
}
},
None => {
return Ok(()); // There's no active sync request
},
}

// Wait for the storage synchronizer to drain (if it hasn't already).
// This prevents notifying consensus prematurely.
// The sync request has been satisfied. Wait for the storage synchronizer
// to drain. This prevents notifying consensus prematurely.
while self.storage_synchronizer.pending_storage_data() {
sample!(
SampleRate::Duration(Duration::from_secs(PENDING_DATA_LOG_FREQ_SECS)),
Expand All @@ -535,11 +555,26 @@ impl<
yield_now().await;
}

// Refresh the latest synced ledger info and handle the sync request
// If the request was to sync for a specified duration, we should only
// stop syncing when the synced version and synced ledger info match.
// Otherwise, the DB will be left in an inconsistent state on handover.
if let Some(sync_request) = consensus_sync_request.lock().as_ref() {
if sync_request.is_sync_duration_request() {
let latest_synced_version =
utils::fetch_pre_committed_version(self.storage.clone())?;
let latest_synced_ledger_info =
utils::fetch_latest_synced_ledger_info(self.storage.clone())?;
if latest_synced_version != latest_synced_ledger_info.ledger_info().version() {
return Ok(()); // State sync should continue to run
}
}
}

// Handle the satisfied sync request
let latest_synced_ledger_info =
utils::fetch_latest_synced_ledger_info(self.storage.clone())?;
self.consensus_notification_handler
.check_sync_request_progress(latest_synced_ledger_info)
.handle_satisfied_sync_request(latest_synced_ledger_info)
.await?;

// If the sync request was successfully handled, reset the continuous syncer
Expand All @@ -548,6 +583,7 @@ impl<
self.continuous_syncer.reset_active_stream(None).await?;
self.storage_synchronizer.finish_chunk_executor(); // Consensus or consensus observer is now in control
}

Ok(())
}

Expand Down
3 changes: 2 additions & 1 deletion state-sync/state-sync-driver/src/driver_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ impl DriverFactory {
ClientNotificationListener::new(client_notification_receiver);
let (commit_notification_sender, commit_notification_listener) =
CommitNotificationListener::new();
let consensus_notification_handler = ConsensusNotificationHandler::new(consensus_listener);
let consensus_notification_handler =
ConsensusNotificationHandler::new(consensus_listener, time_service.clone());
let (error_notification_sender, error_notification_listener) =
ErrorNotificationListener::new();
let mempool_notification_handler =
Expand Down
5 changes: 4 additions & 1 deletion state-sync/state-sync-driver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use std::time::Instant;
/// Driver metric labels
pub const DRIVER_CLIENT_NOTIFICATION: &str = "driver_client_notification";
pub const DRIVER_CONSENSUS_COMMIT_NOTIFICATION: &str = "driver_consensus_commit_notification";
pub const DRIVER_CONSENSUS_SYNC_NOTIFICATION: &str = "driver_consensus_sync_notification";
pub const DRIVER_CONSENSUS_SYNC_DURATION_NOTIFICATION: &str =
"driver_consensus_sync_duration_notification";
pub const DRIVER_CONSENSUS_SYNC_TARGET_NOTIFICATION: &str =
"driver_consensus_sync_target_notification";

/// Data notification metric labels
pub const NOTIFICATION_CREATE_TO_APPLY: &str = "notification_create_to_apply";
Expand Down
Loading

0 comments on commit 16e31e6

Please sign in to comment.