diff --git a/aptos-node/src/lib.rs b/aptos-node/src/lib.rs index 1bafcbd501189..a7823cc9e5a3a 100644 --- a/aptos-node/src/lib.rs +++ b/aptos-node/src/lib.rs @@ -433,6 +433,7 @@ fn create_state_sync_runtimes( event_subscription_service, aptos_data_client, streaming_service_client, + TimeService::real(), ); // Create and return the new state sync handle diff --git a/config/src/config/state_sync_config.rs b/config/src/config/state_sync_config.rs index 6ab8e8576b0b0..9d3472f931caf 100644 --- a/config/src/config/state_sync_config.rs +++ b/config/src/config/state_sync_config.rs @@ -22,6 +22,7 @@ pub enum BootstrappingMode { ApplyTransactionOutputsFromGenesis, // Applies transaction outputs (starting at genesis) DownloadLatestStates, // Downloads the state keys and values (at the latest version) ExecuteTransactionsFromGenesis, // Executes transactions (starting at genesis) + ExecuteOrApplyFromGenesis, // Executes transactions or applies outputs from genesis (whichever is faster) } impl BootstrappingMode { @@ -34,6 +35,7 @@ impl BootstrappingMode { BootstrappingMode::ExecuteTransactionsFromGenesis => { "execute_transactions_from_genesis" } + BootstrappingMode::ExecuteOrApplyFromGenesis => "execute_or_apply_from_genesis", } } } @@ -45,6 +47,7 @@ impl BootstrappingMode { pub enum ContinuousSyncingMode { ApplyTransactionOutputs, // Applies transaction outputs to stay up-to-date ExecuteTransactions, // Executes transactions to stay up-to-date + ExecuteTransactionsOrApplyOutputs, // Executes transactions or applies outputs to stay up-to-date (whichever is faster) } impl ContinuousSyncingMode { @@ -52,6 +55,9 @@ impl ContinuousSyncingMode { match self { ContinuousSyncingMode::ApplyTransactionOutputs => "apply_transaction_outputs", ContinuousSyncingMode::ExecuteTransactions => "execute_transactions", + ContinuousSyncingMode::ExecuteTransactionsOrApplyOutputs => { + "execute_transactions_or_apply_outputs" + } } } } @@ -62,6 +68,7 @@ pub struct StateSyncDriverConfig { pub 
bootstrapping_mode: BootstrappingMode, // The mode by which to bootstrap pub commit_notification_timeout_ms: u64, // The max time taken to process a commit notification pub continuous_syncing_mode: ContinuousSyncingMode, // The mode by which to sync after bootstrapping + pub fallback_to_output_syncing_secs: u64, // The duration to fallback to output syncing after an execution failure pub progress_check_interval_ms: u64, // The interval (ms) at which to check state sync progress pub max_connection_deadline_secs: u64, // The max time (secs) to wait for connections from peers pub max_consecutive_stream_notifications: u64, // The max number of notifications to process per driver loop @@ -76,9 +83,10 @@ pub struct StateSyncDriverConfig { impl Default for StateSyncDriverConfig { fn default() -> Self { Self { - bootstrapping_mode: BootstrappingMode::ApplyTransactionOutputsFromGenesis, + bootstrapping_mode: BootstrappingMode::ExecuteOrApplyFromGenesis, commit_notification_timeout_ms: 5000, - continuous_syncing_mode: ContinuousSyncingMode::ApplyTransactionOutputs, + continuous_syncing_mode: ContinuousSyncingMode::ExecuteTransactionsOrApplyOutputs, + fallback_to_output_syncing_secs: 120, // 2 minutes progress_check_interval_ms: 100, max_connection_deadline_secs: 10, max_consecutive_stream_notifications: 10, @@ -116,7 +124,7 @@ impl Default for StorageServiceConfig { max_state_chunk_size: 2000, max_subscription_period_ms: 5000, max_transaction_chunk_size: 2000, - max_transaction_output_chunk_size: 2000, + max_transaction_output_chunk_size: 1000, storage_summary_refresh_interval_ms: 50, } } @@ -183,7 +191,7 @@ impl Default for AptosDataClientConfig { Self { max_num_in_flight_priority_polls: 10, max_num_in_flight_regular_polls: 10, - max_num_output_reductions: 2, + max_num_output_reductions: 0, max_response_timeout_ms: 60000, // 60 seconds response_timeout_ms: 10000, // 10 seconds subscription_timeout_ms: 5000, // 5 seconds diff --git 
a/state-sync/state-sync-v2/data-streaming-service/src/data_notification.rs b/state-sync/state-sync-v2/data-streaming-service/src/data_notification.rs index 4276113dbbe71..36122bc23fc44 100644 --- a/state-sync/state-sync-v2/data-streaming-service/src/data_notification.rs +++ b/state-sync/state-sync-v2/data-streaming-service/src/data_notification.rs @@ -43,6 +43,8 @@ pub enum DataClientRequest { StateValuesWithProof(StateValuesWithProofRequest), TransactionsWithProof(TransactionsWithProofRequest), TransactionOutputsWithProof(TransactionOutputsWithProofRequest), + NewTransactionsOrOutputsWithProof(NewTransactionsOrOutputsWithProofRequest), + TransactionsOrOutputsWithProof(TransactionsOrOutputsWithProofRequest), } impl DataClientRequest { @@ -56,6 +58,8 @@ impl DataClientRequest { Self::StateValuesWithProof(_) => "state_values_with_proof", Self::TransactionsWithProof(_) => "transactions_with_proof", Self::TransactionOutputsWithProof(_) => "transaction_outputs_with_proof", + Self::NewTransactionsOrOutputsWithProof(_) => "new_transactions_or_outputs_with_proof", + Self::TransactionsOrOutputsWithProof(_) => "transactions_or_outputs_with_proof", } } } @@ -83,6 +87,14 @@ pub struct NewTransactionsWithProofRequest { pub include_events: bool, } +/// A client request for fetching new transactions or outputs with proofs. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct NewTransactionsOrOutputsWithProofRequest { + pub known_version: Version, + pub known_epoch: Epoch, + pub include_events: bool, +} + /// A client request for fetching new transaction outputs with proofs. #[derive(Clone, Debug, Eq, PartialEq)] pub struct NewTransactionOutputsWithProofRequest { @@ -113,6 +125,15 @@ pub struct TransactionOutputsWithProofRequest { pub proof_version: Version, } +/// A client request for fetching transaction or outputs with proofs. 
+#[derive(Clone, Debug, Eq, PartialEq)] +pub struct TransactionsOrOutputsWithProofRequest { + pub start_version: Version, + pub end_version: Version, + pub proof_version: Version, + pub include_events: bool, +} + /// A pending client response where data has been requested from the /// network and will be available in `client_response` when received. pub struct PendingClientResponse { diff --git a/state-sync/state-sync-v2/data-streaming-service/src/data_stream.rs b/state-sync/state-sync-v2/data-streaming-service/src/data_stream.rs index 81052c2700164..2b7cc363753cc 100644 --- a/state-sync/state-sync-v2/data-streaming-service/src/data_stream.rs +++ b/state-sync/state-sync-v2/data-streaming-service/src/data_stream.rs @@ -1,6 +1,9 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 +use crate::data_notification::{ + NewTransactionsOrOutputsWithProofRequest, TransactionsOrOutputsWithProofRequest, +}; use crate::metrics::increment_counter_multiple; use crate::{ data_notification, @@ -728,6 +731,15 @@ fn sanity_check_client_response( ResponsePayload::NewTransactionsWithProof(_) ) } + DataClientRequest::NewTransactionsOrOutputsWithProof(_) => { + matches!( + data_client_response.payload, + ResponsePayload::NewTransactionsWithProof(_) + ) || matches!( + data_client_response.payload, + ResponsePayload::NewTransactionOutputsWithProof(_) + ) + } DataClientRequest::NumberOfStates(_) => { matches!( data_client_response.payload, @@ -752,6 +764,15 @@ fn sanity_check_client_response( ResponsePayload::TransactionOutputsWithProof(_) ) } + DataClientRequest::TransactionsOrOutputsWithProof(_) => { + matches!( + data_client_response.payload, + ResponsePayload::TransactionsWithProof(_) + ) || matches!( + data_client_response.payload, + ResponsePayload::TransactionOutputsWithProof(_) + ) + } } } @@ -808,6 +829,14 @@ fn spawn_request_task( ) .await } + DataClientRequest::NewTransactionsOrOutputsWithProof(request) => { + get_new_transactions_or_outputs_with_proof( + 
aptos_data_client, + request, + request_timeout_ms, + ) + .await + } DataClientRequest::NumberOfStates(request) => { get_number_of_states(aptos_data_client, request, request_timeout_ms).await } @@ -821,6 +850,14 @@ fn spawn_request_task( DataClientRequest::TransactionsWithProof(request) => { get_transactions_with_proof(aptos_data_client, request, request_timeout_ms).await } + DataClientRequest::TransactionsOrOutputsWithProof(request) => { + get_transactions_or_outputs_with_proof( + aptos_data_client, + request, + request_timeout_ms, + ) + .await + } }; // Increment the appropriate counter depending on the response @@ -903,6 +940,21 @@ async fn get_new_transactions_with_proof( + aptos_data_client: T, + request: NewTransactionsOrOutputsWithProofRequest, + request_timeout_ms: u64, +) -> Result, aptos_data_client::Error> { + let client_response = aptos_data_client.get_new_transactions_or_outputs_with_proof( + request.known_version, + request.known_epoch, + request.include_events, + request_timeout_ms, + ); + let (context, payload) = client_response.await?.into_parts(); + Ok(Response::new(context, ResponsePayload::try_from(payload)?)) +} + async fn get_number_of_states( aptos_data_client: T, request: NumberOfStatesRequest, @@ -948,6 +1000,22 @@ async fn get_transactions_with_proof( + aptos_data_client: T, + request: TransactionsOrOutputsWithProofRequest, + request_timeout_ms: u64, +) -> Result, aptos_data_client::Error> { + let client_response = aptos_data_client.get_transactions_or_outputs_with_proof( + request.proof_version, + request.start_version, + request.end_version, + request.include_events, + request_timeout_ms, + ); + let (context, payload) = client_response.await?.into_parts(); + Ok(Response::new(context, ResponsePayload::try_from(payload)?)) +} + /// Returns true iff the given request is a subscription request fn is_subscription_request(request: &DataClientRequest) -> bool { matches!(request, DataClientRequest::NewTransactionsWithProof(_)) @@ -955,4 +1023,8 
@@ fn is_subscription_request(request: &DataClientRequest) -> bool { request, DataClientRequest::NewTransactionOutputsWithProof(_) ) + || matches!( + request, + DataClientRequest::NewTransactionsOrOutputsWithProof(_) + ) } diff --git a/state-sync/state-sync-v2/data-streaming-service/src/stream_engine.rs b/state-sync/state-sync-v2/data-streaming-service/src/stream_engine.rs index 62a268761b9c0..84585c07d04be 100644 --- a/state-sync/state-sync-v2/data-streaming-service/src/stream_engine.rs +++ b/state-sync/state-sync-v2/data-streaming-service/src/stream_engine.rs @@ -1,13 +1,17 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 +use crate::data_notification::DataClientRequest::NewTransactionsOrOutputsWithProof; +use crate::data_notification::{ + NewTransactionsOrOutputsWithProofRequest, TransactionsOrOutputsWithProofRequest, +}; use crate::{ data_notification::{ DataClientRequest, DataClientRequest::{ EpochEndingLedgerInfos, NewTransactionOutputsWithProof, NewTransactionsWithProof, NumberOfStates, StateValuesWithProof, TransactionOutputsWithProof, - TransactionsWithProof, + TransactionsOrOutputsWithProof, TransactionsWithProof, }, DataNotification, DataPayload, EpochEndingLedgerInfosRequest, NewTransactionOutputsWithProofRequest, NewTransactionsWithProofRequest, @@ -121,6 +125,9 @@ impl StreamEngine { StreamRequest::ContinuouslyStreamTransactions(_) => { Ok(ContinuousTransactionStreamEngine::new(stream_request)?.into()) } + StreamRequest::ContinuouslyStreamTransactionsOrOutputs(_) => { + Ok(ContinuousTransactionStreamEngine::new(stream_request)?.into()) + } StreamRequest::GetAllStates(request) => Ok(StateStreamEngine::new(request)?.into()), StreamRequest::GetAllEpochEndingLedgerInfos(request) => { Ok(EpochEndingStreamEngine::new(request, advertised_data)?.into()) @@ -131,6 +138,9 @@ impl StreamEngine { StreamRequest::GetAllTransactions(_) => { Ok(TransactionStreamEngine::new(stream_request)?.into()) } + StreamRequest::GetAllTransactionsOrOutputs(_) => { 
+ Ok(TransactionStreamEngine::new(stream_request)?.into()) + } _ => Err(Error::UnsupportedRequestEncountered(format!( "Stream request not supported: {:?}", stream_request @@ -352,39 +362,28 @@ pub struct ContinuousTransactionStreamEngine { impl ContinuousTransactionStreamEngine { fn new(stream_request: &StreamRequest) -> Result { - match stream_request { + let (next_version, next_epoch) = match stream_request { StreamRequest::ContinuouslyStreamTransactions(request) => { - let (next_version, next_epoch) = Self::calculate_next_version_and_epoch( - request.known_version, - request.known_epoch, - )?; - Ok(ContinuousTransactionStreamEngine { - request: stream_request.clone(), - current_target_ledger_info: None, - end_of_epoch_requested: false, - subscription_requested: false, - next_stream_version_and_epoch: (next_version, next_epoch), - next_request_version_and_epoch: (next_version, next_epoch), - stream_is_complete: false, - }) + Self::calculate_next_version_and_epoch(request.known_version, request.known_epoch)? } StreamRequest::ContinuouslyStreamTransactionOutputs(request) => { - let (next_version, next_epoch) = Self::calculate_next_version_and_epoch( - request.known_version, - request.known_epoch, - )?; - Ok(ContinuousTransactionStreamEngine { - request: stream_request.clone(), - current_target_ledger_info: None, - end_of_epoch_requested: false, - subscription_requested: false, - next_stream_version_and_epoch: (next_version, next_epoch), - next_request_version_and_epoch: (next_version, next_epoch), - stream_is_complete: false, - }) + Self::calculate_next_version_and_epoch(request.known_version, request.known_epoch)? + } + StreamRequest::ContinuouslyStreamTransactionsOrOutputs(request) => { + Self::calculate_next_version_and_epoch(request.known_version, request.known_epoch)? 
} request => invalid_stream_request!(request), - } + }; + + Ok(ContinuousTransactionStreamEngine { + request: stream_request.clone(), + current_target_ledger_info: None, + end_of_epoch_requested: false, + subscription_requested: false, + next_stream_version_and_epoch: (next_version, next_epoch), + next_request_version_and_epoch: (next_version, next_epoch), + stream_is_complete: false, + }) } fn calculate_next_version_and_epoch( @@ -413,6 +412,11 @@ impl ContinuousTransactionStreamEngine { return Ok(Some(target.clone())); } } + StreamRequest::ContinuouslyStreamTransactionsOrOutputs(request) => { + if let Some(target) = &request.target { + return Ok(Some(target.clone())); + } + } request => invalid_stream_request!(request), }; @@ -525,19 +529,24 @@ impl ContinuousTransactionStreamEngine { let data_client_request = match &self.request { StreamRequest::ContinuouslyStreamTransactions(request) => { - DataClientRequest::NewTransactionsWithProof(NewTransactionsWithProofRequest { + NewTransactionsWithProof(NewTransactionsWithProofRequest { known_version, known_epoch, include_events: request.include_events, }) } StreamRequest::ContinuouslyStreamTransactionOutputs(_) => { - DataClientRequest::NewTransactionOutputsWithProof( - NewTransactionOutputsWithProofRequest { - known_version, - known_epoch, - }, - ) + NewTransactionOutputsWithProof(NewTransactionOutputsWithProofRequest { + known_version, + known_epoch, + }) + } + StreamRequest::ContinuouslyStreamTransactionsOrOutputs(request) => { + NewTransactionsOrOutputsWithProof(NewTransactionsOrOutputsWithProofRequest { + known_version, + known_epoch, + include_events: request.include_events, + }) } request => invalid_stream_request!(request), }; @@ -624,6 +633,13 @@ impl ContinuousTransactionStreamEngine { } } } + StreamRequest::ContinuouslyStreamTransactionsOrOutputs(request) => { + if let Some(target) = &request.target { + if request_end_version == target.ledger_info().version() { + self.stream_is_complete = true; + } + } + } 
request => invalid_stream_request!(request), }; @@ -692,6 +708,19 @@ impl ContinuousTransactionStreamEngine { } } } + StreamRequest::ContinuouslyStreamTransactionsOrOutputs(_) => { + for client_request in client_requests { + match client_request { + DataClientRequest::TransactionsOrOutputsWithProof(request) => { + self.update_request_version_and_epoch( + request.end_version, + target_ledger_info, + )?; + } + request => invalid_client_request!(request, self), + } + } + } request => invalid_stream_request!(request), } @@ -768,6 +797,11 @@ impl DataStreamEngine for ContinuousTransactionStreamEngine { .optimal_chunk_sizes .transaction_output_chunk_size } + StreamRequest::ContinuouslyStreamTransactionsOrOutputs(_) => { + global_data_summary + .optimal_chunk_sizes + .transaction_output_chunk_size + } request => invalid_stream_request!(request), }; let client_requests = create_data_client_requests( @@ -795,6 +829,9 @@ impl DataStreamEngine for ContinuousTransactionStreamEngine { StreamRequest::ContinuouslyStreamTransactionOutputs(_) => { &advertised_data.transaction_outputs } + StreamRequest::ContinuouslyStreamTransactionsOrOutputs(_) => { + &advertised_data.transaction_outputs + } request => invalid_stream_request!(request), }; @@ -846,6 +883,17 @@ impl DataStreamEngine for ContinuousTransactionStreamEngine { (LogSchema::new(LogEntry::RequestTimeout) .message("Subscription request for new transaction outputs timed out!")) ); + } else if matches!( + self.request, + StreamRequest::ContinuouslyStreamTransactionsOrOutputs(_) + ) && matches!( + client_request, + DataClientRequest::NewTransactionsOrOutputsWithProof(_) + ) { + info!( + (LogSchema::new(LogEntry::RequestTimeout) + .message("Subscription request for new transactions or outputs timed out!")) + ); } else { return Err(Error::UnexpectedErrorEncountered(format!("Received a subscription request timeout but the request did not match the expected type for the stream! 
Request: {:?}, Stream: {:?}", client_request, self.request))); } @@ -894,6 +942,17 @@ impl DataStreamEngine for ContinuousTransactionStreamEngine { } request => invalid_stream_request!(request), }, + NewTransactionsOrOutputsWithProof(request) => match &self.request { + StreamRequest::ContinuouslyStreamTransactionsOrOutputs(_) => { + let data_notification = self.create_notification_for_subscription_data( + request.known_version, + client_response_payload, + notification_id_generator, + )?; + Ok(Some(data_notification)) + } + request => invalid_stream_request!(request), + }, TransactionsWithProof(request) => match &self.request { StreamRequest::ContinuouslyStreamTransactions(_) => { let data_notification = self.create_notification_for_continuous_data( @@ -918,6 +977,18 @@ impl DataStreamEngine for ContinuousTransactionStreamEngine { } request => invalid_stream_request!(request), }, + TransactionsOrOutputsWithProof(request) => match &self.request { + StreamRequest::ContinuouslyStreamTransactionsOrOutputs(_) => { + let data_notification = self.create_notification_for_continuous_data( + request.start_version, + request.end_version, + client_response_payload, + notification_id_generator, + )?; + Ok(Some(data_notification)) + } + request => invalid_stream_request!(request), + }, request => invalid_client_request!(request, self), } } @@ -1074,7 +1145,7 @@ impl DataStreamEngine for EpochEndingStreamEngine { #[derive(Clone, Debug)] pub struct TransactionStreamEngine { - // The original stream request made by the client (i.e., a transaction or + // The original stream request made by the client (e.g., a transaction or // transaction output stream request). 
pub request: StreamRequest, @@ -1105,6 +1176,12 @@ impl TransactionStreamEngine { next_request_version: request.start_version, stream_is_complete: false, }), + StreamRequest::GetAllTransactionsOrOutputs(request) => Ok(TransactionStreamEngine { + request: stream_request.clone(), + next_stream_version: request.start_version, + next_request_version: request.start_version, + stream_is_complete: false, + }), request => invalid_stream_request!(request), } } @@ -1166,6 +1243,16 @@ impl TransactionStreamEngine { } } } + StreamRequest::GetAllTransactionsOrOutputs(_) => { + for client_request in client_requests.iter() { + match client_request { + TransactionsOrOutputsWithProof(request) => { + self.update_request_version(request.end_version)?; + } + request => invalid_client_request!(request, self), + } + } + } request => invalid_stream_request!(request), } @@ -1192,6 +1279,12 @@ impl DataStreamEngine for TransactionStreamEngine { .optimal_chunk_sizes .transaction_output_chunk_size, ), + StreamRequest::GetAllTransactionsOrOutputs(request) => ( + request.end_version, + global_data_summary + .optimal_chunk_sizes + .transaction_output_chunk_size, + ), request => invalid_stream_request!(request), }; @@ -1216,6 +1309,9 @@ impl DataStreamEngine for TransactionStreamEngine { StreamRequest::GetAllTransactionOutputs(request) => { (request.end_version, &advertised_data.transaction_outputs) } + StreamRequest::GetAllTransactionsOrOutputs(request) => { + (request.end_version, &advertised_data.transaction_outputs) + } request => invalid_stream_request!(request), }; AdvertisedData::contains_range( @@ -1258,6 +1354,17 @@ impl DataStreamEngine for TransactionStreamEngine { } request => invalid_client_request!(request, self), }, + StreamRequest::GetAllTransactionsOrOutputs(stream_request) => match client_request { + TransactionsOrOutputsWithProof(request) => { + let stream_end_version = stream_request.end_version; + self.update_stream_version( + request.start_version, + request.end_version, + 
stream_end_version, + )?; + } + request => invalid_client_request!(request, self), + }, request => invalid_stream_request!(request), } @@ -1351,7 +1458,7 @@ fn create_data_client_request( ) -> DataClientRequest { match stream_engine { StreamEngine::StateStreamEngine(stream_engine) => { - DataClientRequest::StateValuesWithProof(StateValuesWithProofRequest { + StateValuesWithProof(StateValuesWithProofRequest { version: stream_engine.request.version, start_index, end_index, @@ -1364,7 +1471,7 @@ fn create_data_client_request( .version(); match &stream_engine.request { StreamRequest::ContinuouslyStreamTransactions(request) => { - DataClientRequest::TransactionsWithProof(TransactionsWithProofRequest { + TransactionsWithProof(TransactionsWithProofRequest { start_version: start_index, end_version: end_index, proof_version: target_ledger_info_version, @@ -1372,26 +1479,32 @@ fn create_data_client_request( }) } StreamRequest::ContinuouslyStreamTransactionOutputs(_) => { - DataClientRequest::TransactionOutputsWithProof( - TransactionOutputsWithProofRequest { - start_version: start_index, - end_version: end_index, - proof_version: target_ledger_info_version, - }, - ) + TransactionOutputsWithProof(TransactionOutputsWithProofRequest { + start_version: start_index, + end_version: end_index, + proof_version: target_ledger_info_version, + }) + } + StreamRequest::ContinuouslyStreamTransactionsOrOutputs(request) => { + TransactionsOrOutputsWithProof(TransactionsOrOutputsWithProofRequest { + start_version: start_index, + end_version: end_index, + proof_version: target_ledger_info_version, + include_events: request.include_events, + }) } request => invalid_stream_request!(request), } } StreamEngine::EpochEndingStreamEngine(_) => { - DataClientRequest::EpochEndingLedgerInfos(EpochEndingLedgerInfosRequest { + EpochEndingLedgerInfos(EpochEndingLedgerInfosRequest { start_epoch: start_index, end_epoch: end_index, }) } StreamEngine::TransactionStreamEngine(stream_engine) => match 
&stream_engine.request { StreamRequest::GetAllTransactions(request) => { - DataClientRequest::TransactionsWithProof(TransactionsWithProofRequest { + TransactionsWithProof(TransactionsWithProofRequest { start_version: start_index, end_version: end_index, proof_version: request.proof_version, @@ -1399,12 +1512,20 @@ fn create_data_client_request( }) } StreamRequest::GetAllTransactionOutputs(request) => { - DataClientRequest::TransactionOutputsWithProof(TransactionOutputsWithProofRequest { + TransactionOutputsWithProof(TransactionOutputsWithProofRequest { start_version: start_index, end_version: end_index, proof_version: request.proof_version, }) } + StreamRequest::GetAllTransactionsOrOutputs(request) => { + TransactionsOrOutputsWithProof(TransactionsOrOutputsWithProofRequest { + start_version: start_index, + end_version: end_index, + proof_version: request.proof_version, + include_events: request.include_events, + }) + } request => invalid_stream_request!(request), }, } diff --git a/state-sync/state-sync-v2/data-streaming-service/src/streaming_client.rs b/state-sync/state-sync-v2/data-streaming-service/src/streaming_client.rs index 358c8792eb1f5..fa4d88c1ce7e3 100644 --- a/state-sync/state-sync-v2/data-streaming-service/src/streaming_client.rs +++ b/state-sync/state-sync-v2/data-streaming-service/src/streaming_client.rs @@ -68,6 +68,18 @@ pub trait DataStreamingClient { include_events: bool, ) -> Result; + /// Fetches all transactions or outputs with proofs from `start_version` to + /// `end_version` (inclusive) at the specified `proof_version`. If + /// `include_events` is true, events are also included in the proofs for + /// transaction notifications. + async fn get_all_transactions_or_outputs( + &self, + start_version: Version, + end_version: Version, + proof_version: Version, + include_events: bool, + ) -> Result; + /// Continuously streams transaction outputs with proofs as the blockchain /// grows. 
The stream starts at `known_version + 1` (inclusive) and /// `known_epoch`, where the `known_epoch` is expected to be the epoch @@ -105,6 +117,27 @@ pub trait DataStreamingClient { target: Option, ) -> Result; + /// Continuously streams transactions or outputs with proofs as the blockchain + /// grows. The stream starts at `known_version + 1` (inclusive) and + /// `known_epoch`, where the `known_epoch` is expected to be the epoch + /// that contains `known_version + 1`, i.e., any epoch change at + /// `known_version` must be noted by the client. + /// Transaction or output proof versions are tied to ledger infos within the + /// same epoch, otherwise epoch ending ledger infos will signify epoch changes. + /// + /// If `include_events` is true, events are also included in the proofs when + /// receiving transaction notifications. + /// + /// Note: if a `target` is provided, the stream will terminate once it reaches + /// the target. Otherwise, it will continue indefinitely. + async fn continuously_stream_transactions_or_outputs( + &self, + start_version: Version, + start_epoch: Epoch, + include_events: bool, + target: Option, + ) -> Result; + /// Terminates the stream with the given stream id and (optionally) provides /// feedback about the notification and the termination reason. 
/// @@ -138,8 +171,10 @@ pub enum StreamRequest { GetAllStates(GetAllStatesRequest), GetAllTransactions(GetAllTransactionsRequest), GetAllTransactionOutputs(GetAllTransactionOutputsRequest), + GetAllTransactionsOrOutputs(GetAllTransactionsOrOutputsRequest), ContinuouslyStreamTransactions(ContinuouslyStreamTransactionsRequest), ContinuouslyStreamTransactionOutputs(ContinuouslyStreamTransactionOutputsRequest), + ContinuouslyStreamTransactionsOrOutputs(ContinuouslyStreamTransactionsOrOutputsRequest), TerminateStream(TerminateStreamRequest), } @@ -151,10 +186,14 @@ impl StreamRequest { Self::GetAllStates(_) => "get_all_states", Self::GetAllTransactions(_) => "get_all_transactions", Self::GetAllTransactionOutputs(_) => "get_all_transaction_outputs", + Self::GetAllTransactionsOrOutputs(_) => "get_all_transactions_or_outputs", Self::ContinuouslyStreamTransactions(_) => "continuously_stream_transactions", Self::ContinuouslyStreamTransactionOutputs(_) => { "continuously_stream_transaction_outputs" } + Self::ContinuouslyStreamTransactionsOrOutputs(_) => { + "continuously_stream_transactions_or_outputs" + } Self::TerminateStream(_) => "terminate_stream", } } @@ -190,6 +229,15 @@ pub struct GetAllTransactionOutputsRequest { pub proof_version: Version, } +/// A client request for fetching all transactions or outputs with proofs. 
+#[derive(Clone, Debug, Eq, PartialEq)] +pub struct GetAllTransactionsOrOutputsRequest { + pub start_version: Version, + pub end_version: Version, + pub proof_version: Version, + pub include_events: bool, +} + /// A client request for continuously streaming transactions with proofs #[derive(Clone, Debug, Eq, PartialEq)] pub struct ContinuouslyStreamTransactionsRequest { @@ -207,6 +255,15 @@ pub struct ContinuouslyStreamTransactionOutputsRequest { pub target: Option, } +/// A client request for continuously streaming transactions or outputs with proofs +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct ContinuouslyStreamTransactionsOrOutputsRequest { + pub known_version: Version, + pub known_epoch: Epoch, + pub include_events: bool, + pub target: Option, +} + /// A client request for terminating a stream and providing payload feedback. #[derive(Clone, Debug, Eq, PartialEq)] pub struct TerminateStreamRequest { @@ -329,6 +386,23 @@ impl DataStreamingClient for StreamingServiceClient { self.send_request_and_await_response(client_request).await } + async fn get_all_transactions_or_outputs( + &self, + start_version: u64, + end_version: u64, + proof_version: u64, + include_events: bool, + ) -> Result { + let client_request = + StreamRequest::GetAllTransactionsOrOutputs(GetAllTransactionsOrOutputsRequest { + start_version, + end_version, + proof_version, + include_events, + }); + self.send_request_and_await_response(client_request).await + } + async fn continuously_stream_transaction_outputs( &self, known_version: u64, @@ -362,6 +436,24 @@ impl DataStreamingClient for StreamingServiceClient { self.send_request_and_await_response(client_request).await } + async fn continuously_stream_transactions_or_outputs( + &self, + known_version: u64, + known_epoch: u64, + include_events: bool, + target: Option, + ) -> Result { + let client_request = StreamRequest::ContinuouslyStreamTransactionsOrOutputs( + ContinuouslyStreamTransactionsOrOutputsRequest { + known_version, + 
known_epoch, + include_events, + target, + }, + ); + self.send_request_and_await_response(client_request).await + } + async fn terminate_stream_with_feedback( &self, data_stream_id: DataStreamId, diff --git a/state-sync/state-sync-v2/data-streaming-service/src/tests/data_stream.rs b/state-sync/state-sync-v2/data-streaming-service/src/tests/data_stream.rs index 255636202b6e2..5d9627c66b508 100644 --- a/state-sync/state-sync-v2/data-streaming-service/src/tests/data_stream.rs +++ b/state-sync/state-sync-v2/data-streaming-service/src/tests/data_stream.rs @@ -5,9 +5,13 @@ use crate::data_notification::{ NewTransactionOutputsWithProofRequest, NewTransactionsWithProofRequest, TransactionOutputsWithProofRequest, TransactionsWithProofRequest, }; +use crate::data_notification::{ + NewTransactionsOrOutputsWithProofRequest, TransactionsOrOutputsWithProofRequest, +}; use crate::streaming_client::{ - ContinuouslyStreamTransactionOutputsRequest, ContinuouslyStreamTransactionsRequest, - GetAllTransactionOutputsRequest, + ContinuouslyStreamTransactionOutputsRequest, ContinuouslyStreamTransactionsOrOutputsRequest, + ContinuouslyStreamTransactionsRequest, GetAllTransactionOutputsRequest, + GetAllTransactionsOrOutputsRequest, }; use crate::tests::utils::{ create_output_list_with_proof, MAX_ADVERTISED_TRANSACTION, MAX_NOTIFICATION_TIMEOUT_SECS, @@ -418,7 +422,13 @@ async fn test_continuous_stream_epoch_change_retry() { MIN_ADVERTISED_TRANSACTION_OUTPUT, MIN_ADVERTISED_EPOCH_END, ); - for mut data_stream in [data_stream_1, data_stream_2] { + let (data_stream_3, _stream_listener_3) = create_continuous_transaction_or_output_stream( + AptosDataClientConfig::default(), + streaming_service_config, + MIN_ADVERTISED_TRANSACTION_OUTPUT, + MIN_ADVERTISED_EPOCH_END, + ); + for mut data_stream in [data_stream_1, data_stream_2, data_stream_3] { // Initialize the data stream and drive progress let global_data_summary = create_global_data_summary(1); initialize_data_requests(&mut data_stream, 
&global_data_summary); @@ -482,9 +492,16 @@ async fn test_continuous_stream_subscription_retry() { MAX_ADVERTISED_TRANSACTION_OUTPUT, MAX_ADVERTISED_EPOCH_END, ); - for (mut data_stream, mut stream_listener, transactions_only) in [ - (data_stream_1, stream_listener_1, true), - (data_stream_2, stream_listener_2, false), + let (data_stream_3, stream_listener_3) = create_continuous_transaction_or_output_stream( + AptosDataClientConfig::default(), + streaming_service_config, + MAX_ADVERTISED_TRANSACTION_OUTPUT, + MAX_ADVERTISED_EPOCH_END, + ); + for (mut data_stream, mut stream_listener, transactions_only, allow_transactions_or_outputs) in [ + (data_stream_1, stream_listener_1, true, false), + (data_stream_2, stream_listener_2, false, false), + (data_stream_3, stream_listener_3, false, true), ] { // Initialize the data stream let global_data_summary = create_global_data_summary(1); @@ -494,9 +511,17 @@ async fn test_continuous_stream_subscription_retry() { let (sent_requests, _) = data_stream.get_sent_requests_and_notifications(); assert_eq!(sent_requests.as_ref().unwrap().len(), 1); - // Verify the request is for new transaction data + // Verify the request is for the correct data let client_request = get_pending_client_request(&mut data_stream, 0); - let expected_request = if transactions_only { + let expected_request = if allow_transactions_or_outputs { + DataClientRequest::NewTransactionsOrOutputsWithProof( + NewTransactionsOrOutputsWithProofRequest { + known_version: MAX_ADVERTISED_TRANSACTION_OUTPUT, + known_epoch: MAX_ADVERTISED_EPOCH_END, + include_events: false, + }, + ) + } else if transactions_only { DataClientRequest::NewTransactionsWithProof(NewTransactionsWithProofRequest { known_version: MAX_ADVERTISED_TRANSACTION, known_epoch: MAX_ADVERTISED_EPOCH_END, @@ -525,8 +550,8 @@ async fn test_continuous_stream_subscription_retry() { process_data_responses(&mut data_stream, &global_data_summary).await; // Verify the same subscription request was resent to the 
network - let client_request = get_pending_client_request(&mut data_stream, 0); - assert_eq!(client_request, expected_request); + let new_client_request = get_pending_client_request(&mut data_stream, 0); + assert_eq!(new_client_request, client_request); } // Set a subscription response in the queue and process it @@ -542,7 +567,15 @@ async fn test_continuous_stream_subscription_retry() { let (sent_requests, _) = data_stream.get_sent_requests_and_notifications(); assert_eq!(sent_requests.as_ref().unwrap().len(), 1); let client_request = get_pending_client_request(&mut data_stream, 0); - let expected_request = if transactions_only { + let expected_request = if allow_transactions_or_outputs { + DataClientRequest::NewTransactionsOrOutputsWithProof( + NewTransactionsOrOutputsWithProofRequest { + known_version: MAX_ADVERTISED_TRANSACTION_OUTPUT + 1, + known_epoch: MAX_ADVERTISED_EPOCH_END, + include_events: false, + }, + ) + } else if transactions_only { DataClientRequest::NewTransactionsWithProof(NewTransactionsWithProofRequest { known_version: MAX_ADVERTISED_TRANSACTION + 1, known_epoch: MAX_ADVERTISED_EPOCH_END, @@ -579,9 +612,18 @@ async fn test_continuous_stream_subscription_retry() { let (sent_requests, _) = data_stream.get_sent_requests_and_notifications(); assert_eq!(sent_requests.as_ref().unwrap().len(), 3); for i in 0..3 { - let expected_version = MAX_ADVERTISED_TRANSACTION + 2 + i as u64; let client_request = get_pending_client_request(&mut data_stream, i); - let expected_request = if transactions_only { + let expected_version = MAX_ADVERTISED_TRANSACTION + 2 + i as u64; + let expected_request = if allow_transactions_or_outputs { + DataClientRequest::TransactionsOrOutputsWithProof( + TransactionsOrOutputsWithProofRequest { + start_version: expected_version, + end_version: expected_version, + proof_version: new_highest_synced_version, + include_events: false, + }, + ) + } else if transactions_only { 
DataClientRequest::TransactionsWithProof(TransactionsWithProofRequest { start_version: expected_version, end_version: expected_version, @@ -622,9 +664,16 @@ async fn test_continuous_stream_subscription_timeout() { MAX_ADVERTISED_TRANSACTION_OUTPUT, MAX_ADVERTISED_EPOCH_END, ); - for (mut data_stream, mut stream_listener, transactions_only) in [ - (data_stream_1, stream_listener_1, true), - (data_stream_2, stream_listener_2, false), + let (data_stream_3, stream_listener_3) = create_continuous_transaction_or_output_stream( + data_client_config, + DataStreamingServiceConfig::default(), + MAX_ADVERTISED_TRANSACTION_OUTPUT, + MAX_ADVERTISED_EPOCH_END, + ); + for (mut data_stream, mut stream_listener, transactions_only, allow_transactions_or_outputs) in [ + (data_stream_1, stream_listener_1, true, false), + (data_stream_2, stream_listener_2, false, false), + (data_stream_3, stream_listener_3, false, true), ] { // Initialize the data stream let global_data_summary = create_global_data_summary(1); @@ -640,6 +689,7 @@ async fn test_continuous_stream_subscription_timeout() { &mut data_stream, &mut stream_listener, transactions_only, + allow_transactions_or_outputs, true, &global_data_summary, ) @@ -658,6 +708,7 @@ async fn test_continuous_stream_subscription_timeout() { &mut data_stream, &mut stream_listener, transactions_only, + allow_transactions_or_outputs, true, &global_data_summary, ) @@ -666,7 +717,7 @@ async fn test_continuous_stream_subscription_timeout() { } #[tokio::test(flavor = "multi_thread")] -async fn test_transaction_and_output_stream_timeout() { +async fn test_transactions_and_output_stream_timeout() { // Create a test data client config let max_response_timeout_ms = 85; let response_timeout_ms = 7; @@ -685,7 +736,7 @@ async fn test_transaction_and_output_stream_timeout() { ..Default::default() }; - // Test both types of data streams + // Test all types of data streams let (data_stream_1, stream_listener_1) = create_transaction_stream( data_client_config, 
streaming_service_config, @@ -698,9 +749,16 @@ async fn test_transaction_and_output_stream_timeout() { MIN_ADVERTISED_TRANSACTION_OUTPUT, MAX_ADVERTISED_TRANSACTION_OUTPUT, ); - for (mut data_stream, mut stream_listener, transactions_only) in [ - (data_stream_1, stream_listener_1, true), - (data_stream_2, stream_listener_2, false), + let (data_stream_3, stream_listener_3) = create_transactions_or_output_stream( + data_client_config, + streaming_service_config, + MIN_ADVERTISED_TRANSACTION_OUTPUT, + MAX_ADVERTISED_TRANSACTION_OUTPUT, + ); + for (mut data_stream, mut stream_listener, transactions_only, allow_transactions_or_outputs) in [ + (data_stream_1, stream_listener_1, true, false), + (data_stream_2, stream_listener_2, false, false), + (data_stream_3, stream_listener_3, false, true), ] { // Initialize the data stream let global_data_summary = create_global_data_summary(1); @@ -730,6 +788,7 @@ async fn test_transaction_and_output_stream_timeout() { &mut data_stream, &mut stream_listener, transactions_only, + allow_transactions_or_outputs, false, &global_data_summary, ) @@ -762,6 +821,7 @@ async fn test_transaction_and_output_stream_timeout() { &mut data_stream, &mut stream_listener, transactions_only, + allow_transactions_or_outputs, false, &global_data_summary, ) @@ -892,6 +952,25 @@ fn create_continuous_transaction_stream( create_data_stream(data_client_config, streaming_service_config, stream_request) } +/// Creates a continuous transaction or output stream for the given `version`. 
+fn create_continuous_transaction_or_output_stream( + data_client_config: AptosDataClientConfig, + streaming_service_config: DataStreamingServiceConfig, + known_version: Version, + known_epoch: Version, +) -> (DataStream, DataStreamListener) { + // Create a continuous transaction stream request + let stream_request = StreamRequest::ContinuouslyStreamTransactionsOrOutputs( + ContinuouslyStreamTransactionsOrOutputsRequest { + known_version, + known_epoch, + include_events: false, + target: None, + }, + ); + create_data_stream(data_client_config, streaming_service_config, stream_request) +} + /// Creates a transaction stream for the given `version`. fn create_transaction_stream( data_client_config: AptosDataClientConfig, @@ -925,6 +1004,24 @@ fn create_output_stream( create_data_stream(data_client_config, streaming_service_config, stream_request) } +/// Creates a transactions or output stream for the given versions. +fn create_transactions_or_output_stream( + data_client_config: AptosDataClientConfig, + streaming_service_config: DataStreamingServiceConfig, + start_version: Version, + end_version: Version, +) -> (DataStream, DataStreamListener) { + // Create a transaction or output stream request + let stream_request = + StreamRequest::GetAllTransactionsOrOutputs(GetAllTransactionsOrOutputsRequest { + start_version, + end_version, + proof_version: end_version, + include_events: false, + }); + create_data_stream(data_client_config, streaming_service_config, stream_request) +} + fn create_data_stream( data_client_config: AptosDataClientConfig, streaming_service_config: DataStreamingServiceConfig, @@ -1206,7 +1303,8 @@ fn get_pending_client_request( async fn wait_for_notification_and_verify( data_stream: &mut DataStream, stream_listener: &mut DataStreamListener, - transactions_only: bool, + transaction_syncing: bool, + allow_transactions_or_outputs: bool, subscription_notification: bool, global_data_summary: &GlobalDataSummary, ) { @@ -1216,42 +1314,37 @@ async fn
wait_for_notification_and_verify( { if subscription_notification { // Verify we got the correct subscription data - if transactions_only { - if !matches!( - data_notification.data_payload, - DataPayload::ContinuousTransactionsWithProof(..) - ) { + match data_notification.data_payload { + DataPayload::ContinuousTransactionsWithProof(..) => { + assert!(allow_transactions_or_outputs || transaction_syncing); + } + DataPayload::ContinuousTransactionOutputsWithProof(..) => { + assert!(allow_transactions_or_outputs || !transaction_syncing); + } + _ => { panic!( "Invalid data notification found: {:?}", data_notification.data_payload ); } - } else if !matches!( - data_notification.data_payload, - DataPayload::ContinuousTransactionOutputsWithProof(..) - ) { - panic!("Invalid data notification found: {:?}", data_notification); } } else { // Verify we got the correct transaction data - if transactions_only { - if !matches!( - data_notification.data_payload, - DataPayload::TransactionsWithProof(..) - ) { + match data_notification.data_payload { + DataPayload::TransactionsWithProof(..) => { + assert!(allow_transactions_or_outputs || transaction_syncing); + } + DataPayload::TransactionOutputsWithProof(..) => { + assert!(allow_transactions_or_outputs || !transaction_syncing); + } + _ => { panic!( "Invalid data notification found: {:?}", data_notification.data_payload ); } - } else if !matches!( - data_notification.data_payload, - DataPayload::TransactionOutputsWithProof(..) 
- ) { - panic!("Invalid data notification found: {:?}", data_notification); } } - break; } else { process_data_responses(data_stream, global_data_summary).await; diff --git a/state-sync/state-sync-v2/data-streaming-service/src/tests/utils.rs b/state-sync/state-sync-v2/data-streaming-service/src/tests/utils.rs index 9c7d110d87543..3cd3a17d9ae18 100644 --- a/state-sync/state-sync-v2/data-streaming-service/src/tests/utils.rs +++ b/state-sync/state-sync-v2/data-streaming-service/src/tests/utils.rs @@ -12,8 +12,9 @@ use aptos_infallible::Mutex; use aptos_logger::Level; use aptos_storage_service_types::requests::{ DataRequest, EpochEndingLedgerInfoRequest, NewTransactionOutputsWithProofRequest, - NewTransactionsWithProofRequest, StateValuesWithProofRequest, - TransactionOutputsWithProofRequest, TransactionsWithProofRequest, + NewTransactionsOrOutputsWithProofRequest, NewTransactionsWithProofRequest, + StateValuesWithProofRequest, TransactionOutputsWithProofRequest, + TransactionsOrOutputsWithProofRequest, TransactionsWithProofRequest, }; use aptos_storage_service_types::responses::{CompleteDataRange, TransactionOrOutputListWithProof}; use aptos_storage_service_types::Epoch; @@ -437,14 +438,55 @@ impl AptosDataClient for MockAptosDataClient { async fn get_new_transactions_or_outputs_with_proof( &self, - _known_version: Version, - _known_epoch: Epoch, - _include_events: bool, - _request_timeout_ms: u64, + known_version: Version, + known_epoch: Epoch, + include_events: bool, + request_timeout_ms: u64, ) -> aptos_data_client::Result< Response<(TransactionOrOutputListWithProof, LedgerInfoWithSignatures)>, > { - todo!() // Implement when we have a client side implementation + self.verify_request_timeout( + request_timeout_ms, + true, + DataRequest::GetNewTransactionsOrOutputsWithProof( + NewTransactionsOrOutputsWithProofRequest { + known_version, + known_epoch, + include_events, + max_num_output_reductions: 3, + }, + ), + ); + self.emulate_network_latencies(); + + // Create a 
mock data client without timeout verification (to handle the internal requests) + let mut aptos_data_client = self.clone(); + aptos_data_client.skip_timeout_verification = true; + + // Get the new transactions or outputs response + let response = if return_transactions_instead_of_outputs() { + let (transactions, ledger_info) = aptos_data_client + .get_new_transactions_with_proof( + known_version, + known_epoch, + include_events, + request_timeout_ms, + ) + .await? + .payload; + ((Some(transactions), None), ledger_info) + } else { + let (outputs, ledger_info) = aptos_data_client + .get_new_transaction_outputs_with_proof( + known_version, + known_epoch, + request_timeout_ms, + ) + .await? + .payload; + ((None, Some(outputs)), ledger_info) + }; + Ok(create_data_client_response(response)) } async fn get_number_of_states( @@ -483,16 +525,8 @@ impl AptosDataClient for MockAptosDataClient { // Calculate the last version based on if we should limit the chunk size let end_version = self.calculate_last_index(start_version, end_version); - // Create the requested transactions and transaction outputs - let mut transactions_and_outputs = vec![]; - for _ in start_version..=end_version { - transactions_and_outputs.push((create_transaction(), create_transaction_output())); - } + let output_list_with_proof = create_output_list_with_proof(start_version, end_version); - // Create a transaction output list with an empty proof - let mut output_list_with_proof = TransactionOutputListWithProof::new_empty(); - output_list_with_proof.first_transaction_output_version = Some(start_version); - output_list_with_proof.transactions_and_outputs = transactions_and_outputs; Ok(create_data_client_response(output_list_with_proof)) } @@ -528,13 +562,53 @@ impl AptosDataClient for MockAptosDataClient { async fn get_transactions_or_outputs_with_proof( &self, - _proof_version: Version, - _start_version: Version, - _end_version: Version, - _include_events: bool, - _request_timeout_ms: u64, + 
proof_version: Version, + start_version: Version, + end_version: Version, + include_events: bool, + request_timeout_ms: u64, ) -> aptos_data_client::Result<Response<TransactionOrOutputListWithProof>> { - todo!() // Implement when we have a client side implementation + self.verify_request_timeout( + request_timeout_ms, + false, + DataRequest::GetTransactionsOrOutputsWithProof(TransactionsOrOutputsWithProofRequest { + proof_version, + start_version, + end_version, + include_events, + max_num_output_reductions: 3, + }), + ); + self.emulate_network_latencies(); + + // Create a mock data client without timeout verification (to handle the internal requests) + let mut aptos_data_client = self.clone(); + aptos_data_client.skip_timeout_verification = true; + + // Get the transactions or outputs response + let transactions_or_outputs = if return_transactions_instead_of_outputs() { + let transactions_with_proof = aptos_data_client + .get_transactions_with_proof( + proof_version, + start_version, + end_version, + include_events, + request_timeout_ms, + ) + .await?; + (Some(transactions_with_proof.payload), None) + } else { + let outputs_with_proof = aptos_data_client + .get_transaction_outputs_with_proof( + proof_version, + start_version, + end_version, + request_timeout_ms, + ) + .await?; + (None, Some(outputs_with_proof.payload)) + }; + Ok(create_data_client_response(transactions_or_outputs)) } } @@ -772,3 +846,11 @@ pub fn create_output_list_with_proof( transaction_list_with_proof.proof, ) } + +/// Returns true iff the server should return transactions +/// instead of outputs. +/// +/// Note: there's a 50-50 chance of this occurring.
+fn return_transactions_instead_of_outputs() -> bool { + (create_random_u64(u64::MAX) % 2) == 0 +} diff --git a/state-sync/state-sync-v2/state-sync-driver/Cargo.toml b/state-sync/state-sync-v2/state-sync-driver/Cargo.toml index 54781662f5950..a4ca2ae51faef 100644 --- a/state-sync/state-sync-v2/state-sync-driver/Cargo.toml +++ b/state-sync/state-sync-v2/state-sync-driver/Cargo.toml @@ -28,6 +28,7 @@ aptos-metrics-core = { workspace = true } aptos-schemadb = { workspace = true } aptos-scratchpad = { workspace = true } aptos-storage-interface = { workspace = true } +aptos-time-service = { workspace = true } aptos-types = { workspace = true } async-trait = { workspace = true } bcs = { workspace = true } diff --git a/state-sync/state-sync-v2/state-sync-driver/src/bootstrapper.rs b/state-sync/state-sync-v2/state-sync-driver/src/bootstrapper.rs index 4853af9d29afc..c28281139ee7d 100644 --- a/state-sync/state-sync-v2/state-sync-driver/src/bootstrapper.rs +++ b/state-sync/state-sync-v2/state-sync-driver/src/bootstrapper.rs @@ -1,11 +1,14 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 +use crate::metrics::ExecutingComponent; +use crate::utils::OutputFallbackHandler; use crate::{ driver::DriverConfiguration, error::Error, logging::{LogEntry, LogSchema}, metadata_storage::MetadataStorageInterface, + metrics, storage_synchronizer::StorageSynchronizerInterface, utils, utils::{SpeculativeStreamState, PENDING_DATA_LOG_FREQ_SECS}, @@ -290,6 +293,9 @@ pub struct Bootstrapper { // The storage to write metadata about the syncing progress metadata_storage: MetadataStorage, + // The handler for output fallback behaviour + output_fallback_handler: OutputFallbackHandler, + // The speculative state tracking the active data stream speculative_stream_state: Option, @@ -318,6 +324,7 @@ impl< pub fn new( driver_configuration: DriverConfiguration, metadata_storage: MetadataStorage, + output_fallback_handler: OutputFallbackHandler, streaming_client: StreamingClient, storage: 
Arc, storage_synchronizer: StorageSyncer, @@ -334,6 +341,7 @@ impl< bootstrapped: false, driver_configuration, metadata_storage, + output_fallback_handler, speculative_stream_state: None, streaming_client, storage, @@ -342,6 +350,11 @@ impl< } } + /// Returns the bootstrapping mode of the node + fn get_bootstrapping_mode(&self) -> BootstrappingMode { + self.driver_configuration.config.bootstrapping_mode + } + /// Returns true iff the node has already completed bootstrapping pub fn is_bootstrapped(&self) -> bool { self.bootstrapped @@ -455,11 +468,10 @@ impl< info!(LogSchema::new(LogEntry::Bootstrapper).message(&format!( "Highest synced version is {}, highest_known_ledger_info is {:?}, bootstrapping_mode is {:?}.", - highest_synced_version, highest_known_ledger_info, - self.driver_configuration.config.bootstrapping_mode))); + highest_synced_version, highest_known_ledger_info, self.get_bootstrapping_mode()))); // Bootstrap according to the mode - match self.driver_configuration.config.bootstrapping_mode { + match self.get_bootstrapping_mode() { BootstrappingMode::DownloadLatestStates => { self.fetch_missing_state_snapshot_data( highest_synced_version, @@ -685,7 +697,7 @@ impl< .verified_epoch_states .next_epoch_ending_version(highest_synced_version) .expect("No higher epoch ending version known!"); - let data_stream = match self.driver_configuration.config.bootstrapping_mode { + let data_stream = match self.get_bootstrapping_mode() { BootstrappingMode::ApplyTransactionOutputsFromGenesis => { self.streaming_client .get_all_transaction_outputs( @@ -705,6 +717,36 @@ impl< ) .await? } + BootstrappingMode::ExecuteOrApplyFromGenesis => { + if self.output_fallback_handler.in_fallback_mode() { + metrics::set_gauge( + &metrics::DRIVER_FALLBACK_MODE, + ExecutingComponent::Bootstrapper.get_label(), + 1, + ); + self.streaming_client + .get_all_transaction_outputs( + next_version, + end_version, + highest_known_ledger_version, + ) + .await? 
+ } else { + metrics::set_gauge( + &metrics::DRIVER_FALLBACK_MODE, + ExecutingComponent::Bootstrapper.get_label(), + 0, + ); + self.streaming_client + .get_all_transactions_or_outputs( + next_version, + end_version, + highest_known_ledger_version, + false, + ) + .await? + } + } bootstrapping_mode => { unreachable!("Bootstrapping mode not supported: {:?}", bootstrapping_mode) } @@ -877,7 +919,7 @@ impl< state_value_chunk_with_proof: StateValueChunkWithProof, ) -> Result<(), Error> { // Verify that we're expecting state value payloads - let bootstrapping_mode = self.driver_configuration.config.bootstrapping_mode; + let bootstrapping_mode = self.get_bootstrapping_mode(); if self.should_fetch_epoch_ending_ledger_infos() || !matches!(bootstrapping_mode, BootstrappingMode::DownloadLatestStates) { @@ -1030,7 +1072,7 @@ impl< payload_start_version: Option, ) -> Result<(), Error> { // Verify that we're expecting transaction or output payloads - let bootstrapping_mode = self.driver_configuration.config.bootstrapping_mode; + let bootstrapping_mode = self.get_bootstrapping_mode(); if self.should_fetch_epoch_ending_ledger_infos() || (matches!(bootstrapping_mode, BootstrappingMode::DownloadLatestStates) && self.state_value_syncer.transaction_output_to_sync.is_some()) @@ -1085,18 +1127,14 @@ impl< let num_transactions_or_outputs = match bootstrapping_mode { BootstrappingMode::ApplyTransactionOutputsFromGenesis => { if let Some(transaction_outputs_with_proof) = transaction_outputs_with_proof { - let num_transaction_outputs = transaction_outputs_with_proof - .transactions_and_outputs - .len(); - self.storage_synchronizer - .apply_transaction_outputs( - notification_id, - transaction_outputs_with_proof, - proof_ledger_info, - end_of_epoch_ledger_info, - ) - .await?; - num_transaction_outputs + utils::apply_transaction_outputs( + self.storage_synchronizer.clone(), + notification_id, + proof_ledger_info, + end_of_epoch_ledger_info, + transaction_outputs_with_proof, + ) + .await? 
} else { self.reset_active_stream(Some(NotificationAndFeedback::new( notification_id, @@ -1110,16 +1148,14 @@ impl< } BootstrappingMode::ExecuteTransactionsFromGenesis => { if let Some(transaction_list_with_proof) = transaction_list_with_proof { - let num_transactions = transaction_list_with_proof.transactions.len(); - self.storage_synchronizer - .execute_transactions( - notification_id, - transaction_list_with_proof, - proof_ledger_info, - end_of_epoch_ledger_info, - ) - .await?; - num_transactions + utils::execute_transactions( + self.storage_synchronizer.clone(), + notification_id, + proof_ledger_info, + end_of_epoch_ledger_info, + transaction_list_with_proof, + ) + .await? } else { self.reset_active_stream(Some(NotificationAndFeedback::new( notification_id, @@ -1131,6 +1167,37 @@ impl< )); } } + BootstrappingMode::ExecuteOrApplyFromGenesis => { + if let Some(transaction_list_with_proof) = transaction_list_with_proof { + utils::execute_transactions( + self.storage_synchronizer.clone(), + notification_id, + proof_ledger_info, + end_of_epoch_ledger_info, + transaction_list_with_proof, + ) + .await? + } else if let Some(transaction_outputs_with_proof) = transaction_outputs_with_proof + { + utils::apply_transaction_outputs( + self.storage_synchronizer.clone(), + notification_id, + proof_ledger_info, + end_of_epoch_ledger_info, + transaction_outputs_with_proof, + ) + .await? 
+ } else { + self.reset_active_stream(Some(NotificationAndFeedback::new( + notification_id, + NotificationFeedback::PayloadTypeIsIncorrect, + ))) + .await?; + return Err(Error::InvalidPayload( + "Did not receive transactions or outputs with proof!".into(), + )); + } + } bootstrapping_mode => { unreachable!("Bootstrapping mode not supported: {:?}", bootstrapping_mode) } @@ -1260,7 +1327,7 @@ impl< transaction_outputs_with_proof: Option<&TransactionOutputListWithProof>, ) -> Result, Error> { // Calculate the payload end version - let num_versions = match self.driver_configuration.config.bootstrapping_mode { + let num_versions = match self.get_bootstrapping_mode() { BootstrappingMode::ApplyTransactionOutputsFromGenesis => { if let Some(transaction_outputs_with_proof) = transaction_outputs_with_proof { transaction_outputs_with_proof @@ -1291,6 +1358,22 @@ impl< )); } } + BootstrappingMode::ExecuteOrApplyFromGenesis => { + if let Some(transaction_list_with_proof) = transaction_list_with_proof { + transaction_list_with_proof.transactions.len() + } else if let Some(output_list_with_proof) = transaction_outputs_with_proof { + output_list_with_proof.transactions_and_outputs.len() + } else { + self.reset_active_stream(Some(NotificationAndFeedback::new( + notification_id, + NotificationFeedback::PayloadTypeIsIncorrect, + ))) + .await?; + return Err(Error::InvalidPayload( + "Did not receive transactions or outputs with proof!".into(), + )); + } + } bootstrapping_mode => { unimplemented!("Bootstrapping mode not supported: {:?}", bootstrapping_mode) } @@ -1360,6 +1443,28 @@ impl< .expect("Speculative stream state does not exist!") } + /// Handles the storage synchronizer error sent by the driver + pub async fn handle_storage_synchronizer_error( + &mut self, + notification_and_feedback: NotificationAndFeedback, + ) -> Result<(), Error> { + // Reset the active stream + self.reset_active_stream(Some(notification_and_feedback)) + .await?; + + // Fallback to output syncing if we need 
to + if let BootstrappingMode::ExecuteOrApplyFromGenesis = self.get_bootstrapping_mode() { + self.output_fallback_handler.fallback_to_outputs(); + metrics::set_gauge( + &metrics::DRIVER_FALLBACK_MODE, + ExecutingComponent::Bootstrapper.get_label(), + 1, + ); + } + + Ok(()) + } + /// Resets the currently active data stream and speculative state pub async fn reset_active_stream( &mut self, diff --git a/state-sync/state-sync-v2/state-sync-driver/src/continuous_syncer.rs b/state-sync/state-sync-v2/state-sync-driver/src/continuous_syncer.rs index 31a987ff6ae26..e1a19c56ba940 100644 --- a/state-sync/state-sync-v2/state-sync-driver/src/continuous_syncer.rs +++ b/state-sync/state-sync-v2/state-sync-driver/src/continuous_syncer.rs @@ -1,9 +1,12 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 +use crate::metrics::ExecutingComponent; +use crate::utils::OutputFallbackHandler; use crate::{ driver::DriverConfiguration, error::Error, + metrics, notification_handlers::ConsensusSyncRequest, storage_synchronizer::StorageSynchronizerInterface, utils, @@ -33,6 +36,9 @@ pub struct ContinuousSyncer { // The config of the state sync driver driver_configuration: DriverConfiguration, + // The handler for output fallback behaviour + output_fallback_handler: OutputFallbackHandler, + // The speculative state tracking the active data stream speculative_stream_state: Option, @@ -54,12 +60,14 @@ impl< pub fn new( driver_configuration: DriverConfiguration, streaming_client: StreamingClient, + output_fallback_handler: OutputFallbackHandler, storage: Arc, storage_synchronizer: StorageSyncer, ) -> Self { Self { active_data_stream: None, driver_configuration, + output_fallback_handler, speculative_stream_state: None, streaming_client, storage, @@ -112,7 +120,7 @@ impl< .map(|sync_request| sync_request.get_sync_target()); // Initialize a new active data stream - let active_data_stream = match self.driver_configuration.config.continuous_syncing_mode { + let active_data_stream = match 
self.get_continuous_syncing_mode() { ContinuousSyncingMode::ApplyTransactionOutputs => { self.streaming_client .continuously_stream_transaction_outputs( @@ -132,6 +140,36 @@ impl< ) .await? } + ContinuousSyncingMode::ExecuteTransactionsOrApplyOutputs => { + if self.output_fallback_handler.in_fallback_mode() { + metrics::set_gauge( + &metrics::DRIVER_FALLBACK_MODE, + ExecutingComponent::ContinuousSyncer.get_label(), + 1, + ); + self.streaming_client + .continuously_stream_transaction_outputs( + highest_synced_version, + highest_synced_epoch, + sync_request_target, + ) + .await? + } else { + metrics::set_gauge( + &metrics::DRIVER_FALLBACK_MODE, + ExecutingComponent::ContinuousSyncer.get_label(), + 0, + ); + self.streaming_client + .continuously_stream_transactions_or_outputs( + highest_synced_version, + highest_synced_epoch, + false, + sync_request_target, + ) + .await? + } + } }; self.speculative_stream_state = Some(SpeculativeStreamState::new( highest_epoch_state, @@ -216,6 +254,11 @@ impl< Ok(()) } + /// Returns the continuous syncing mode of the node + fn get_continuous_syncing_mode(&self) -> ContinuousSyncingMode { + self.driver_configuration.config.continuous_syncing_mode + } + /// Returns the highest synced version and epoch in storage fn get_highest_synced_version_and_epoch(&self) -> Result<(Version, Epoch), Error> { let highest_synced_version = utils::fetch_latest_synced_version(self.storage.clone())?; @@ -248,57 +291,81 @@ impl< .await?; // Execute/apply and commit the transactions/outputs - let num_transactions_or_outputs = - match self.driver_configuration.config.continuous_syncing_mode { - ContinuousSyncingMode::ApplyTransactionOutputs => { - if let Some(transaction_outputs_with_proof) = transaction_outputs_with_proof { - let num_transaction_outputs = transaction_outputs_with_proof - .transactions_and_outputs - .len(); - self.storage_synchronizer - .apply_transaction_outputs( - notification_id, - transaction_outputs_with_proof, - 
ledger_info_with_signatures.clone(), - None, - ) - .await?; - num_transaction_outputs - } else { - self.reset_active_stream(Some(NotificationAndFeedback::new( - notification_id, - NotificationFeedback::PayloadTypeIsIncorrect, - ))) - .await?; - return Err(Error::InvalidPayload( - "Did not receive transaction outputs with proof!".into(), - )); - } + let num_transactions_or_outputs = match self.get_continuous_syncing_mode() { + ContinuousSyncingMode::ApplyTransactionOutputs => { + if let Some(transaction_outputs_with_proof) = transaction_outputs_with_proof { + utils::apply_transaction_outputs( + self.storage_synchronizer.clone(), + notification_id, + ledger_info_with_signatures.clone(), + None, + transaction_outputs_with_proof, + ) + .await? + } else { + self.reset_active_stream(Some(NotificationAndFeedback::new( + notification_id, + NotificationFeedback::PayloadTypeIsIncorrect, + ))) + .await?; + return Err(Error::InvalidPayload( + "Did not receive transaction outputs with proof!".into(), + )); } - ContinuousSyncingMode::ExecuteTransactions => { - if let Some(transaction_list_with_proof) = transaction_list_with_proof { - let num_transactions = transaction_list_with_proof.transactions.len(); - self.storage_synchronizer - .execute_transactions( - notification_id, - transaction_list_with_proof, - ledger_info_with_signatures.clone(), - None, - ) - .await?; - num_transactions - } else { - self.reset_active_stream(Some(NotificationAndFeedback::new( - notification_id, - NotificationFeedback::PayloadTypeIsIncorrect, - ))) - .await?; - return Err(Error::InvalidPayload( - "Did not receive transactions with proof!".into(), - )); - } + } + ContinuousSyncingMode::ExecuteTransactions => { + if let Some(transaction_list_with_proof) = transaction_list_with_proof { + utils::execute_transactions( + self.storage_synchronizer.clone(), + notification_id, + ledger_info_with_signatures.clone(), + None, + transaction_list_with_proof, + ) + .await? 
+ } else { + self.reset_active_stream(Some(NotificationAndFeedback::new( + notification_id, + NotificationFeedback::PayloadTypeIsIncorrect, + ))) + .await?; + return Err(Error::InvalidPayload( + "Did not receive transactions with proof!".into(), + )); + } + } + ContinuousSyncingMode::ExecuteTransactionsOrApplyOutputs => { + if let Some(transaction_list_with_proof) = transaction_list_with_proof { + utils::execute_transactions( + self.storage_synchronizer.clone(), + notification_id, + ledger_info_with_signatures.clone(), + None, + transaction_list_with_proof, + ) + .await? + } else if let Some(transaction_outputs_with_proof) = transaction_outputs_with_proof + { + utils::apply_transaction_outputs( + self.storage_synchronizer.clone(), + notification_id, + ledger_info_with_signatures.clone(), + None, + transaction_outputs_with_proof, + ) + .await? + } else { + self.reset_active_stream(Some(NotificationAndFeedback::new( + notification_id, + NotificationFeedback::PayloadTypeIsIncorrect, + ))) + .await?; + return Err(Error::InvalidPayload( + "No transactions or output with proof was provided!".into(), + )); } - }; + } + }; let synced_version = payload_start_version .checked_add(num_transactions_or_outputs as u64) .and_then(|version| version.checked_sub(1)) // synced_version = start + num txns/outputs - 1 @@ -423,6 +490,25 @@ impl< .expect("Speculative stream state does not exist!") } + /// Handles the storage synchronizer error sent by the driver + pub async fn handle_storage_synchronizer_error( + &mut self, + notification_and_feedback: NotificationAndFeedback, + ) -> Result<(), Error> { + // Reset the active stream + self.reset_active_stream(Some(notification_and_feedback)) + .await?; + + // Fallback to output syncing if we need to + if let ContinuousSyncingMode::ExecuteTransactionsOrApplyOutputs = + self.get_continuous_syncing_mode() + { + self.output_fallback_handler.fallback_to_outputs(); + } + + Ok(()) + } + /// Resets the currently active data stream and speculative 
state pub async fn reset_active_stream( &mut self, diff --git a/state-sync/state-sync-v2/state-sync-driver/src/driver.rs b/state-sync/state-sync-v2/state-sync-driver/src/driver.rs index 8e8702fbef67f..4b4dc4a90c8ee 100644 --- a/state-sync/state-sync-v2/state-sync-driver/src/driver.rs +++ b/state-sync/state-sync-v2/state-sync-driver/src/driver.rs @@ -1,6 +1,7 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 +use crate::utils::OutputFallbackHandler; use crate::{ bootstrapper::Bootstrapper, continuous_syncer::ContinuousSyncer, @@ -20,9 +21,9 @@ use crate::{ utils::PENDING_DATA_LOG_FREQ_SECS, }; use aptos_config::config::{RoleType, StateSyncDriverConfig}; -use aptos_consensus_notifications::{ - ConsensusCommitNotification, ConsensusNotification, ConsensusSyncNotification, -}; +use aptos_consensus_notifications::ConsensusCommitNotification; +use aptos_consensus_notifications::ConsensusNotification; +use aptos_consensus_notifications::ConsensusSyncNotification; use aptos_data_client::AptosDataClient; use aptos_data_streaming_service::streaming_client::{ DataStreamingClient, NotificationAndFeedback, NotificationFeedback, @@ -32,9 +33,12 @@ use aptos_infallible::Mutex; use aptos_logger::prelude::*; use aptos_mempool_notifications::MempoolNotificationSender; use aptos_storage_interface::DbReader; +use aptos_time_service::TimeService; +use aptos_time_service::TimeServiceTrait; use aptos_types::waypoint::Waypoint; use futures::StreamExt; -use std::{sync::Arc, time::SystemTime}; +use std::sync::Arc; +use std::time::Instant; use tokio::task::yield_now; use tokio::time::{interval, Duration}; use tokio_stream::wrappers::IntervalStream; @@ -104,13 +108,16 @@ pub struct StateSyncDriver< mempool_notification_handler: MempoolNotificationHandler, // The timestamp at which the driver started executing - start_time: Option, + start_time: Option, // The interface to read from storage storage: Arc, // The storage synchronizer used to update local storage 
storage_synchronizer: StorageSyncer, + + // The time service + time_service: TimeService, } impl< @@ -135,10 +142,14 @@ impl< aptos_data_client: DataClient, streaming_client: StreamingClient, storage: Arc, + time_service: TimeService, ) -> Self { + let output_fallback_handler = + OutputFallbackHandler::new(driver_configuration.clone(), time_service.clone()); let bootstrapper = Bootstrapper::new( driver_configuration.clone(), metadata_storage, + output_fallback_handler.clone(), streaming_client.clone(), storage.clone(), storage_synchronizer.clone(), @@ -146,6 +157,7 @@ impl< let continuous_syncer = ContinuousSyncer::new( driver_configuration.clone(), streaming_client, + output_fallback_handler, storage.clone(), storage_synchronizer.clone(), ); @@ -164,6 +176,7 @@ impl< start_time: None, storage, storage_synchronizer, + time_service, } } @@ -176,7 +189,7 @@ impl< // Start the driver info!(LogSchema::new(LogEntry::Driver).message("Started the state sync v2 driver!")); - self.start_time = Some(SystemTime::now()); + self.start_time = Some(self.time_service.now()); loop { ::futures::select! 
{ notification = self.client_notification_listener.select_next_some() => { @@ -415,10 +428,10 @@ impl< if self.bootstrapper.is_bootstrapped() { if let Err(error) = self .continuous_syncer - .reset_active_stream(Some(NotificationAndFeedback::new( + .handle_storage_synchronizer_error(NotificationAndFeedback::new( notification_id, notification_feedback, - ))) + )) .await { panic!( @@ -428,10 +441,10 @@ impl< } } else if let Err(error) = self .bootstrapper - .reset_active_stream(Some(NotificationAndFeedback::new( + .handle_storage_synchronizer_error(NotificationAndFeedback::new( notification_id, notification_feedback, - ))) + )) .await { panic!( @@ -518,10 +531,7 @@ impl< .config .max_connection_deadline_secs, )) { - if SystemTime::now() - .duration_since(connection_deadline) - .is_ok() - { + if self.time_service.now() >= connection_deadline { info!(LogSchema::new(LogEntry::AutoBootstrapping).message( "Passed the connection deadline! Auto-bootstrapping the validator!" )); diff --git a/state-sync/state-sync-v2/state-sync-driver/src/driver_factory.rs b/state-sync/state-sync-v2/state-sync-driver/src/driver_factory.rs index 2aa6f855c75b7..45cc600dcd33f 100644 --- a/state-sync/state-sync-v2/state-sync-driver/src/driver_factory.rs +++ b/state-sync/state-sync-v2/state-sync-driver/src/driver_factory.rs @@ -20,6 +20,7 @@ use aptos_executor_types::ChunkExecutorTrait; use aptos_infallible::Mutex; use aptos_mempool_notifications::MempoolNotificationSender; use aptos_storage_interface::DbReaderWriter; +use aptos_time_service::TimeService; use aptos_types::{move_resource::MoveStorage, waypoint::Waypoint}; use futures::{channel::mpsc, executor::block_on}; use std::sync::{ @@ -52,6 +53,7 @@ impl DriverFactory { mut event_subscription_service: EventSubscriptionService, aptos_data_client: AptosNetDataClient, streaming_service_client: StreamingServiceClient, + time_service: TimeService, ) -> Self { // Notify subscribers of the initial on-chain config values match 
(&*storage.reader).fetch_latest_state_checkpoint_version() { @@ -133,6 +135,7 @@ impl DriverFactory { aptos_data_client, streaming_service_client, storage.reader, + time_service, ); // Spawn the driver diff --git a/state-sync/state-sync-v2/state-sync-driver/src/metrics.rs b/state-sync/state-sync-v2/state-sync-driver/src/metrics.rs index 2fb7cc6705af9..498278bfc1467 100644 --- a/state-sync/state-sync-v2/state-sync-driver/src/metrics.rs +++ b/state-sync/state-sync-v2/state-sync-driver/src/metrics.rs @@ -72,7 +72,7 @@ pub static BOOTSTRAPPER_ERRORS: Lazy = Lazy::new(|| { .unwrap() }); -/// Counter for state sync continuous syncer errors +/// Counter for state sync continuous syncer errors pub static CONTINUOUS_SYNCER_ERRORS: Lazy = Lazy::new(|| { register_int_counter_vec!( "aptos_state_sync_continuous_syncer_errors", @@ -92,6 +92,16 @@ pub static DRIVER_COUNTERS: Lazy = Lazy::new(|| { .unwrap() }); +/// Gauge for state sync driver fallback mode +pub static DRIVER_FALLBACK_MODE: Lazy = Lazy::new(|| { + register_int_gauge_vec!( + "aptos_state_sync_driver_fallback_mode", + "Gauges related to the driver fallback mode", + &["label"] + ) + .unwrap() +}); + /// Counters related to the currently executing component pub static EXECUTING_COMPONENT: Lazy = Lazy::new(|| { register_int_counter_vec!( diff --git a/state-sync/state-sync-v2/state-sync-driver/src/tests/bootstrapper.rs b/state-sync/state-sync-v2/state-sync-driver/src/tests/bootstrapper.rs index b0f7c2d6fb28c..de84d1ff520d8 100644 --- a/state-sync/state-sync-v2/state-sync-driver/src/tests/bootstrapper.rs +++ b/state-sync/state-sync-v2/state-sync-driver/src/tests/bootstrapper.rs @@ -1,6 +1,7 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 +use crate::utils::OutputFallbackHandler; use crate::{ bootstrapper::{Bootstrapper, GENESIS_TRANSACTION_VERSION}, driver::DriverConfiguration, @@ -20,10 +21,12 @@ use crate::{ }; use aptos_config::config::BootstrappingMode; use 
aptos_data_client::GlobalDataSummary; +use aptos_data_streaming_service::data_notification::NotificationId; use aptos_data_streaming_service::{ data_notification::{DataNotification, DataPayload}, streaming_client::{NotificationAndFeedback, NotificationFeedback}, }; +use aptos_time_service::TimeService; use aptos_types::{ transaction::{TransactionOutputListWithProof, Version}, waypoint::Waypoint, @@ -32,6 +35,7 @@ use claims::{assert_matches, assert_none, assert_ok}; use futures::{channel::oneshot, FutureExt, SinkExt}; use mockall::{predicate::eq, Sequence}; use std::sync::Arc; +use std::time::Duration; #[tokio::test] async fn test_bootstrap_genesis_waypoint() { @@ -42,7 +46,8 @@ async fn test_bootstrap_genesis_waypoint() { let mock_streaming_client = create_mock_streaming_client(); // Create the bootstrapper and verify it's not yet bootstrapped - let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true); + let (mut bootstrapper, _) = + create_bootstrapper(driver_configuration, mock_streaming_client, None, true); assert!(!bootstrapper.is_bootstrapped()); // Subscribe to a bootstrapped notification @@ -78,7 +83,8 @@ async fn test_bootstrap_immediate_notification() { let mock_streaming_client = create_mock_streaming_client(); // Create the bootstrapper - let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true); + let (mut bootstrapper, _) = + create_bootstrapper(driver_configuration, mock_streaming_client, None, true); // Create a global data summary where only epoch 0 has ended let global_data_summary = create_global_summary(0); @@ -112,7 +118,8 @@ async fn test_bootstrap_no_notification() { .return_once(move |_| Ok(data_stream_listener)); // Create the bootstrapper - let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true); + let (mut bootstrapper, _) = + create_bootstrapper(driver_configuration, mock_streaming_client, None, true); // Create a global data 
summary where epoch 0 and 1 have ended let global_data_summary = create_global_summary(1); @@ -160,7 +167,8 @@ async fn test_critical_timeout() { .return_const(Ok(())); // Create the bootstrapper - let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true); + let (mut bootstrapper, _) = + create_bootstrapper(driver_configuration, mock_streaming_client, None, true); // Create a global data summary where epoch 0 and 1 have ended let global_data_summary = create_global_summary(1); @@ -237,7 +245,8 @@ async fn test_data_stream_state_values() { .return_const(Ok(())); // Create the bootstrapper - let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true); + let (mut bootstrapper, _) = + create_bootstrapper(driver_configuration, mock_streaming_client, None, true); // Insert an epoch ending ledger info into the verified states of the bootstrapper manipulate_verified_epoch_states(&mut bootstrapper, true, true, Some(highest_version)); @@ -308,7 +317,8 @@ async fn test_data_stream_transactions() { .return_const(Ok(())); // Create the bootstrapper - let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true); + let (mut bootstrapper, _) = + create_bootstrapper(driver_configuration, mock_streaming_client, None, true); // Insert an epoch ending ledger info into the verified states of the bootstrapper manipulate_verified_epoch_states(&mut bootstrapper, true, true, Some(highest_version)); @@ -379,7 +389,8 @@ async fn test_data_stream_transaction_outputs() { .return_const(Ok(())); // Create the bootstrapper - let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true); + let (mut bootstrapper, _) = + create_bootstrapper(driver_configuration, mock_streaming_client, None, true); // Insert an epoch ending ledger info into the verified states of the bootstrapper manipulate_verified_epoch_states(&mut bootstrapper, true, true, Some(highest_version)); 
@@ -414,6 +425,195 @@ async fn test_data_stream_transaction_outputs() { .unwrap(); } +#[tokio::test] +async fn test_data_stream_transactions_or_outputs() { + // Create test data + let notification_id = 0; + let highest_version = 9998765; + let highest_ledger_info = create_random_epoch_ending_ledger_info(highest_version, 1); + + // Create a driver configuration with a genesis waypoint and transaction or output syncing + let mut driver_configuration = create_full_node_driver_configuration(); + driver_configuration.config.bootstrapping_mode = BootstrappingMode::ExecuteOrApplyFromGenesis; + + // Create the mock streaming client + let mut mock_streaming_client = create_mock_streaming_client(); + let mut expectation_sequence = Sequence::new(); + let (mut notification_sender_1, data_stream_listener_1) = create_data_stream_listener(); + let (_notification_sender_2, data_stream_listener_2) = create_data_stream_listener(); + let data_stream_id_1 = data_stream_listener_1.data_stream_id; + for data_stream_listener in [data_stream_listener_1, data_stream_listener_2] { + mock_streaming_client + .expect_get_all_transactions_or_outputs() + .times(1) + .with(eq(1), eq(highest_version), eq(highest_version), eq(false)) + .return_once(move |_, _, _, _| Ok(data_stream_listener)) + .in_sequence(&mut expectation_sequence); + } + mock_streaming_client + .expect_terminate_stream_with_feedback() + .with( + eq(data_stream_id_1), + eq(Some(NotificationAndFeedback::new( + notification_id, + NotificationFeedback::InvalidPayloadData, + ))), + ) + .return_const(Ok(())); + + // Create the bootstrapper + let (mut bootstrapper, _) = + create_bootstrapper(driver_configuration, mock_streaming_client, None, true); + + // Insert an epoch ending ledger info into the verified states of the bootstrapper + manipulate_verified_epoch_states(&mut bootstrapper, true, true, Some(highest_version)); + + // Create a global data summary + let mut global_data_summary = create_global_summary(1); + 
global_data_summary.advertised_data.synced_ledger_infos = vec![highest_ledger_info.clone()]; + + // Drive progress to initialize the transaction output stream + drive_progress(&mut bootstrapper, &global_data_summary, false) + .await + .unwrap(); + + // Send an invalid output along the stream + let data_notification = DataNotification { + notification_id, + data_payload: DataPayload::EpochEndingLedgerInfos(vec![create_epoch_ending_ledger_info()]), + }; + notification_sender_1.send(data_notification).await.unwrap(); + + // Drive progress again and ensure we get an invalid payload error + let error = drive_progress(&mut bootstrapper, &global_data_summary, false) + .await + .unwrap_err(); + assert_matches!(error, Error::InvalidPayload(_)); + + // Drive progress to initialize the transaction output stream + drive_progress(&mut bootstrapper, &global_data_summary, false) + .await + .unwrap(); +} + +#[tokio::test] +async fn test_data_stream_transactions_or_outputs_fallback() { + // Create test data + let notification_id = 1; + let highest_version = 9998765; + let highest_ledger_info = create_random_epoch_ending_ledger_info(highest_version, 1); + + // Create a driver configuration with a genesis waypoint and transaction or output syncing + let mut driver_configuration = create_full_node_driver_configuration(); + driver_configuration.config.bootstrapping_mode = BootstrappingMode::ExecuteOrApplyFromGenesis; + + // Create the mock streaming client + let mut mock_streaming_client = create_mock_streaming_client(); + + // Set expectations for stream creations and terminations + let mut expectation_sequence = Sequence::new(); + let (_notification_sender_1, data_stream_listener_1) = create_data_stream_listener(); + let data_stream_id_1 = data_stream_listener_1.data_stream_id; + let (_notification_sender_2, data_stream_listener_2) = create_data_stream_listener(); + let data_stream_id_2 = data_stream_listener_2.data_stream_id; + let (_notification_sender_3, data_stream_listener_3) = 
create_data_stream_listener(); + mock_streaming_client + .expect_get_all_transactions_or_outputs() + .times(1) + .with(eq(1), eq(highest_version), eq(highest_version), eq(false)) + .return_once(move |_, _, _, _| Ok(data_stream_listener_1)) + .in_sequence(&mut expectation_sequence); + mock_streaming_client + .expect_terminate_stream_with_feedback() + .times(1) + .with( + eq(data_stream_id_1), + eq(Some(NotificationAndFeedback::new( + notification_id, + NotificationFeedback::PayloadProofFailed, + ))), + ) + .return_const(Ok(())) + .in_sequence(&mut expectation_sequence); + mock_streaming_client + .expect_get_all_transaction_outputs() + .times(1) + .with(eq(1), eq(highest_version), eq(highest_version)) + .return_once(move |_, _, _| Ok(data_stream_listener_2)); + mock_streaming_client + .expect_terminate_stream_with_feedback() + .times(1) + .with( + eq(data_stream_id_2), + eq(Some(NotificationAndFeedback::new( + notification_id, + NotificationFeedback::InvalidPayloadData, + ))), + ) + .return_const(Ok(())) + .in_sequence(&mut expectation_sequence); + mock_streaming_client + .expect_get_all_transactions_or_outputs() + .with(eq(1), eq(highest_version), eq(highest_version), eq(false)) + .return_once(move |_, _, _, _| Ok(data_stream_listener_3)); + + // Create the bootstrapper + let time_service = TimeService::mock(); + let (mut bootstrapper, mut output_fallback_handler) = create_bootstrapper( + driver_configuration.clone(), + mock_streaming_client, + Some(time_service.clone()), + true, + ); + assert!(!output_fallback_handler.in_fallback_mode()); + + // Insert an epoch ending ledger info into the verified states of the bootstrapper + manipulate_verified_epoch_states(&mut bootstrapper, true, true, Some(highest_version)); + + // Create a global data summary + let mut global_data_summary = create_global_summary(1); + global_data_summary.advertised_data.synced_ledger_infos = vec![highest_ledger_info.clone()]; + + // Drive progress to initialize the first transaction output 
stream + drive_progress(&mut bootstrapper, &global_data_summary, false) + .await + .unwrap(); + + // Send a storage synchronizer error to the bootstrapper so that it falls back + // to output syncing and drive progress for the new stream type. + handle_storage_synchronizer_error( + &mut bootstrapper, + notification_id, + NotificationFeedback::PayloadProofFailed, + ) + .await; + drive_progress(&mut bootstrapper, &global_data_summary, false) + .await + .unwrap(); + assert!(output_fallback_handler.in_fallback_mode()); + + // Elapse enough time so that fallback mode is now disabled + time_service + .into_mock() + .advance_async(Duration::from_secs( + driver_configuration.config.fallback_to_output_syncing_secs, + )) + .await; + + // Send another storage synchronizer error to the bootstrapper and drive progress + // so that a regular stream is created. + handle_storage_synchronizer_error( + &mut bootstrapper, + notification_id, + NotificationFeedback::InvalidPayloadData, + ) + .await; + drive_progress(&mut bootstrapper, &global_data_summary, false) + .await + .unwrap(); + assert!(!output_fallback_handler.in_fallback_mode()); +} + #[tokio::test] async fn test_fetch_epoch_ending_ledger_infos() { // Create a driver configuration with a genesis waypoint and a stream timeout of 1 second @@ -429,7 +629,8 @@ async fn test_fetch_epoch_ending_ledger_infos() { .return_once(move |_| Ok(data_stream_listener)); // Create the bootstrapper - let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true); + let (mut bootstrapper, _) = + create_bootstrapper(driver_configuration, mock_streaming_client, None, true); // Set the waypoint as already having been verified (but no fetched ledger infos) manipulate_verified_epoch_states(&mut bootstrapper, false, true, None); @@ -847,7 +1048,8 @@ async fn test_waypoint_mismatch() { .return_const(Ok(())); // Create the bootstrapper - let mut bootstrapper = create_bootstrapper(driver_configuration, 
mock_streaming_client, true); + let (mut bootstrapper, _) = + create_bootstrapper(driver_configuration, mock_streaming_client, None, true); // Create a global data summary up to the waypoint let mut global_data_summary = create_global_summary(waypoint_epoch); @@ -891,7 +1093,8 @@ async fn test_waypoint_must_be_verified() { .return_once(move |_| Ok(data_stream_listener)); // Create the bootstrapper - let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true); + let (mut bootstrapper, _) = + create_bootstrapper(driver_configuration, mock_streaming_client, None, true); // Set fetched ledger infos to true but the waypoint is still not verified manipulate_verified_epoch_states(&mut bootstrapper, true, false, None); @@ -923,7 +1126,8 @@ async fn test_waypoint_satisfiable() { let mock_streaming_client = create_mock_streaming_client(); // Create the bootstrapper - let mut bootstrapper = create_bootstrapper(driver_configuration, mock_streaming_client, true); + let (mut bootstrapper, _) = + create_bootstrapper(driver_configuration, mock_streaming_client, None, true); // Create an empty global data summary let mut global_data_summary = GlobalDataSummary::empty(); @@ -949,8 +1153,12 @@ async fn test_waypoint_satisfiable() { fn create_bootstrapper( driver_configuration: DriverConfiguration, mock_streaming_client: MockStreamingClient, + time_service: Option, expect_reset_executor: bool, -) -> Bootstrapper { +) -> ( + Bootstrapper, + OutputFallbackHandler, +) { // Initialize the logger for tests aptos_logger::Logger::init_for_testing(); @@ -975,13 +1183,22 @@ fn create_bootstrapper( .expect_get_latest_transaction_info_option() .returning(|| Ok(Some((0, create_transaction_info())))); - Bootstrapper::new( + // Create the output fallback handler + let time_service = time_service.unwrap_or_else(TimeService::mock); + let output_fallback_handler = + OutputFallbackHandler::new(driver_configuration.clone(), time_service); + + // Create the bootstrapper 
+ let bootstrapper = Bootstrapper::new( driver_configuration, metadata_storage, + output_fallback_handler.clone(), mock_streaming_client, Arc::new(mock_database_reader), mock_storage_synchronizer, - ) + ); + + (bootstrapper, output_fallback_handler) } /// Creates a bootstrapper for testing with a mock metadata storage @@ -1010,9 +1227,14 @@ fn create_bootstrapper_with_storage( .expect_get_latest_transaction_info_option() .returning(move || Ok(Some((latest_synced_version, create_transaction_info())))); + // Create the output fallback handler + let output_fallback_handler = + OutputFallbackHandler::new(driver_configuration.clone(), TimeService::mock()); + Bootstrapper::new( driver_configuration, mock_metadata_storage, + output_fallback_handler, mock_streaming_client, Arc::new(mock_database_reader), mock_storage_synchronizer, @@ -1078,3 +1300,22 @@ fn manipulate_verified_epoch_states( fn verify_bootstrap_notification(notification_receiver: oneshot::Receiver>) { assert_ok!(notification_receiver.now_or_never().unwrap().unwrap()); } + +/// Handles the given storage synchronizer error for the bootstrapper +async fn handle_storage_synchronizer_error( + bootstrapper: &mut Bootstrapper< + MockMetadataStorage, + MockStorageSynchronizer, + MockStreamingClient, + >, + notification_id: NotificationId, + notification_feedback: NotificationFeedback, +) { + bootstrapper + .handle_storage_synchronizer_error(NotificationAndFeedback::new( + notification_id, + notification_feedback, + )) + .await + .unwrap(); +} diff --git a/state-sync/state-sync-v2/state-sync-driver/src/tests/continuous_syncer.rs b/state-sync/state-sync-v2/state-sync-driver/src/tests/continuous_syncer.rs index be2a4018490ba..bec26269e4691 100644 --- a/state-sync/state-sync-v2/state-sync-driver/src/tests/continuous_syncer.rs +++ b/state-sync/state-sync-v2/state-sync-driver/src/tests/continuous_syncer.rs @@ -1,6 +1,7 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 +use 
crate::utils::OutputFallbackHandler; use crate::{ continuous_syncer::ContinuousSyncer, driver::DriverConfiguration, @@ -19,17 +20,20 @@ use crate::{ }; use aptos_config::config::ContinuousSyncingMode; use aptos_consensus_notifications::ConsensusSyncNotification; -use aptos_data_streaming_service::{ - data_notification::{DataNotification, DataPayload}, - streaming_client::{NotificationAndFeedback, NotificationFeedback}, -}; +use aptos_data_streaming_service::data_notification::DataNotification; +use aptos_data_streaming_service::data_notification::DataPayload; +use aptos_data_streaming_service::data_notification::NotificationId; +use aptos_data_streaming_service::streaming_client::NotificationAndFeedback; +use aptos_data_streaming_service::streaming_client::NotificationFeedback; use aptos_infallible::Mutex; use aptos_storage_service_types::Epoch; +use aptos_time_service::TimeService; use aptos_types::transaction::{TransactionOutputListWithProof, Version}; use claims::assert_matches; use futures::SinkExt; use mockall::{predicate::eq, Sequence}; use std::sync::Arc; +use std::time::Duration; #[tokio::test] async fn test_critical_timeout() { @@ -68,9 +72,10 @@ async fn test_critical_timeout() { .return_const(Ok(())); // Create the continuous syncer - let mut continuous_syncer = create_continuous_syncer( + let (mut continuous_syncer, _) = create_continuous_syncer( driver_configuration, mock_streaming_client, + None, true, current_synced_version, current_synced_epoch, @@ -78,10 +83,7 @@ async fn test_critical_timeout() { // Drive progress to initialize the transaction output stream let no_sync_request = Arc::new(Mutex::new(None)); - continuous_syncer - .drive_progress(no_sync_request.clone()) - .await - .unwrap(); + drive_progress(&mut continuous_syncer, &no_sync_request).await; // Drive progress and verify we get non-critical timeouts for _ in 0..3 { @@ -100,10 +102,7 @@ async fn test_critical_timeout() { assert_matches!(error, Error::CriticalDataStreamTimeout(_)); // 
Drive progress to initialize the transaction output stream again - continuous_syncer - .drive_progress(no_sync_request.clone()) - .await - .unwrap(); + drive_progress(&mut continuous_syncer, &no_sync_request).await; // Drive progress again and verify we get a non-critical timeout let error = continuous_syncer @@ -157,9 +156,10 @@ async fn test_data_stream_transactions_with_target() { .return_const(Ok(())); // Create the continuous syncer - let mut continuous_syncer = create_continuous_syncer( + let (mut continuous_syncer, _) = create_continuous_syncer( driver_configuration, mock_streaming_client, + None, true, current_synced_version, current_synced_epoch, @@ -170,10 +170,7 @@ async fn test_data_stream_transactions_with_target() { let sync_request = Arc::new(Mutex::new(Some(ConsensusSyncRequest::new( consensus_sync_notification, )))); - continuous_syncer - .drive_progress(sync_request.clone()) - .await - .unwrap(); + drive_progress(&mut continuous_syncer, &sync_request).await; // Send an invalid output along the stream let data_notification = DataNotification { @@ -193,10 +190,7 @@ async fn test_data_stream_transactions_with_target() { assert_matches!(error, Error::VerificationError(_)); // Drive progress to initialize the transaction output stream - continuous_syncer - .drive_progress(sync_request.clone()) - .await - .unwrap(); + drive_progress(&mut continuous_syncer, &sync_request).await; } #[tokio::test] @@ -241,9 +235,10 @@ async fn test_data_stream_transaction_outputs() { .return_const(Ok(())); // Create the continuous syncer - let mut continuous_syncer = create_continuous_syncer( + let (mut continuous_syncer, _) = create_continuous_syncer( driver_configuration, mock_streaming_client, + None, true, current_synced_version, current_synced_epoch, @@ -251,10 +246,7 @@ async fn test_data_stream_transaction_outputs() { // Drive progress to initialize the transaction output stream let no_sync_request = Arc::new(Mutex::new(None)); - continuous_syncer - 
.drive_progress(no_sync_request.clone()) - .await - .unwrap(); + drive_progress(&mut continuous_syncer, &no_sync_request).await; // Send an invalid output along the stream let mut transaction_output_with_proof = TransactionOutputListWithProof::new_empty(); @@ -277,20 +269,232 @@ async fn test_data_stream_transaction_outputs() { assert_matches!(error, Error::VerificationError(_)); // Drive progress to initialize the transaction output stream - continuous_syncer - .drive_progress(no_sync_request.clone()) + drive_progress(&mut continuous_syncer, &no_sync_request).await; +} + +#[tokio::test] +async fn test_data_stream_transactions_or_outputs_with_target() { + // Create test data + let current_synced_epoch = 5; + let current_synced_version = 234; + let notification_id = 435345; + let target_ledger_info = create_epoch_ending_ledger_info(); + + // Create a driver configuration with a genesis waypoint and transactions or output syncing + let mut driver_configuration = create_full_node_driver_configuration(); + driver_configuration.config.continuous_syncing_mode = + ContinuousSyncingMode::ExecuteTransactionsOrApplyOutputs; + + // Create the mock streaming client + let mut mock_streaming_client = create_mock_streaming_client(); + let mut expectation_sequence = Sequence::new(); + let (mut notification_sender_1, data_stream_listener_1) = create_data_stream_listener(); + let (_notification_sender_2, data_stream_listener_2) = create_data_stream_listener(); + let data_stream_id_1 = data_stream_listener_1.data_stream_id; + for data_stream_listener in [data_stream_listener_1, data_stream_listener_2] { + mock_streaming_client + .expect_continuously_stream_transactions_or_outputs() + .times(1) + .with( + eq(current_synced_version), + eq(current_synced_epoch), + eq(false), + eq(Some(target_ledger_info.clone())), + ) + .return_once(move |_, _, _, _| Ok(data_stream_listener)) + .in_sequence(&mut expectation_sequence); + } + mock_streaming_client + 
.expect_terminate_stream_with_feedback() + .with( + eq(data_stream_id_1), + eq(Some(NotificationAndFeedback::new( + notification_id, + NotificationFeedback::EmptyPayloadData, + ))), + ) + .return_const(Ok(())); + + // Create the continuous syncer + let (mut continuous_syncer, _) = create_continuous_syncer( + driver_configuration, + mock_streaming_client, + None, + true, + current_synced_version, + current_synced_epoch, + ); + + // Drive progress to initialize the transaction output stream + let (consensus_sync_notification, _) = ConsensusSyncNotification::new(target_ledger_info); + let sync_request = Arc::new(Mutex::new(Some(ConsensusSyncRequest::new( + consensus_sync_notification, + )))); + drive_progress(&mut continuous_syncer, &sync_request).await; + + // Send an invalid output along the stream + let data_notification = DataNotification { + notification_id, + data_payload: DataPayload::ContinuousTransactionOutputsWithProof( + create_epoch_ending_ledger_info(), + TransactionOutputListWithProof::new_empty(), + ), + }; + notification_sender_1.send(data_notification).await.unwrap(); + + // Drive progress again and ensure we get a verification error + let error = continuous_syncer + .drive_progress(sync_request.clone()) .await - .unwrap(); + .unwrap_err(); + assert_matches!(error, Error::VerificationError(_)); + + // Drive progress to initialize the transaction output stream + drive_progress(&mut continuous_syncer, &sync_request).await; +} + +#[tokio::test] +async fn test_data_stream_transactions_or_outputs_with_target_fallback() { + // Create test data + let current_synced_epoch = 5; + let current_synced_version = 234; + let notification_id = 435345; + let target_ledger_info = create_epoch_ending_ledger_info(); + + // Create a driver configuration with a genesis waypoint and transactions or output syncing + let mut driver_configuration = create_full_node_driver_configuration(); + driver_configuration.config.continuous_syncing_mode = + 
ContinuousSyncingMode::ExecuteTransactionsOrApplyOutputs; + + // Create the mock streaming client + let mut mock_streaming_client = create_mock_streaming_client(); + + // Set expectations for stream creations and terminations + let mut expectation_sequence = Sequence::new(); + let (_notification_sender_1, data_stream_listener_1) = create_data_stream_listener(); + let data_stream_id_1 = data_stream_listener_1.data_stream_id; + let (_notification_sender_2, data_stream_listener_2) = create_data_stream_listener(); + let data_stream_id_2 = data_stream_listener_2.data_stream_id; + let (_notification_sender_3, data_stream_listener_3) = create_data_stream_listener(); + mock_streaming_client + .expect_continuously_stream_transactions_or_outputs() + .times(1) + .with( + eq(current_synced_version), + eq(current_synced_epoch), + eq(false), + eq(Some(target_ledger_info.clone())), + ) + .return_once(move |_, _, _, _| Ok(data_stream_listener_1)) + .in_sequence(&mut expectation_sequence); + mock_streaming_client + .expect_terminate_stream_with_feedback() + .times(1) + .with( + eq(data_stream_id_1), + eq(Some(NotificationAndFeedback::new( + notification_id, + NotificationFeedback::PayloadProofFailed, + ))), + ) + .return_const(Ok(())) + .in_sequence(&mut expectation_sequence); + mock_streaming_client + .expect_continuously_stream_transaction_outputs() + .times(1) + .with( + eq(current_synced_version), + eq(current_synced_epoch), + eq(Some(target_ledger_info.clone())), + ) + .return_once(move |_, _, _| Ok(data_stream_listener_2)) + .in_sequence(&mut expectation_sequence); + mock_streaming_client + .expect_terminate_stream_with_feedback() + .times(1) + .with( + eq(data_stream_id_2), + eq(Some(NotificationAndFeedback::new( + notification_id, + NotificationFeedback::InvalidPayloadData, + ))), + ) + .return_const(Ok(())) + .in_sequence(&mut expectation_sequence); + mock_streaming_client + .expect_continuously_stream_transactions_or_outputs() + .times(1) + .with( + 
eq(current_synced_version), + eq(current_synced_epoch), + eq(false), + eq(Some(target_ledger_info.clone())), + ) + .return_once(move |_, _, _, _| Ok(data_stream_listener_3)) + .in_sequence(&mut expectation_sequence); + + // Create the continuous syncer + let time_service = TimeService::mock(); + let (mut continuous_syncer, mut output_fallback_handler) = create_continuous_syncer( + driver_configuration.clone(), + mock_streaming_client, + Some(time_service.clone()), + true, + current_synced_version, + current_synced_epoch, + ); + assert!(!output_fallback_handler.in_fallback_mode()); + + // Drive progress to initialize the transactions or output stream + let (consensus_sync_notification, _) = ConsensusSyncNotification::new(target_ledger_info); + let sync_request = Arc::new(Mutex::new(Some(ConsensusSyncRequest::new( + consensus_sync_notification, + )))); + drive_progress(&mut continuous_syncer, &sync_request).await; + + // Send a storage synchronizer error to the continuous syncer so that it falls back + // to output syncing and drive progress for the new stream type. + handle_storage_synchronizer_error( + &mut continuous_syncer, + notification_id, + NotificationFeedback::PayloadProofFailed, + ) + .await; + drive_progress(&mut continuous_syncer, &sync_request).await; + assert!(output_fallback_handler.in_fallback_mode()); + + // Elapse enough time so that fallback mode is now disabled + time_service + .into_mock() + .advance_async(Duration::from_secs( + driver_configuration.config.fallback_to_output_syncing_secs, + )) + .await; + + // Send another storage synchronizer error to the bootstrapper and drive progress + // so that a regular stream is created. 
+ handle_storage_synchronizer_error( + &mut continuous_syncer, + notification_id, + NotificationFeedback::InvalidPayloadData, + ) + .await; + drive_progress(&mut continuous_syncer, &sync_request).await; + assert!(!output_fallback_handler.in_fallback_mode()); } /// Creates a continuous syncer for testing fn create_continuous_syncer( driver_configuration: DriverConfiguration, mock_streaming_client: MockStreamingClient, + time_service: Option, expect_reset_executor: bool, synced_version: Version, current_epoch: Epoch, -) -> ContinuousSyncer { +) -> ( + ContinuousSyncer, + OutputFallbackHandler, +) { // Initialize the logger for tests aptos_logger::Logger::init_for_testing(); @@ -306,10 +510,45 @@ fn create_continuous_syncer( .expect_get_latest_epoch_state() .returning(move || Ok(create_epoch_state(current_epoch))); - ContinuousSyncer::new( + // Create the output fallback handler + let time_service = time_service.unwrap_or_else(TimeService::mock); + let output_fallback_handler = + OutputFallbackHandler::new(driver_configuration.clone(), time_service); + + // Create the continuous syncer + let continuous_syncer = ContinuousSyncer::new( driver_configuration, mock_streaming_client, + output_fallback_handler.clone(), Arc::new(mock_database_reader), mock_storage_synchronizer, - ) + ); + + (continuous_syncer, output_fallback_handler) +} + +/// Drives progress on the given syncer +async fn drive_progress( + continuous_syncer: &mut ContinuousSyncer, + no_sync_request: &Arc>>, +) { + continuous_syncer + .drive_progress(no_sync_request.clone()) + .await + .unwrap(); +} + +/// Handles the given storage synchronizer error for the bootstrapper +async fn handle_storage_synchronizer_error( + continuous_syncer: &mut ContinuousSyncer, + notification_id: NotificationId, + notification_feedback: NotificationFeedback, +) { + continuous_syncer + .handle_storage_synchronizer_error(NotificationAndFeedback::new( + notification_id, + notification_feedback, + )) + .await + .unwrap(); } diff 
--git a/state-sync/state-sync-v2/state-sync-driver/src/tests/driver.rs b/state-sync/state-sync-v2/state-sync-driver/src/tests/driver.rs index 094a50f41a488..5035bb7782604 100644 --- a/state-sync/state-sync-v2/state-sync-driver/src/tests/driver.rs +++ b/state-sync/state-sync-v2/state-sync-driver/src/tests/driver.rs @@ -9,6 +9,7 @@ use crate::{ verify_mempool_and_event_notification, }, }; +use aptos_config::config::StateSyncDriverConfig; use aptos_config::config::{NodeConfig, RoleType}; use aptos_consensus_notifications::{ConsensusNotificationSender, ConsensusNotifier}; use aptos_data_client::aptosnet::AptosNetDataClient; @@ -34,6 +35,7 @@ use aptos_types::{ use aptos_vm::AptosVM; use claims::{assert_err, assert_none}; use futures::{FutureExt, StreamExt}; +use std::time::Duration; use std::{collections::HashMap, sync::Arc}; // TODO(joshlind): extend these tests to cover more functionality! @@ -41,17 +43,37 @@ use std::{collections::HashMap, sync::Arc}; #[tokio::test(flavor = "multi_thread")] async fn test_auto_bootstrapping() { // Create a driver for a validator with a waypoint at version 0 - let (validator_driver, _, _, _, _) = create_validator_driver(None).await; + let (validator_driver, consensus_notifier, _, _, _, time_service) = + create_validator_driver(None).await; - // Wait until the validator is bootstrapped (auto-bootstrapping should occur) + // Spawn a driver client that's blocked on bootstrapping let driver_client = validator_driver.create_driver_client(); - driver_client.notify_once_bootstrapped().await.unwrap(); + let join_handle = tokio::spawn(async move { + driver_client.notify_once_bootstrapped().await.unwrap(); + }); + + // Verify auto-bootstrapping hasn't happened yet + let result = consensus_notifier + .sync_to_target(create_ledger_info_at_version(0)) + .await; + assert_err!(result); + + // Elapse enough time on the time service for auto-bootstrapping + time_service + .into_mock() + .advance_async(Duration::from_secs( + 
StateSyncDriverConfig::default().max_connection_deadline_secs, + )) + .await; + + // Wait until the validator is bootstrapped (auto-bootstrapping should occur) + join_handle.await.unwrap(); } #[tokio::test] async fn test_consensus_commit_notification() { // Create a driver for a full node - let (_full_node_driver, consensus_notifier, _, _, _) = create_full_node_driver(None).await; + let (_full_node_driver, consensus_notifier, _, _, _, _) = create_full_node_driver(None).await; // Verify that full nodes can't process commit notifications let result = consensus_notifier @@ -60,7 +82,7 @@ async fn test_consensus_commit_notification() { assert_err!(result); // Create a driver for a validator with a waypoint at version 0 - let (_validator_driver, consensus_notifier, _, _, _) = create_validator_driver(None).await; + let (_validator_driver, consensus_notifier, _, _, _, _) = create_validator_driver(None).await; // Send a new commit notification and verify the node isn't bootstrapped let result = consensus_notifier @@ -69,12 +91,26 @@ async fn test_consensus_commit_notification() { assert_err!(result); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_mempool_commit_notifications() { // Create a driver for a validator with a waypoint at version 0 let subscription_event_key = EventKey::random(); - let (validator_driver, consensus_notifier, mut mempool_listener, _, mut event_listener) = - create_validator_driver(Some(vec![subscription_event_key])).await; + let ( + validator_driver, + consensus_notifier, + mut mempool_listener, + _, + mut event_listener, + time_service, + ) = create_validator_driver(Some(vec![subscription_event_key])).await; + + // Elapse enough time on the time service for auto-bootstrapping + time_service + .into_mock() + .advance_async(Duration::from_secs( + StateSyncDriverConfig::default().max_connection_deadline_secs, + )) + .await; // Wait until the validator is bootstrapped let driver_client = 
validator_driver.create_driver_client(); @@ -110,11 +146,25 @@ async fn test_mempool_commit_notifications() { join_handle.await.unwrap(); } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_reconfiguration_notifications() { // Create a driver for a validator with a waypoint at version 0 - let (validator_driver, consensus_notifier, mut mempool_listener, mut reconfig_listener, _) = - create_validator_driver(None).await; + let ( + validator_driver, + consensus_notifier, + mut mempool_listener, + mut reconfig_listener, + _, + time_service, + ) = create_validator_driver(None).await; + + // Elapse enough time on the time service for auto-bootstrapping + time_service + .into_mock() + .advance_async(Duration::from_secs( + StateSyncDriverConfig::default().max_connection_deadline_secs, + )) + .await; // Wait until the validator is bootstrapped let driver_client = validator_driver.create_driver_client(); @@ -163,7 +213,7 @@ async fn test_reconfiguration_notifications() { #[tokio::test] async fn test_consensus_sync_request() { // Create a driver for a full node - let (_full_node_driver, consensus_notifier, _, _, _) = create_full_node_driver(None).await; + let (_full_node_driver, consensus_notifier, _, _, _, _) = create_full_node_driver(None).await; // Verify that full nodes can't process sync requests let result = consensus_notifier @@ -172,7 +222,7 @@ async fn test_consensus_sync_request() { assert_err!(result); // Create a driver for a validator with a waypoint at version 0 - let (_validator_driver, consensus_notifier, _, _, _) = create_validator_driver(None).await; + let (_validator_driver, consensus_notifier, _, _, _, _) = create_validator_driver(None).await; // Send a new sync request and verify the node isn't bootstrapped let result = consensus_notifier @@ -190,6 +240,7 @@ async fn create_validator_driver( MempoolNotificationListener, ReconfigNotificationListener, EventNotificationListener, + TimeService, ) { let mut node_config = 
NodeConfig::default(); node_config.base.role = RoleType::Validator; @@ -206,6 +257,7 @@ async fn create_full_node_driver( MempoolNotificationListener, ReconfigNotificationListener, EventNotificationListener, + TimeService, ) { let mut node_config = NodeConfig::default(); node_config.base.role = RoleType::FullNode; @@ -224,6 +276,7 @@ async fn create_driver_for_tests( MempoolNotificationListener, ReconfigNotificationListener, EventNotificationListener, + TimeService, ) { // Create test aptos database let db_path = aptos_temppath::TempPath::new(); @@ -262,6 +315,7 @@ async fn create_driver_for_tests( let (streaming_service_client, _) = new_streaming_service_client_listener_pair(); // Create a test aptos data client + let time_service = TimeService::mock(); let network_client = StorageServiceClient::new( MultiNetworkSender::new(HashMap::new()), PeerMetadataStorage::new(&[]), @@ -270,7 +324,7 @@ async fn create_driver_for_tests( node_config.state_sync.aptos_data_client, node_config.base.clone(), node_config.state_sync.storage_service, - TimeService::mock(), + time_service.clone(), network_client, None, ); @@ -291,6 +345,7 @@ async fn create_driver_for_tests( event_subscription_service, aptos_data_client, streaming_service_client, + time_service.clone(), ); // The driver will notify reconfiguration subscribers of the initial configs. 
@@ -303,5 +358,6 @@ async fn create_driver_for_tests( mempool_listener, reconfiguration_subscriber, event_subscriber, + time_service, ) } diff --git a/state-sync/state-sync-v2/state-sync-driver/src/tests/driver_factory.rs b/state-sync/state-sync-v2/state-sync-driver/src/tests/driver_factory.rs index fdf7ae1428fa5..01b8f1b1c2fb0 100644 --- a/state-sync/state-sync-v2/state-sync-driver/src/tests/driver_factory.rs +++ b/state-sync/state-sync-v2/state-sync-driver/src/tests/driver_factory.rs @@ -94,6 +94,7 @@ fn test_new_initialized_configs() { event_subscription_service, aptos_data_client, streaming_service_client, + TimeService::mock(), ); // Verify the initial configs were notified diff --git a/state-sync/state-sync-v2/state-sync-driver/src/tests/mocks.rs b/state-sync/state-sync-v2/state-sync-driver/src/tests/mocks.rs index cb7d46885d075..44da3af1b2f68 100644 --- a/state-sync/state-sync-v2/state-sync-driver/src/tests/mocks.rs +++ b/state-sync/state-sync-v2/state-sync-driver/src/tests/mocks.rs @@ -377,6 +377,14 @@ mock! { include_events: bool, ) -> Result; + async fn get_all_transactions_or_outputs( + &self, + start_version: Version, + end_version: Version, + proof_version: Version, + include_events: bool, + ) -> Result; + async fn continuously_stream_transaction_outputs( &self, start_version: Version, @@ -392,6 +400,14 @@ mock! 
{ target: Option, ) -> Result; + async fn continuously_stream_transactions_or_outputs( + &self, + start_version: Version, + start_epoch: Epoch, + include_events: bool, + target: Option, + ) -> Result; + async fn terminate_stream_with_feedback( &self, data_stream_id: DataStreamId, diff --git a/state-sync/state-sync-v2/state-sync-driver/src/utils.rs b/state-sync/state-sync-v2/state-sync-driver/src/utils.rs index 95e8d8017266c..2229a5e277c27 100644 --- a/state-sync/state-sync-v2/state-sync-driver/src/utils.rs +++ b/state-sync/state-sync-v2/state-sync-driver/src/utils.rs @@ -1,6 +1,8 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 +use crate::driver::DriverConfiguration; +use crate::storage_synchronizer::StorageSynchronizerInterface; use crate::{ error::Error, logging::{LogEntry, LogSchema}, @@ -9,6 +11,7 @@ use crate::{ CommitNotification, CommittedTransactions, MempoolNotificationHandler, }, }; +use aptos_data_streaming_service::data_notification::NotificationId; use aptos_data_streaming_service::data_stream::DataStreamId; use aptos_data_streaming_service::streaming_client::NotificationAndFeedback; use aptos_data_streaming_service::{ @@ -20,11 +23,15 @@ use aptos_infallible::Mutex; use aptos_logger::prelude::*; use aptos_mempool_notifications::MempoolNotificationSender; use aptos_storage_interface::DbReader; +use aptos_time_service::TimeService; +use aptos_time_service::TimeServiceTrait; +use aptos_types::transaction::{TransactionListWithProof, TransactionOutputListWithProof}; use aptos_types::{ epoch_change::Verifier, epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures, transaction::Version, }; use futures::StreamExt; +use std::time::Instant; use std::{sync::Arc, time::Duration}; use tokio::time::timeout; @@ -102,6 +109,88 @@ impl SpeculativeStreamState { } } +/// A simple struct that holds all information relevant for managing +/// fallback behaviour to output syncing. 
+#[derive(Clone)] +pub struct OutputFallbackHandler { + // The configuration for the state sync driver + driver_configuration: DriverConfiguration, + + // The most recent time at which we fell back to output syncing + fallback_start_time: Arc>>, + + // The time service + time_service: TimeService, +} + +impl OutputFallbackHandler { + pub fn new(driver_configuration: DriverConfiguration, time_service: TimeService) -> Self { + let fallback_start_time = Arc::new(Mutex::new(None)); + Self { + driver_configuration, + fallback_start_time, + time_service, + } + } + + /// Initiates a fallback to output syncing (if we haven't already) + pub fn fallback_to_outputs(&mut self) { + if self.fallback_start_time.lock().is_none() { + self.set_fallback_start_time(self.time_service.now()); + info!(LogSchema::new(LogEntry::Driver).message(&format!( + "Falling back to output syncing for at least {:?} seconds!", + self.get_fallback_duration().as_secs() + ))); + } + } + + /// Returns true iff we're currently in fallback mode + pub fn in_fallback_mode(&mut self) -> bool { + let fallback_start_time = self.fallback_start_time.lock().take(); + if let Some(fallback_start_time) = fallback_start_time { + if let Some(fallback_deadline) = + fallback_start_time.checked_add(self.get_fallback_duration()) + { + // Check if we elapsed the max fallback duration + if self.time_service.now() >= fallback_deadline { + info!(LogSchema::new(LogEntry::AutoBootstrapping) + .message("Passed the output fallback deadline! Disabling fallback mode!")); + false + } else { + // Reinsert the fallback deadline (not enough time has passed) + self.set_fallback_start_time(fallback_start_time); + true + } + } else { + warn!(LogSchema::new(LogEntry::Driver) + .message("The fallback deadline overflowed! 
Disabling fallback mode!")); + false + } + } else { + false + } + } + + /// Returns the fallback duration as defined by the config + fn get_fallback_duration(&self) -> Duration { + Duration::from_secs( + self.driver_configuration + .config + .fallback_to_output_syncing_secs, + ) + } + + /// Sets the fallback start time internally + fn set_fallback_start_time(&mut self, fallback_start_time: Instant) { + if let Some(old_start_time) = self.fallback_start_time.lock().replace(fallback_start_time) { + warn!(LogSchema::new(LogEntry::Driver).message(&format!( + "Overwrote the old fallback start time ({:?}) with the new one ({:?})!", + old_start_time, fallback_start_time + ))); + } + } +} + /// Fetches a data notification from the given data stream listener. Returns an /// error if the data stream times out after `max_stream_wait_time_ms`. Also, /// tracks the number of consecutive timeouts to identify when the stream has @@ -278,3 +367,47 @@ pub fn update_new_epoch_metrics() { 1, ); } + +/// Executes the given list of transactions and +/// returns the number of transactions in the list. +pub async fn execute_transactions( + mut storage_synchronizer: StorageSyncer, + notification_id: NotificationId, + proof_ledger_info: LedgerInfoWithSignatures, + end_of_epoch_ledger_info: Option, + transaction_list_with_proof: TransactionListWithProof, +) -> Result { + let num_transactions = transaction_list_with_proof.transactions.len(); + storage_synchronizer + .execute_transactions( + notification_id, + transaction_list_with_proof, + proof_ledger_info, + end_of_epoch_ledger_info, + ) + .await?; + Ok(num_transactions) +} + +/// Applies the given list of transaction outputs and +/// returns the number of outputs in the list. 
+pub async fn apply_transaction_outputs( + mut storage_synchronizer: StorageSyncer, + notification_id: NotificationId, + proof_ledger_info: LedgerInfoWithSignatures, + end_of_epoch_ledger_info: Option, + transaction_outputs_with_proof: TransactionOutputListWithProof, +) -> Result { + let num_transaction_outputs = transaction_outputs_with_proof + .transactions_and_outputs + .len(); + storage_synchronizer + .apply_transaction_outputs( + notification_id, + transaction_outputs_with_proof, + proof_ledger_info, + end_of_epoch_ledger_info, + ) + .await?; + Ok(num_transaction_outputs) +} diff --git a/state-sync/storage-service/server/src/tests.rs b/state-sync/storage-service/server/src/tests.rs index 0ae9e234fa7e4..d4e4522375309 100644 --- a/state-sync/storage-service/server/src/tests.rs +++ b/state-sync/storage-service/server/src/tests.rs @@ -1594,6 +1594,7 @@ async fn test_get_transactions_or_outputs_with_proof_chunk_limit() { // Create test data let max_output_chunk_size = StorageServiceConfig::default().max_transaction_output_chunk_size; + let max_transaction_chunk_size = StorageServiceConfig::default().max_transaction_chunk_size; let chunk_size = max_output_chunk_size * 10; // Set a chunk request larger than the max let start_version = 0; let end_version = start_version + max_output_chunk_size - 1; @@ -1616,7 +1617,7 @@ async fn test_get_transactions_or_outputs_with_proof_chunk_limit() { expect_get_transactions( &mut db_reader, start_version, - max_output_chunk_size, + max_transaction_chunk_size, proof_version, false, transaction_list_with_proof.clone(), diff --git a/testsuite/smoke-test/src/state_sync.rs b/testsuite/smoke-test/src/state_sync.rs index ced8db8ca449e..53f300e881a9d 100644 --- a/testsuite/smoke-test/src/state_sync.rs +++ b/testsuite/smoke-test/src/state_sync.rs @@ -117,6 +117,70 @@ async fn test_full_node_bootstrap_outputs_exponential_backoff() { test_full_node_sync(vfn_peer_id, &mut swarm, true).await; } +#[tokio::test] +async fn 
test_full_node_bootstrap_transactions_or_outputs() { + // Create a validator swarm of 1 validator node with a small network limit + let mut swarm = SwarmBuilder::new_local(1) + .with_aptos() + .with_init_config(Arc::new(|_, config, _| { + config.state_sync.storage_service.max_network_chunk_bytes = 5 * 1024; + })) + .build() + .await; + + // Create a fullnode config that uses transactions or outputs to sync + let mut vfn_config = NodeConfig::default_for_validator_full_node(); + vfn_config.state_sync.state_sync_driver.bootstrapping_mode = + BootstrappingMode::ExecuteOrApplyFromGenesis; + vfn_config + .state_sync + .state_sync_driver + .continuous_syncing_mode = ContinuousSyncingMode::ExecuteTransactionsOrApplyOutputs; + vfn_config + .state_sync + .aptos_data_client + .max_num_output_reductions = 1; + vfn_config.state_sync.aptos_data_client.response_timeout_ms = 1; + + // Create the fullnode + let vfn_peer_id = create_full_node(vfn_config, &mut swarm).await; + + // Test the ability of the fullnode to sync + test_full_node_sync(vfn_peer_id, &mut swarm, true).await; +} + +#[tokio::test] +async fn test_full_node_bootstrap_snapshot_transactions_or_outputs() { + // Create a validator swarm of 1 validator node with a small network limit + let mut swarm = SwarmBuilder::new_local(1) + .with_aptos() + .with_init_config(Arc::new(|_, config, _| { + config.state_sync.storage_service.max_network_chunk_bytes = 300 * 1024; + })) + .build() + .await; + + // Create a fullnode config that uses snapshot syncing and transactions or outputs + let mut vfn_config = NodeConfig::default_for_validator_full_node(); + vfn_config.state_sync.state_sync_driver.bootstrapping_mode = + BootstrappingMode::DownloadLatestStates; + vfn_config + .state_sync + .state_sync_driver + .continuous_syncing_mode = ContinuousSyncingMode::ExecuteTransactionsOrApplyOutputs; + vfn_config + .state_sync + .aptos_data_client + .max_num_output_reductions = 2; + vfn_config.state_sync.aptos_data_client.response_timeout_ms = 
1; + + // Create the fullnode + let vfn_peer_id = create_full_node(vfn_config, &mut swarm).await; + + // Test the ability of the fullnode to sync + test_full_node_sync(vfn_peer_id, &mut swarm, true).await; +} + #[tokio::test] async fn test_full_node_bootstrap_transactions() { // Create a validator swarm of 1 validator node @@ -416,6 +480,30 @@ async fn test_validator_bootstrap_transactions() { test_validator_sync(&mut swarm, 1).await; } +#[tokio::test] +async fn test_validator_bootstrap_transactions_or_outputs() { + // Create a swarm of 4 validators using transaction or output syncing + let mut swarm = SwarmBuilder::new_local(4) + .with_aptos() + .with_init_config(Arc::new(|_, config, _| { + config.state_sync.state_sync_driver.bootstrapping_mode = + BootstrappingMode::ExecuteOrApplyFromGenesis; + config.state_sync.state_sync_driver.continuous_syncing_mode = + ContinuousSyncingMode::ExecuteTransactionsOrApplyOutputs; + config.state_sync.storage_service.max_network_chunk_bytes = 10 * 1024; + config + .state_sync + .aptos_data_client + .max_num_output_reductions = 1; + config.state_sync.aptos_data_client.response_timeout_ms = 1; + })) + .build() + .await; + + // Test the ability of the validators to sync + test_validator_sync(&mut swarm, 1).await; +} + #[ignore] // We ignore this test because it takes a long time. But, it works, so it shouldn't be removed. #[tokio::test] async fn test_validator_bootstrap_transactions_network_limit() {