diff --git a/state-sync/inter-component/consensus-notifications/src/lib.rs b/state-sync/inter-component/consensus-notifications/src/lib.rs index dd1aeb1302a91..2112bbfc66e00 100644 --- a/state-sync/inter-component/consensus-notifications/src/lib.rs +++ b/state-sync/inter-component/consensus-notifications/src/lib.rs @@ -217,7 +217,7 @@ impl ConsensusNotificationListener { } /// Respond to the commit notification - pub async fn respond_to_commit_notification( + pub fn respond_to_commit_notification( &self, consensus_commit_notification: ConsensusCommitNotification, result: Result<(), Error>, @@ -226,7 +226,7 @@ impl ConsensusNotificationListener { } /// Respond to the sync duration notification - pub async fn respond_to_sync_duration_notification( + pub fn respond_to_sync_duration_notification( &self, sync_duration_notification: ConsensusSyncDurationNotification, result: Result<(), Error>, @@ -235,7 +235,7 @@ impl ConsensusNotificationListener { } /// Respond to the sync target notification - pub async fn respond_to_sync_target_notification( + pub fn respond_to_sync_target_notification( &self, sync_target_notification: ConsensusSyncTargetNotification, result: Result<(), Error>, @@ -495,27 +495,24 @@ mod tests { let _handler = std::thread::spawn(move || loop { match consensus_listener.select_next_some().now_or_never() { Some(ConsensusNotification::NotifyCommit(commit_notification)) => { - let _result = block_on( - consensus_listener - .respond_to_commit_notification(commit_notification, Ok(())), - ); + let _result = consensus_listener + .respond_to_commit_notification(commit_notification, Ok(())); }, Some(ConsensusNotification::SyncToTarget(sync_notification)) => { - let _result = block_on(consensus_listener.respond_to_sync_target_notification( + let _result = consensus_listener.respond_to_sync_target_notification( sync_notification, Err(Error::UnexpectedErrorEncountered( "Oops! Sync to target failed!".into(), )), - )); + ); }, Some(ConsensusNotification::SyncForDuration(sync_notification)) => { - let _result = - block_on(consensus_listener.respond_to_sync_duration_notification( - sync_notification, - Err(Error::UnexpectedErrorEncountered( - "Oops! Sync for duration failed!".into(), - )), - )); + let _result = consensus_listener.respond_to_sync_duration_notification( + sync_notification, + Err(Error::UnexpectedErrorEncountered( + "Oops! Sync for duration failed!".into(), + )), + ); }, _ => { /* Do nothing */ }, } diff --git a/state-sync/state-sync-driver/src/continuous_syncer.rs b/state-sync/state-sync-driver/src/continuous_syncer.rs index d8da9a4496d52..84727902d2ae3 100644 --- a/state-sync/state-sync-driver/src/continuous_syncer.rs +++ b/state-sync/state-sync-driver/src/continuous_syncer.rs @@ -116,7 +116,7 @@ impl< let sync_request_target = consensus_sync_request .lock() .as_ref() - .map(|sync_request| sync_request.get_sync_target()); + .and_then(|sync_request| sync_request.get_sync_target()); // Initialize a new active data stream let active_data_stream = match self.get_continuous_syncing_mode() { @@ -432,7 +432,7 @@ impl< let sync_request_target = consensus_sync_request .lock() .as_ref() - .map(|sync_request| sync_request.get_sync_target()); + .and_then(|sync_request| sync_request.get_sync_target()); if let Some(sync_request_target) = sync_request_target { let sync_request_version = sync_request_target.ledger_info().version(); let proof_version = ledger_info_with_signatures.ledger_info().version(); diff --git a/state-sync/state-sync-driver/src/driver.rs b/state-sync/state-sync-driver/src/driver.rs index edec1b8f54b38..cdbd45c08c448 100644 --- a/state-sync/state-sync-driver/src/driver.rs +++ b/state-sync/state-sync-driver/src/driver.rs @@ -22,7 +22,8 @@ use crate::{ }; use aptos_config::config::{ConsensusObserverConfig, RoleType, StateSyncDriverConfig}; use aptos_consensus_notifications::{ - ConsensusCommitNotification, ConsensusNotification, ConsensusSyncTargetNotification, + ConsensusCommitNotification, ConsensusNotification, ConsensusSyncDurationNotification, + ConsensusSyncTargetNotification, }; use aptos_data_client::interface::AptosDataClientInterface; use aptos_data_streaming_service::streaming_client::{ @@ -264,19 +265,20 @@ impl< ConsensusNotification::NotifyCommit(commit_notification) => { let _ = self .consensus_notification_handler - .respond_to_commit_notification(commit_notification, Err(error.clone())) - .await; + .respond_to_commit_notification(commit_notification, Err(error.clone())); }, ConsensusNotification::SyncToTarget(sync_notification) => { let _ = self .consensus_notification_handler - .respond_to_sync_target_notification(sync_notification, Err(error.clone())) - .await; + .respond_to_sync_target_notification(sync_notification, Err(error.clone())); }, - ConsensusNotification::SyncForDuration(_) => { - warn!(LogSchema::new(LogEntry::ConsensusNotification) - .error(&error) - .message("Received an invalid sync for duration notification!")); + ConsensusNotification::SyncForDuration(sync_notification) => { + let _ = self + .consensus_notification_handler + .respond_to_sync_duration_notification( + sync_notification, + Err(error.clone()), + ); }, } warn!(LogSchema::new(LogEntry::ConsensusNotification) @@ -296,10 +298,8 @@ impl< .await }, ConsensusNotification::SyncForDuration(sync_notification) => { - Err(Error::UnexpectedError(format!( - "Received an unexpected sync for duration notification: {:?}", - sync_notification - ))) + self.handle_consensus_sync_duration_notification(sync_notification) + .await }, }; @@ -341,8 +341,7 @@ impl< // Respond successfully self.consensus_notification_handler - .respond_to_commit_notification(commit_notification, Ok(())) - .await?; + .respond_to_commit_notification(commit_notification, Ok(()))?; // Check the progress of any sync requests. We need this here because // consensus might issue a sync request and then commit (asynchronously). @@ -384,11 +383,36 @@ impl< } } + /// Handles a consensus or consensus observer request to sync for a specified duration + async fn handle_consensus_sync_duration_notification( + &mut self, + sync_duration_notification: ConsensusSyncDurationNotification, + ) -> Result<(), Error> { + // Update the sync duration notification metrics + let latest_synced_version = utils::fetch_pre_committed_version(self.storage.clone())?; + info!( + LogSchema::new(LogEntry::ConsensusNotification).message(&format!( + "Received a consensus sync duration notification! Duration: {:?}. Latest synced version: {:?}", + sync_duration_notification.get_duration(), latest_synced_version, + )) + ); + metrics::increment_counter( + &metrics::DRIVER_COUNTERS, + metrics::DRIVER_CONSENSUS_SYNC_DURATION_NOTIFICATION, + ); + + // Initialize a new sync request + self.consensus_notification_handler + .initialize_sync_duration_request(sync_duration_notification) + .await + } + /// Handles a consensus or consensus observer request to sync to a specified target async fn handle_consensus_sync_target_notification( &mut self, sync_target_notification: ConsensusSyncTargetNotification, ) -> Result<(), Error> { + // Update the sync target notification metrics let latest_synced_version = utils::fetch_pre_committed_version(self.storage.clone())?; info!( LogSchema::new(LogEntry::ConsensusNotification).message(&format!( @@ -398,7 +422,7 @@ impl< ); metrics::increment_counter( &metrics::DRIVER_COUNTERS, - metrics::DRIVER_CONSENSUS_SYNC_NOTIFICATION, + metrics::DRIVER_CONSENSUS_SYNC_TARGET_NOTIFICATION, ); // Initialize a new sync request @@ -500,31 +524,27 @@ impl< }; } - /// Checks if the node has successfully reached the sync target + /// Checks if the node has successfully reached the sync target or duration async fn check_sync_request_progress(&mut self) -> Result<(), Error> { - if !self.active_sync_request() { - return Ok(()); // There's no pending sync request - } - - // There's a sync request. Fetch it and check if we're still behind the target. - let sync_request = self.consensus_notification_handler.get_sync_request(); - let sync_target_version = sync_request - .lock() - .as_ref() - .ok_or_else(|| { - Error::UnexpectedError( - "We've already verified there is an active sync request!".into(), - ) - })? - .get_sync_target_version(); - let latest_synced_ledger_info = - utils::fetch_latest_synced_ledger_info(self.storage.clone())?; - if latest_synced_ledger_info.ledger_info().version() < sync_target_version { - return Ok(()); + // Check if the sync request has been satisfied + let consensus_sync_request = self.consensus_notification_handler.get_sync_request(); + match consensus_sync_request.lock().as_ref() { + Some(consensus_sync_request) => { + let latest_synced_ledger_info = + utils::fetch_latest_synced_ledger_info(self.storage.clone())?; + if !consensus_sync_request + .sync_request_satisfied(&latest_synced_ledger_info, self.time_service.clone()) + { + return Ok(()); // The sync request hasn't been satisfied yet + } + }, + None => { + return Ok(()); // There's no active sync request + }, } - // Wait for the storage synchronizer to drain (if it hasn't already). - // This prevents notifying consensus prematurely. + // The sync request has been satisfied. Wait for the storage synchronizer + // to drain. This prevents notifying consensus prematurely. while self.storage_synchronizer.pending_storage_data() { sample!( SampleRate::Duration(Duration::from_secs(PENDING_DATA_LOG_FREQ_SECS)), @@ -535,11 +555,26 @@ impl< yield_now().await; } - // Refresh the latest synced ledger info and handle the sync request + // If the request was to sync for a specified duration, we should only + // stop syncing when the synced version and synced ledger info match. + // Otherwise, the DB will be left in an inconsistent state on handover. + if let Some(sync_request) = consensus_sync_request.lock().as_ref() { + if sync_request.is_sync_duration_request() { + let latest_synced_version = + utils::fetch_pre_committed_version(self.storage.clone())?; + let latest_synced_ledger_info = + utils::fetch_latest_synced_ledger_info(self.storage.clone())?; + if latest_synced_version != latest_synced_ledger_info.ledger_info().version() { + return Ok(()); // State sync should continue to run + } + } + } + + // Handle the satisfied sync request let latest_synced_ledger_info = utils::fetch_latest_synced_ledger_info(self.storage.clone())?; self.consensus_notification_handler - .check_sync_request_progress(latest_synced_ledger_info) + .handle_satisfied_sync_request(latest_synced_ledger_info) .await?; // If the sync request was successfully handled, reset the continuous syncer @@ -548,6 +583,7 @@ impl< self.continuous_syncer.reset_active_stream(None).await?; self.storage_synchronizer.finish_chunk_executor(); // Consensus or consensus observer is now in control } + Ok(()) } diff --git a/state-sync/state-sync-driver/src/driver_factory.rs b/state-sync/state-sync-driver/src/driver_factory.rs index 0e904075aacde..0126ed889cacc 100644 --- a/state-sync/state-sync-driver/src/driver_factory.rs +++ b/state-sync/state-sync-driver/src/driver_factory.rs @@ -124,7 +124,8 @@ impl DriverFactory { ClientNotificationListener::new(client_notification_receiver); let (commit_notification_sender, commit_notification_listener) = CommitNotificationListener::new(); - let consensus_notification_handler = ConsensusNotificationHandler::new(consensus_listener); + let consensus_notification_handler = + ConsensusNotificationHandler::new(consensus_listener, time_service.clone()); let (error_notification_sender, error_notification_listener) = ErrorNotificationListener::new(); let mempool_notification_handler = diff --git a/state-sync/state-sync-driver/src/metrics.rs b/state-sync/state-sync-driver/src/metrics.rs index b87b4f910536b..8d27f2da10cd7 100644 --- a/state-sync/state-sync-driver/src/metrics.rs +++ b/state-sync/state-sync-driver/src/metrics.rs @@ -11,7 +11,10 @@ use std::time::Instant; /// Driver metric labels pub const DRIVER_CLIENT_NOTIFICATION: &str = "driver_client_notification"; pub const DRIVER_CONSENSUS_COMMIT_NOTIFICATION: &str = "driver_consensus_commit_notification"; -pub const DRIVER_CONSENSUS_SYNC_NOTIFICATION: &str = "driver_consensus_sync_notification"; +pub const DRIVER_CONSENSUS_SYNC_DURATION_NOTIFICATION: &str = + "driver_consensus_sync_duration_notification"; +pub const DRIVER_CONSENSUS_SYNC_TARGET_NOTIFICATION: &str = + "driver_consensus_sync_target_notification"; /// Data notification metric labels pub const NOTIFICATION_CREATE_TO_APPLY: &str = "notification_create_to_apply"; diff --git a/state-sync/state-sync-driver/src/notification_handlers.rs b/state-sync/state-sync-driver/src/notification_handlers.rs index 0a93e95c9f07b..4207e6f1be35a 100644 --- a/state-sync/state-sync-driver/src/notification_handlers.rs +++ b/state-sync/state-sync-driver/src/notification_handlers.rs @@ -8,7 +8,7 @@ use crate::{ }; use aptos_consensus_notifications::{ ConsensusCommitNotification, ConsensusNotification, ConsensusNotificationListener, - ConsensusSyncTargetNotification, + ConsensusSyncDurationNotification, ConsensusSyncTargetNotification, }; use aptos_data_streaming_service::data_notification::NotificationId; use aptos_event_notifications::{EventNotificationSender, EventSubscriptionService}; @@ -16,6 +16,7 @@ use aptos_infallible::Mutex; use aptos_logger::prelude::*; use aptos_mempool_notifications::MempoolNotificationSender; use aptos_storage_service_notifications::StorageServiceNotificationSender; +use aptos_time_service::{TimeService, TimeServiceTrait}; use aptos_types::{ contract_event::ContractEvent, ledger_info::LedgerInfoWithSignatures, @@ -27,6 +28,7 @@ use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, + time::Instant, }; /// A notification for new data that has been committed to storage @@ -144,27 +146,66 @@ impl FusedStream for CommitNotificationListener { } } -/// A consensus sync request for a specified target ledger info -pub struct ConsensusSyncRequest { - sync_target_notification: ConsensusSyncTargetNotification, +/// A consensus sync request for a specified target ledger info or duration +pub enum ConsensusSyncRequest { + SyncDuration(Instant, ConsensusSyncDurationNotification), // The start time and duration to sync for + SyncTarget(ConsensusSyncTargetNotification), // The target ledger info to sync to } impl ConsensusSyncRequest { - pub fn new(sync_target_notification: ConsensusSyncTargetNotification) -> Self { - Self { - sync_target_notification, + /// Returns a new sync target request + pub fn new_with_target(sync_target_notification: ConsensusSyncTargetNotification) -> Self { + ConsensusSyncRequest::SyncTarget(sync_target_notification) + } + + /// Returns a new sync duration request + pub fn new_with_duration( + start_time: Instant, + sync_duration_notification: ConsensusSyncDurationNotification, + ) -> Self { + ConsensusSyncRequest::SyncDuration(start_time, sync_duration_notification) + } + + /// Returns the sync target (if one exists) + pub fn get_sync_target(&self) -> Option { + match self { + ConsensusSyncRequest::SyncTarget(sync_target_notification) => { + Some(sync_target_notification.get_target().clone()) + }, + _ => None, } } - pub fn get_sync_target(&self) -> LedgerInfoWithSignatures { - self.sync_target_notification.get_target().clone() + /// Returns true iff the sync request is a duration request + pub fn is_sync_duration_request(&self) -> bool { + matches!(self, ConsensusSyncRequest::SyncDuration(_, _)) } - pub fn get_sync_target_version(&self) -> Version { - self.sync_target_notification - .get_target() - .ledger_info() - .version() + /// Returns true iff the sync request has been satisfied + pub fn sync_request_satisfied( + &self, + latest_synced_ledger_info: &LedgerInfoWithSignatures, + time_service: TimeService, + ) -> bool { + match self { + ConsensusSyncRequest::SyncDuration(start_time, sync_duration_notification) => { + // Get the duration and the current time + let sync_duration = sync_duration_notification.get_duration(); + let current_time = time_service.now(); + + // Check if the duration has been reached + current_time.duration_since(*start_time) >= sync_duration + }, + ConsensusSyncRequest::SyncTarget(sync_target_notification) => { + // Get the sync target version and latest synced version + let sync_target = sync_target_notification.get_target(); + let sync_target_version = sync_target.ledger_info().version(); + let latest_synced_version = latest_synced_ledger_info.ledger_info().version(); + + // Check if we've satisfied the target + latest_synced_version >= sync_target_version + }, + } } } @@ -175,13 +216,20 @@ pub struct ConsensusNotificationHandler { // The latest consensus sync request that has been received consensus_sync_request: Arc>>, + + // The time service + time_service: TimeService, } impl ConsensusNotificationHandler { - pub fn new(consensus_listener: ConsensusNotificationListener) -> Self { + pub fn new( + consensus_listener: ConsensusNotificationListener, + time_service: TimeService, + ) -> Self { Self { consensus_listener, consensus_sync_request: Arc::new(Mutex::new(None)), + time_service, } } @@ -195,7 +243,23 @@ impl ConsensusNotificationHandler { self.consensus_sync_request.clone() } - /// Initializes the sync request received from consensus + /// Initializes the sync duration request received from consensus + pub async fn initialize_sync_duration_request( + &mut self, + sync_duration_notification: ConsensusSyncDurationNotification, + ) -> Result<(), Error> { + // Get the current time + let start_time = self.time_service.now(); + + // Save the request so we can notify consensus once we've hit the duration + let consensus_sync_request = + ConsensusSyncRequest::new_with_duration(start_time, sync_duration_notification); + self.consensus_sync_request = Arc::new(Mutex::new(Some(consensus_sync_request))); + + Ok(()) + } + + /// Initializes the sync target request received from consensus pub async fn initialize_sync_target_request( &mut self, sync_target_notification: ConsensusSyncTargetNotification, @@ -214,8 +278,7 @@ impl ConsensusNotificationHandler { sync_target_version, latest_committed_version, )); - self.respond_to_sync_target_notification(sync_target_notification, error.clone()) - .await?; + self.respond_to_sync_target_notification(sync_target_notification, error.clone())?; return error; } @@ -224,66 +287,65 @@ impl ConsensusNotificationHandler { info!(LogSchema::new(LogEntry::NotificationHandler) .message("We're already at the requested sync target version! Returning early")); let result = Ok(()); - self.respond_to_sync_target_notification(sync_target_notification, result.clone()) - .await?; + self.respond_to_sync_target_notification(sync_target_notification, result.clone())?; return result; } // Save the request so we can notify consensus once we've hit the target - let consensus_sync_request = ConsensusSyncRequest::new(sync_target_notification); + let consensus_sync_request = + ConsensusSyncRequest::new_with_target(sync_target_notification); self.consensus_sync_request = Arc::new(Mutex::new(Some(consensus_sync_request))); Ok(()) } - /// Checks to see if the sync request has been successfully fulfilled - pub async fn check_sync_request_progress( + /// Notifies consensus of a satisfied sync request, and removes the active request. + /// Note: this assumes that the sync request has already been checked for satisfaction. + pub async fn handle_satisfied_sync_request( &mut self, latest_synced_ledger_info: LedgerInfoWithSignatures, ) -> Result<(), Error> { - // Fetch the sync target version - let consensus_sync_request = self.get_sync_request(); - let sync_target_version = consensus_sync_request.lock().as_ref().map(|sync_request| { - sync_request - .sync_target_notification - .get_target() - .ledger_info() - .version() - }); - - // Compare our local state to the target version - if let Some(sync_target_version) = sync_target_version { - let latest_committed_version = latest_synced_ledger_info.ledger_info().version(); - - // Check if we've synced beyond the target - if latest_committed_version > sync_target_version { - return Err(Error::SyncedBeyondTarget( - latest_committed_version, - sync_target_version, - )); - } - - // Check if we've hit the target - if latest_committed_version == sync_target_version { - let consensus_sync_request = self.get_sync_request().lock().take(); - if let Some(consensus_sync_request) = consensus_sync_request { + // Remove the active sync request + let mut sync_request_lock = self.consensus_sync_request.lock(); + let consensus_sync_request = sync_request_lock.take(); + + // Notify consensus of the satisfied request + match consensus_sync_request { + Some(ConsensusSyncRequest::SyncDuration(_, sync_duration_notification)) => { + self.respond_to_sync_duration_notification(sync_duration_notification, Ok(()))?; + }, + Some(ConsensusSyncRequest::SyncTarget(sync_target_notification)) => { + // Get the sync target version and latest synced version + let sync_target = sync_target_notification.get_target(); + let sync_target_version = sync_target.ledger_info().version(); + let latest_synced_version = latest_synced_ledger_info.ledger_info().version(); + + // Check if we've synced beyond the target. If so, notify consensus with an error. + if latest_synced_version > sync_target_version { + let error = Err(Error::SyncedBeyondTarget( + latest_synced_version, + sync_target_version, + )); self.respond_to_sync_target_notification( - consensus_sync_request.sync_target_notification, - Ok(()), - ) - .await?; + sync_target_notification, + error.clone(), + )?; + return error; } - return Ok(()); - } + + // Otherwise, notify consensus that the target has been reached + self.respond_to_sync_target_notification(sync_target_notification, Ok(()))?; + }, + None => { /* Nothing needs to be done */ }, } Ok(()) } - /// Responds to consensus for a sync notification using the specified result - pub async fn respond_to_sync_target_notification( - &mut self, - sync_target_notification: ConsensusSyncTargetNotification, + /// Responds to consensus for a sync duration notification using the specified result + pub fn respond_to_sync_duration_notification( + &self, + sync_duration_notification: ConsensusSyncDurationNotification, result: Result<(), Error>, ) -> Result<(), Error> { // Wrap the result in an error that consensus can process @@ -291,28 +353,54 @@ impl ConsensusNotificationHandler { aptos_consensus_notifications::Error::UnexpectedErrorEncountered(format!("{:?}", error)) }); + // Send the result info!( LogSchema::new(LogEntry::NotificationHandler).message(&format!( - "Responding to consensus sync notification with message: {:?}", + "Responding to consensus sync duration notification with message: {:?}", message )) ); + self.consensus_listener + .respond_to_sync_duration_notification(sync_duration_notification, message) + .map_err(|error| { + Error::CallbackSendFailed(format!( + "Consensus sync duration response error: {:?}", + error + )) + }) + } + + /// Responds to consensus for a sync notification using the specified result + pub fn respond_to_sync_target_notification( + &self, + sync_target_notification: ConsensusSyncTargetNotification, + result: Result<(), Error>, + ) -> Result<(), Error> { + // Wrap the result in an error that consensus can process + let message = result.map_err(|error| { + aptos_consensus_notifications::Error::UnexpectedErrorEncountered(format!("{:?}", error)) + }); // Send the result + info!( + LogSchema::new(LogEntry::NotificationHandler).message(&format!( + "Responding to consensus sync target notification with message: {:?}", + message + )) + ); self.consensus_listener .respond_to_sync_target_notification(sync_target_notification, message) - .await .map_err(|error| { Error::CallbackSendFailed(format!( - "Consensus sync request response error: {:?}", + "Consensus sync target response error: {:?}", error )) }) } /// Responds successfully to consensus for a commit notification - pub async fn respond_to_commit_notification( - &mut self, + pub fn respond_to_commit_notification( + &self, commit_notification: ConsensusCommitNotification, result: Result<(), Error>, ) -> Result<(), Error> { @@ -321,17 +409,15 @@ impl ConsensusNotificationHandler { aptos_consensus_notifications::Error::UnexpectedErrorEncountered(format!("{:?}", error)) }); + // Send the result debug!( LogSchema::new(LogEntry::NotificationHandler).message(&format!( "Responding to consensus commit notification with message: {:?}", message )) ); - - // Send the result self.consensus_listener .respond_to_commit_notification(commit_notification, message) - .await .map_err(|error| { Error::CallbackSendFailed(format!("Consensus commit response error: {:?}", error)) }) diff --git a/state-sync/state-sync-driver/src/tests/continuous_syncer.rs b/state-sync/state-sync-driver/src/tests/continuous_syncer.rs index 2af11d42d9d5d..5bd5a93cbf03f 100644 --- a/state-sync/state-sync-driver/src/tests/continuous_syncer.rs +++ b/state-sync/state-sync-driver/src/tests/continuous_syncer.rs @@ -19,7 +19,9 @@ use crate::{ utils::OutputFallbackHandler, }; use aptos_config::config::ContinuousSyncingMode; -use aptos_consensus_notifications::ConsensusSyncTargetNotification; +use aptos_consensus_notifications::{ + ConsensusSyncDurationNotification, ConsensusSyncTargetNotification, +}; use aptos_data_streaming_service::{ data_notification::{DataNotification, DataPayload, NotificationId}, streaming_client::{NotificationAndFeedback, NotificationFeedback}, @@ -31,7 +33,10 @@ use aptos_types::transaction::{TransactionOutputListWithProof, Version}; use claims::assert_matches; use futures::SinkExt; use mockall::{predicate::eq, Sequence}; -use std::{sync::Arc, time::Duration}; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; #[tokio::test] async fn test_critical_timeout() { @@ -111,7 +116,89 @@ async fn test_critical_timeout() { } #[tokio::test] -async fn test_data_stream_transactions_with_target() { +async fn test_data_stream_transactions_with_sync_duration() { + // Create test data + let current_synced_epoch = 10; + let current_synced_version = 1000; + let notification_id = 900; + + // Create a driver configuration + let mut driver_configuration = create_full_node_driver_configuration(); + driver_configuration.config.continuous_syncing_mode = + ContinuousSyncingMode::ExecuteTransactions; + + // Create the mock streaming client + let mut mock_streaming_client = create_mock_streaming_client(); + let mut expectation_sequence = Sequence::new(); + let (mut notification_sender_1, data_stream_listener_1) = create_data_stream_listener(); + let (_notification_sender_2, data_stream_listener_2) = create_data_stream_listener(); + let data_stream_id_1 = data_stream_listener_1.data_stream_id; + for data_stream_listener in [data_stream_listener_1, data_stream_listener_2] { + mock_streaming_client + .expect_continuously_stream_transactions() + .times(1) + .with( + eq(current_synced_version), + eq(current_synced_epoch), + eq(false), + eq(None), + ) + .return_once(move |_, _, _, _| Ok(data_stream_listener)) + .in_sequence(&mut expectation_sequence); + } + mock_streaming_client + .expect_terminate_stream_with_feedback() + .with( + eq(data_stream_id_1), + eq(Some(NotificationAndFeedback::new( + notification_id, + NotificationFeedback::EmptyPayloadData, + ))), + ) + .return_const(Ok(())); + + // Create the continuous syncer + let (mut continuous_syncer, _) = create_continuous_syncer( + driver_configuration, + mock_streaming_client, + None, + true, + current_synced_version, + current_synced_epoch, + ); + + // Drive progress to initialize the transaction output stream for the sync duration + let (sync_duration_notification, _) = + ConsensusSyncDurationNotification::new(Duration::from_secs(1)); + let sync_request = Arc::new(Mutex::new(Some(ConsensusSyncRequest::new_with_duration( + Instant::now(), + sync_duration_notification, + )))); + drive_progress(&mut continuous_syncer, &sync_request).await; + + // Send an invalid output along the stream + let data_notification = DataNotification::new( + notification_id, + DataPayload::ContinuousTransactionOutputsWithProof( + create_epoch_ending_ledger_info(), + TransactionOutputListWithProof::new_empty(), + ), + ); + notification_sender_1.send(data_notification).await.unwrap(); + + // Drive progress again and ensure we get a verification error + let error = continuous_syncer + .drive_progress(sync_request.clone()) + .await + .unwrap_err(); + assert_matches!(error, Error::VerificationError(_)); + + // Drive progress to initialize the transaction output stream. + drive_progress(&mut continuous_syncer, &sync_request).await; +} + +#[tokio::test] +async fn test_data_stream_transactions_with_sync_target() { // Create test data let current_synced_epoch = 5; let current_synced_version = 234; @@ -163,9 +250,9 @@ async fn test_data_stream_transactions_with_target() { current_synced_epoch, ); - // Drive progress to initialize the transaction output stream + // Drive progress to initialize the transaction output stream for the sync target let (sync_target_notification, _) = ConsensusSyncTargetNotification::new(target_ledger_info); - let sync_request = Arc::new(Mutex::new(Some(ConsensusSyncRequest::new( + let sync_request = Arc::new(Mutex::new(Some(ConsensusSyncRequest::new_with_target( sync_target_notification, )))); drive_progress(&mut continuous_syncer, &sync_request).await; @@ -187,7 +274,7 @@ async fn test_data_stream_transactions_with_target() { .unwrap_err(); assert_matches!(error, Error::VerificationError(_)); - // Drive progress to initialize the transaction output stream. + // Drive progress to initialize the transaction output stream drive_progress(&mut continuous_syncer, &sync_request).await; } @@ -242,7 +329,7 @@ async fn test_data_stream_transaction_outputs() { current_synced_epoch, ); - // Drive progress to initialize the transaction output stream + // Drive progress to initialize the transaction output stream (without a sync target) let no_sync_request = Arc::new(Mutex::new(None)); drive_progress(&mut continuous_syncer, &no_sync_request).await; @@ -271,7 +358,89 @@ async fn test_data_stream_transaction_outputs() { } #[tokio::test] -async fn test_data_stream_transactions_or_outputs_with_target() { +async fn test_data_stream_transactions_or_outputs_with_sync_duration() { + // Create test data + let current_synced_epoch = 100; + let current_synced_version = 1000; + let notification_id = 100; + + // Create a driver configuration with a genesis waypoint and transactions or output syncing + let mut driver_configuration = create_full_node_driver_configuration(); + driver_configuration.config.continuous_syncing_mode = + ContinuousSyncingMode::ExecuteTransactionsOrApplyOutputs; + + // Create the mock streaming client + let mut mock_streaming_client = create_mock_streaming_client(); + let mut expectation_sequence = Sequence::new(); + let (mut notification_sender_1, data_stream_listener_1) = create_data_stream_listener(); + let (_notification_sender_2, data_stream_listener_2) = create_data_stream_listener(); + let data_stream_id_1 = data_stream_listener_1.data_stream_id; + for data_stream_listener in [data_stream_listener_1, data_stream_listener_2] { + mock_streaming_client + .expect_continuously_stream_transactions_or_outputs() + .times(1) + .with( + eq(current_synced_version), + eq(current_synced_epoch), + eq(false), + eq(None), + ) + .return_once(move |_, _, _, _| Ok(data_stream_listener)) + .in_sequence(&mut expectation_sequence); + } + mock_streaming_client + .expect_terminate_stream_with_feedback() + .with( + eq(data_stream_id_1), + eq(Some(NotificationAndFeedback::new( + notification_id, + NotificationFeedback::EmptyPayloadData, + ))), + ) + .return_const(Ok(())); + + // Create the continuous syncer + let (mut continuous_syncer, _) = create_continuous_syncer( + driver_configuration, + mock_streaming_client, + None, + true, + current_synced_version, + current_synced_epoch, + ); + + // Drive progress to initialize the transaction output stream for the sync duration + let (sync_duration_notification, _) = + ConsensusSyncDurationNotification::new(Duration::from_secs(1)); + let sync_request = Arc::new(Mutex::new(Some(ConsensusSyncRequest::new_with_duration( + Instant::now(), + sync_duration_notification, + )))); + drive_progress(&mut continuous_syncer, &sync_request).await; + + // Send an invalid output along the stream + let data_notification = DataNotification::new( + notification_id, + DataPayload::ContinuousTransactionOutputsWithProof( + create_epoch_ending_ledger_info(), + TransactionOutputListWithProof::new_empty(), + ), + ); + notification_sender_1.send(data_notification).await.unwrap(); + + // Drive progress again and ensure we get a verification error + let error = continuous_syncer + .drive_progress(sync_request.clone()) + .await + .unwrap_err(); + assert_matches!(error, Error::VerificationError(_)); + + // Drive progress to initialize the transaction output stream + drive_progress(&mut continuous_syncer, &sync_request).await; +} + +#[tokio::test] +async fn test_data_stream_transactions_or_outputs_with_sync_target() { // Create test data let current_synced_epoch = 5; let current_synced_version = 234; @@ -323,9 +492,9 @@ async fn test_data_stream_transactions_or_outputs_with_target() { current_synced_epoch, ); - // Drive progress to initialize the transaction output stream + // Drive progress to initialize the transaction output stream for the sync target let (sync_target_notification, _) = ConsensusSyncTargetNotification::new(target_ledger_info); - let sync_request = Arc::new(Mutex::new(Some(ConsensusSyncRequest::new( + let sync_request = Arc::new(Mutex::new(Some(ConsensusSyncRequest::new_with_target( sync_target_notification, )))); drive_progress(&mut continuous_syncer, &sync_request).await; @@ -352,7 +521,138 @@ async fn test_data_stream_transactions_or_outputs_with_target() { } #[tokio::test] -async fn test_data_stream_transactions_or_outputs_with_target_fallback() { +async fn test_data_stream_transactions_or_outputs_with_sync_duration_fallback() { + // Create test data + let current_synced_epoch = 50; + let current_synced_version = 1234; + let notification_id = 101; + + // Create a driver configuration with a genesis waypoint and transactions or output syncing + let mut driver_configuration = create_full_node_driver_configuration(); + driver_configuration.config.continuous_syncing_mode = + ContinuousSyncingMode::ExecuteTransactionsOrApplyOutputs; + + // Create the mock streaming client + let mut mock_streaming_client = create_mock_streaming_client(); + + // Set expectations for stream creations and terminations + let mut expectation_sequence = Sequence::new(); + let (_notification_sender_1, data_stream_listener_1) = create_data_stream_listener(); + let data_stream_id_1 = data_stream_listener_1.data_stream_id; + let (_notification_sender_2, data_stream_listener_2) = create_data_stream_listener(); + let data_stream_id_2 = data_stream_listener_2.data_stream_id; + let (_notification_sender_3, data_stream_listener_3) = create_data_stream_listener(); + mock_streaming_client + .expect_continuously_stream_transactions_or_outputs() + .times(1) + .with( + eq(current_synced_version), + eq(current_synced_epoch), + eq(false), + eq(None), + ) + .return_once(move |_, _, _, _| Ok(data_stream_listener_1)) + .in_sequence(&mut expectation_sequence); + mock_streaming_client + .expect_terminate_stream_with_feedback() + .times(1) + .with( + eq(data_stream_id_1), + eq(Some(NotificationAndFeedback::new( + notification_id, + NotificationFeedback::PayloadProofFailed, + ))), + ) + .return_const(Ok(())) + .in_sequence(&mut expectation_sequence); + mock_streaming_client + .expect_continuously_stream_transaction_outputs() + .times(1) + .with( + eq(current_synced_version), + eq(current_synced_epoch), + eq(None), + ) + .return_once(move |_, _, _| Ok(data_stream_listener_2)) + .in_sequence(&mut expectation_sequence); + mock_streaming_client + .expect_terminate_stream_with_feedback() + .times(1) + .with( + eq(data_stream_id_2), + eq(Some(NotificationAndFeedback::new( + notification_id, + NotificationFeedback::InvalidPayloadData, + ))), + ) + .return_const(Ok(())) + .in_sequence(&mut expectation_sequence); + mock_streaming_client + .expect_continuously_stream_transactions_or_outputs() + .times(1) + .with( + eq(current_synced_version), + eq(current_synced_epoch), + eq(false), + eq(None), + ) + .return_once(move |_, _, _, _| Ok(data_stream_listener_3)) + .in_sequence(&mut expectation_sequence); + + // Create the continuous syncer + let time_service = TimeService::mock(); + let (mut continuous_syncer, mut output_fallback_handler) = create_continuous_syncer( + driver_configuration.clone(), + mock_streaming_client, + Some(time_service.clone()), + true, + current_synced_version, + current_synced_epoch, + ); + assert!(!output_fallback_handler.in_fallback_mode()); + + // Drive progress to initialize the transactions or output stream for the sync duration + let (sync_duration_notification, _) = + ConsensusSyncDurationNotification::new(Duration::from_secs(1)); + let sync_request = Arc::new(Mutex::new(Some(ConsensusSyncRequest::new_with_duration( + Instant::now(), + sync_duration_notification, + )))); + drive_progress(&mut continuous_syncer, &sync_request).await; + + // Send a storage synchronizer error to the continuous syncer so that it falls back + // to output syncing and drive progress for the new stream type. + handle_storage_synchronizer_error( + &mut continuous_syncer, + notification_id, + NotificationFeedback::PayloadProofFailed, + ) + .await; + drive_progress(&mut continuous_syncer, &sync_request).await; + assert!(output_fallback_handler.in_fallback_mode()); + + // Elapse enough time so that fallback mode is now disabled + time_service + .into_mock() + .advance_async(Duration::from_secs( + driver_configuration.config.fallback_to_output_syncing_secs, + )) + .await; + + // Send another storage synchronizer error to the bootstrapper and drive progress + // so that a regular stream is created. + handle_storage_synchronizer_error( + &mut continuous_syncer, + notification_id, + NotificationFeedback::InvalidPayloadData, + ) + .await; + drive_progress(&mut continuous_syncer, &sync_request).await; + assert!(!output_fallback_handler.in_fallback_mode()); +} + +#[tokio::test] +async fn test_data_stream_transactions_or_outputs_with_sync_target_fallback() { // Create test data let current_synced_epoch = 5; let current_synced_version = 234; @@ -445,7 +745,7 @@ async fn test_data_stream_transactions_or_outputs_with_target_fallback() { // Drive progress to initialize the transactions or output stream let (sync_target_notification, _) = ConsensusSyncTargetNotification::new(target_ledger_info); - let sync_request = Arc::new(Mutex::new(Some(ConsensusSyncRequest::new( + let sync_request = Arc::new(Mutex::new(Some(ConsensusSyncRequest::new_with_target( sync_target_notification, )))); drive_progress(&mut continuous_syncer, &sync_request).await;