diff --git a/config/src/config/state_sync_config.rs b/config/src/config/state_sync_config.rs index b59474a2bf4af..6ab8e8576b0b0 100644 --- a/config/src/config/state_sync_config.rs +++ b/config/src/config/state_sync_config.rs @@ -170,6 +170,7 @@ impl Default for DataStreamingServiceConfig { pub struct AptosDataClientConfig { pub max_num_in_flight_priority_polls: u64, // Max num of in-flight polls for priority peers pub max_num_in_flight_regular_polls: u64, // Max num of in-flight polls for regular peers + pub max_num_output_reductions: u64, // The max num of output reductions before transactions are returned pub max_response_timeout_ms: u64, // Max timeout (in ms) when waiting for a response (after exponential increases) pub response_timeout_ms: u64, // First timeout (in ms) when waiting for a response pub subscription_timeout_ms: u64, // Timeout (in ms) when waiting for a subscription response @@ -182,6 +183,7 @@ impl Default for AptosDataClientConfig { Self { max_num_in_flight_priority_polls: 10, max_num_in_flight_regular_polls: 10, + max_num_output_reductions: 2, max_response_timeout_ms: 60000, // 60 seconds response_timeout_ms: 10000, // 10 seconds subscription_timeout_ms: 5000, // 5 seconds diff --git a/state-sync/aptos-data-client/src/aptosnet/mod.rs b/state-sync/aptos-data-client/src/aptosnet/mod.rs index a1c3aa12ebb43..d0f4b83cecc6b 100644 --- a/state-sync/aptos-data-client/src/aptosnet/mod.rs +++ b/state-sync/aptos-data-client/src/aptosnet/mod.rs @@ -38,10 +38,13 @@ use std::{convert::TryFrom, fmt, sync::Arc, time::Duration}; use storage_service_client::StorageServiceClient; use storage_service_types::requests::{ DataRequest, EpochEndingLedgerInfoRequest, NewTransactionOutputsWithProofRequest, - NewTransactionsWithProofRequest, StateValuesWithProofRequest, StorageServiceRequest, - TransactionOutputsWithProofRequest, TransactionsWithProofRequest, + NewTransactionsOrOutputsWithProofRequest, NewTransactionsWithProofRequest, + StateValuesWithProofRequest, 
StorageServiceRequest, TransactionOutputsWithProofRequest, + TransactionsOrOutputsWithProofRequest, TransactionsWithProofRequest, +}; +use storage_service_types::responses::{ + StorageServerSummary, StorageServiceResponse, TransactionOrOutputListWithProof, }; -use storage_service_types::responses::{StorageServerSummary, StorageServiceResponse}; use storage_service_types::Epoch; use tokio::{runtime::Handle, task::JoinHandle}; @@ -128,6 +131,11 @@ impl AptosNetDataClient { self.data_client_config.use_compression } + /// Returns the max number of output reductions as defined by the config + fn get_max_num_output_reductions(&self) -> u64 { + self.data_client_config.max_num_output_reductions + } + /// Generates a new response id fn next_response_id(&self) -> u64 { self.response_id_generator.next() @@ -468,6 +476,22 @@ impl AptosNetDataClient { .write() .update_score_error(peer, error_type); } + + /// Creates a storage service request using the given data request + /// and sends it across the network + async fn create_and_send_storage_request( + &self, + request_timeout_ms: u64, + data_request: DataRequest, + ) -> Result> + where + T: TryFrom, + E: Into, + { + let storage_request = StorageServiceRequest::new(data_request, self.use_compression()); + self.send_request_and_decode(storage_request, request_timeout_ms) + .await + } } #[async_trait] @@ -486,9 +510,8 @@ impl AptosDataClient for AptosNetDataClient { start_epoch, expected_end_epoch, }); - let storage_request = StorageServiceRequest::new(data_request, self.use_compression()); let response: Response = self - .send_request_and_decode(storage_request, request_timeout_ms) + .create_and_send_storage_request(request_timeout_ms, data_request) .await?; Ok(response.map(|epoch_change| epoch_change.ledger_info_with_sigs)) } @@ -504,8 +527,7 @@ impl AptosDataClient for AptosNetDataClient { known_version, known_epoch, }); - let storage_request = StorageServiceRequest::new(data_request, self.use_compression()); - 
self.send_request_and_decode(storage_request, request_timeout_ms) + self.create_and_send_storage_request(request_timeout_ms, data_request) .await } @@ -522,8 +544,26 @@ impl AptosDataClient for AptosNetDataClient { known_epoch, include_events, }); - let storage_request = StorageServiceRequest::new(data_request, self.use_compression()); - self.send_request_and_decode(storage_request, request_timeout_ms) + self.create_and_send_storage_request(request_timeout_ms, data_request) + .await + } + + async fn get_new_transactions_or_outputs_with_proof( + &self, + known_version: Version, + known_epoch: Epoch, + include_events: bool, + request_timeout_ms: u64, + ) -> Result> { + let data_request = DataRequest::GetNewTransactionsOrOutputsWithProof( + NewTransactionsOrOutputsWithProofRequest { + known_version, + known_epoch, + include_events, + max_num_output_reductions: self.get_max_num_output_reductions(), + }, + ); + self.create_and_send_storage_request(request_timeout_ms, data_request) .await } @@ -533,8 +573,7 @@ impl AptosDataClient for AptosNetDataClient { request_timeout_ms: u64, ) -> Result> { let data_request = DataRequest::GetNumberOfStatesAtVersion(version); - let storage_request = StorageServiceRequest::new(data_request, self.use_compression()); - self.send_request_and_decode(storage_request, request_timeout_ms) + self.create_and_send_storage_request(request_timeout_ms, data_request) .await } @@ -550,8 +589,7 @@ impl AptosDataClient for AptosNetDataClient { start_index, end_index, }); - let storage_request = StorageServiceRequest::new(data_request, self.use_compression()); - self.send_request_and_decode(storage_request, request_timeout_ms) + self.create_and_send_storage_request(request_timeout_ms, data_request) .await } @@ -568,8 +606,7 @@ impl AptosDataClient for AptosNetDataClient { start_version, end_version, }); - let storage_request = StorageServiceRequest::new(data_request, self.use_compression()); - self.send_request_and_decode(storage_request, 
request_timeout_ms) + self.create_and_send_storage_request(request_timeout_ms, data_request) .await } @@ -587,8 +624,27 @@ impl AptosDataClient for AptosNetDataClient { end_version, include_events, }); - let storage_request = StorageServiceRequest::new(data_request, self.use_compression()); - self.send_request_and_decode(storage_request, request_timeout_ms) + self.create_and_send_storage_request(request_timeout_ms, data_request) + .await + } + + async fn get_transactions_or_outputs_with_proof( + &self, + proof_version: Version, + start_version: Version, + end_version: Version, + include_events: bool, + request_timeout_ms: u64, + ) -> Result> { + let data_request = + DataRequest::GetTransactionsOrOutputsWithProof(TransactionsOrOutputsWithProofRequest { + proof_version, + start_version, + end_version, + include_events, + max_num_output_reductions: self.get_max_num_output_reductions(), + }); + self.create_and_send_storage_request(request_timeout_ms, data_request) .await } } diff --git a/state-sync/aptos-data-client/src/lib.rs b/state-sync/aptos-data-client/src/lib.rs index e95f68513713f..0a90dcd9b1e6f 100644 --- a/state-sync/aptos-data-client/src/lib.rs +++ b/state-sync/aptos-data-client/src/lib.rs @@ -12,6 +12,7 @@ use async_trait::async_trait; use itertools::Itertools; use serde::{Deserialize, Serialize}; use std::{fmt, fmt::Display}; +use storage_service_types::responses::TransactionOrOutputListWithProof; use storage_service_types::{responses::CompleteDataRange, Epoch}; use thiserror::Error; @@ -102,6 +103,18 @@ pub trait AptosDataClient { request_timeout_ms: u64, ) -> Result>; + /// Fetches a new transaction or output list with proof. Versions start at + /// `known_version + 1` and `known_epoch` (inclusive). The end version + /// and proof version are specified by the server. If the data cannot be + /// fetched, an error is returned. 
+ async fn get_new_transactions_or_outputs_with_proof( + &self, + known_version: Version, + known_epoch: Epoch, + include_events: bool, + request_timeout_ms: u64, + ) -> Result>; + /// Fetches the number of states at the specified version. async fn get_number_of_states( &self, @@ -149,6 +162,21 @@ pub trait AptosDataClient { include_events: bool, request_timeout_ms: u64, ) -> Result>; + + /// Fetches a transaction or output list with proof, with data from + /// start to end versions (inclusive). The proof is relative to the + /// specified `proof_version`. If `include_events` is true, events are + /// included in the proof. In some cases, fewer data items may be returned + /// (e.g., to tolerate network or chunk limits). If the data cannot + /// be fetched, an error is returned. + async fn get_transactions_or_outputs_with_proof( + &self, + proof_version: Version, + start_version: Version, + end_version: Version, + include_events: bool, + request_timeout_ms: u64, + ) -> Result>; } /// A response error that users of the Aptos Data Client can use to notify @@ -271,11 +299,37 @@ impl From<(TransactionListWithProof, LedgerInfoWithSignatures)> for ResponsePayl } } +impl TryFrom<(TransactionOrOutputListWithProof, LedgerInfoWithSignatures)> for ResponsePayload { + type Error = Error; + + fn try_from( + inner: (TransactionOrOutputListWithProof, LedgerInfoWithSignatures), + ) -> Result { + let ((transaction_list, output_list), ledger_info) = inner; + if let Some(transaction_list) = transaction_list { + Ok(Self::NewTransactionsWithProof(( + transaction_list, + ledger_info, + ))) + } else if let Some(output_list) = output_list { + Ok(Self::NewTransactionOutputsWithProof(( + output_list, + ledger_info, + ))) + } else { + Err(Error::InvalidResponse( + "Invalid response! 
No transaction or output list was returned!".into(), + )) + } + } +} + impl From for ResponsePayload { fn from(inner: u64) -> Self { Self::NumberOfStates(inner) } } + impl From for ResponsePayload { fn from(inner: TransactionOutputListWithProof) -> Self { Self::TransactionOutputsWithProof(inner) @@ -288,6 +342,23 @@ impl From for ResponsePayload { } } +impl TryFrom for ResponsePayload { + type Error = Error; + + fn try_from(inner: TransactionOrOutputListWithProof) -> Result { + let (transaction_list, output_list) = inner; + if let Some(transaction_list) = transaction_list { + Ok(Self::TransactionsWithProof(transaction_list)) + } else if let Some(output_list) = output_list { + Ok(Self::TransactionOutputsWithProof(output_list)) + } else { + Err(Error::InvalidResponse( + "Invalid response! No transaction or output list was returned!".into(), + )) + } + } +} + /// A snapshot of the global state of data available in the Aptos network. #[derive(Clone, Debug, Eq, PartialEq)] pub struct GlobalDataSummary { diff --git a/state-sync/state-sync-v2/data-streaming-service/src/tests/utils.rs b/state-sync/state-sync-v2/data-streaming-service/src/tests/utils.rs index 53181618ca769..180934eb820ea 100644 --- a/state-sync/state-sync-v2/data-streaming-service/src/tests/utils.rs +++ b/state-sync/state-sync-v2/data-streaming-service/src/tests/utils.rs @@ -41,7 +41,7 @@ use storage_service_types::requests::{ NewTransactionsWithProofRequest, StateValuesWithProofRequest, TransactionOutputsWithProofRequest, TransactionsWithProofRequest, }; -use storage_service_types::responses::CompleteDataRange; +use storage_service_types::responses::{CompleteDataRange, TransactionOrOutputListWithProof}; use storage_service_types::Epoch; use tokio::time::timeout; @@ -435,6 +435,18 @@ impl AptosDataClient for MockAptosDataClient { } } + async fn get_new_transactions_or_outputs_with_proof( + &self, + _known_version: Version, + _known_epoch: Epoch, + _include_events: bool, + _request_timeout_ms: u64, + ) -> 
aptos_data_client::Result< + Response<(TransactionOrOutputListWithProof, LedgerInfoWithSignatures)>, + > { + todo!() // Implement when we have a client side implementation + } + async fn get_number_of_states( &self, version: Version, @@ -513,6 +525,17 @@ impl AptosDataClient for MockAptosDataClient { // Return the transaction list with proofs Ok(create_data_client_response(transaction_list_with_proof)) } + + async fn get_transactions_or_outputs_with_proof( + &self, + _proof_version: Version, + _start_version: Version, + _end_version: Version, + _include_events: bool, + _request_timeout_ms: u64, + ) -> aptos_data_client::Result> { + todo!() // Implement when we have a client side implementation + } } #[derive(Debug)] diff --git a/state-sync/storage-service/server/src/lib.rs b/state-sync/storage-service/server/src/lib.rs index dedcb0965c111..eb868477868f4 100644 --- a/state-sync/storage-service/server/src/lib.rs +++ b/state-sync/storage-service/server/src/lib.rs @@ -34,11 +34,12 @@ use std::{ use storage_interface::DbReader; use storage_service_types::requests::{ DataRequest, EpochEndingLedgerInfoRequest, StateValuesWithProofRequest, StorageServiceRequest, - TransactionOutputsWithProofRequest, TransactionsWithProofRequest, + TransactionOutputsWithProofRequest, TransactionsOrOutputsWithProofRequest, + TransactionsWithProofRequest, }; use storage_service_types::responses::{ CompleteDataRange, DataResponse, DataSummary, ProtocolMetadata, ServerProtocolVersion, - StorageServerSummary, StorageServiceResponse, + StorageServerSummary, StorageServiceResponse, TransactionOrOutputListWithProof, }; use storage_service_types::{Result, StorageServiceError}; use thiserror::Error; @@ -107,8 +108,8 @@ impl DataSubscriptionRequest { } } - /// Creates a new storage service request to satisfy the transaction - /// subscription using the new data at the specified `target_ledger_info`. 
+ /// Creates a new storage service request to satisfy the subscription + /// using the new data at the specified `target_ledger_info`. fn get_storage_request_for_missing_data( &self, config: StorageServiceConfig, @@ -157,6 +158,17 @@ impl DataSubscriptionRequest { include_events: request.include_events, }) } + DataRequest::GetNewTransactionsOrOutputsWithProof(request) => { + DataRequest::GetTransactionsOrOutputsWithProof( + TransactionsOrOutputsWithProofRequest { + proof_version: target_version, + start_version, + end_version, + include_events: request.include_events, + max_num_output_reductions: request.max_num_output_reductions, + }, + ) + } request => unreachable!("Unexpected subscription request: {:?}", request), }; let storage_request = @@ -169,6 +181,7 @@ impl DataSubscriptionRequest { match &self.request.data_request { DataRequest::GetNewTransactionOutputsWithProof(request) => request.known_version, DataRequest::GetNewTransactionsWithProof(request) => request.known_version, + DataRequest::GetNewTransactionsOrOutputsWithProof(request) => request.known_version, request => unreachable!("Unexpected subscription request: {:?}", request), } } @@ -178,6 +191,7 @@ impl DataSubscriptionRequest { match &self.request.data_request { DataRequest::GetNewTransactionOutputsWithProof(request) => request.known_epoch, DataRequest::GetNewTransactionsWithProof(request) => request.known_epoch, + DataRequest::GetNewTransactionsOrOutputsWithProof(request) => request.known_epoch, request => unreachable!("Unexpected subscription request: {:?}", request), } } @@ -190,6 +204,9 @@ impl DataSubscriptionRequest { config.max_transaction_output_chunk_size } DataRequest::GetNewTransactionsWithProof(_) => config.max_transaction_chunk_size, + DataRequest::GetNewTransactionsOrOutputsWithProof(_) => { + config.max_transaction_output_chunk_size + } request => unreachable!("Unexpected subscription request: {:?}", request), } } @@ -541,6 +558,26 @@ fn notify_peer_of_new_data( 
target_ledger_info.clone(), )) } + Ok(DataResponse::TransactionsOrOutputsWithProof(( + transactions_with_proof, + outputs_with_proof, + ))) => { + if let Some(transactions_with_proof) = transactions_with_proof { + DataResponse::NewTransactionsOrOutputsWithProof(( + (Some(transactions_with_proof), None), + target_ledger_info.clone(), + )) + } else if let Some(outputs_with_proof) = outputs_with_proof { + DataResponse::NewTransactionsOrOutputsWithProof(( + (None, Some(outputs_with_proof)), + target_ledger_info.clone(), + )) + } else { + return Err(Error::UnexpectedErrorEncountered( + "Failed to get a transaction or output response for peer!".into(), + )); + } + } data_response => { return Err(Error::UnexpectedErrorEncountered(format!( "Failed to get appropriate data response for peer! Got: {:?}", @@ -813,7 +850,13 @@ impl Handler { DataRequest::GetTransactionsWithProof(request) => { self.get_transactions_with_proof(request) } - _ => unreachable!("Received an unexpected request: {:?}", request), + DataRequest::GetTransactionsOrOutputsWithProof(request) => { + self.get_transactions_or_outputs_with_proof(request) + } + _ => Err(Error::UnexpectedErrorEncountered(format!( + "Received an unexpected request: {:?}", + request + ))), }?; let storage_response = StorageServiceResponse::new(data_response, request.use_compression)?; @@ -898,6 +941,25 @@ impl Handler { Ok(DataResponse::TransactionsWithProof(transactions_with_proof)) } + + fn get_transactions_or_outputs_with_proof( + &self, + request: &TransactionsOrOutputsWithProofRequest, + ) -> Result { + let (transactions_with_proof, outputs_with_proof) = + self.storage.get_transactions_or_outputs_with_proof( + request.proof_version, + request.start_version, + request.end_version, + request.include_events, + request.max_num_output_reductions, + )?; + + Ok(DataResponse::TransactionsOrOutputsWithProof(( + transactions_with_proof, + outputs_with_proof, + ))) + } } /// The interface into local storage (e.g., the Aptos DB) used by 
the storage @@ -943,6 +1005,21 @@ pub trait StorageReaderInterface: Clone + Send + 'static { end_version: u64, ) -> Result; + /// Returns a list of transaction or outputs with a proof relative to the + /// `proof_version`. The data list is expected to start at `start_version` + /// and end at `end_version` (inclusive). In some cases, less data may be + /// returned (e.g., due to network or chunk limits). If `include_events` + /// is true, events are also returned. `max_num_output_reductions` specifies + /// how many output reductions can occur before transactions are returned. + fn get_transactions_or_outputs_with_proof( + &self, + proof_version: u64, + start_version: u64, + end_version: u64, + include_events: bool, + max_num_output_reductions: u64, + ) -> Result; + /// Returns the number of states in the state tree at the specified version. fn get_number_of_states(&self, version: u64) -> Result; @@ -1100,7 +1177,7 @@ impl StorageReaderInterface for StorageReader { end_version: u64, include_events: bool, ) -> Result { - // Calculate the number of transaction outputs to fetch + // Calculate the number of transactions to fetch let expected_num_transactions = inclusive_range_len(start_version, end_version)?; let max_num_transactions = self.config.max_transaction_chunk_size; let mut num_transactions_to_fetch = min(expected_num_transactions, max_num_transactions); @@ -1244,6 +1321,63 @@ impl StorageReaderInterface for StorageReader { ))) } + fn get_transactions_or_outputs_with_proof( + &self, + proof_version: u64, + start_version: u64, + end_version: u64, + include_events: bool, + max_num_output_reductions: u64, + ) -> Result { + // Calculate the number of transaction outputs to fetch + let expected_num_outputs = inclusive_range_len(start_version, end_version)?; + let max_num_outputs = self.config.max_transaction_output_chunk_size; + let mut num_outputs_to_fetch = min(expected_num_outputs, max_num_outputs); + + // Attempt to serve the outputs. 
Halve the data only as many + // times as the fallback count allows. If the data still + // doesn't fit, return a transaction chunk instead. + let mut num_output_reductions = 0; + while num_output_reductions <= max_num_output_reductions { + let output_list_with_proof = self + .storage + .get_transaction_outputs(start_version, num_outputs_to_fetch, proof_version) + .map_err(|error| Error::StorageErrorEncountered(error.to_string()))?; + let (overflow_frame, num_bytes) = check_overflow_network_frame( + &output_list_with_proof, + self.config.max_network_chunk_bytes, + )?; + + if !overflow_frame { + return Ok((None, Some(output_list_with_proof))); + } else if num_outputs_to_fetch == 1 { + break; // We cannot return less than a single item. Fallback to transactions + } else { + increment_network_frame_overflow( + DataResponse::TransactionsOrOutputsWithProof(( + None, + Some(output_list_with_proof), + )) + .get_label(), + ); + let new_num_outputs_to_fetch = num_outputs_to_fetch / 2; + debug!("The request for {:?} outputs was too large (num bytes: {:?}). 
Current number of data reductions: {:?}", + num_outputs_to_fetch, num_bytes, num_output_reductions); + num_outputs_to_fetch = new_num_outputs_to_fetch; // Try again with half the amount of data + num_output_reductions += 1; + } + } + + // Return transactions only + let transactions_with_proof = self.get_transactions_with_proof( + proof_version, + start_version, + end_version, + include_events, + )?; + Ok((Some(transactions_with_proof), None)) + } + fn get_number_of_states(&self, version: u64) -> Result { let number_of_states = self .storage diff --git a/state-sync/storage-service/server/src/tests.rs b/state-sync/storage-service/server/src/tests.rs index 66bba94c9e5db..341876b695e89 100644 --- a/state-sync/storage-service/server/src/tests.rs +++ b/state-sync/storage-service/server/src/tests.rs @@ -54,6 +54,9 @@ use network::{ use rand::Rng; use std::{sync::Arc, time::Duration}; use storage_interface::{DbReader, ExecutedTrees, Order}; +use storage_service_types::requests::{ + NewTransactionsOrOutputsWithProofRequest, TransactionsOrOutputsWithProofRequest, +}; use storage_service_types::{ requests::{ DataRequest, EpochEndingLedgerInfoRequest, NewTransactionOutputsWithProofRequest, @@ -766,6 +769,299 @@ async fn test_get_new_transaction_outputs_max_chunk() { .await; } +#[tokio::test(flavor = "multi_thread")] +async fn test_get_new_transactions_or_outputs() { + // Test small and large chunk sizes + let max_output_chunk_size = StorageServiceConfig::default().max_transaction_output_chunk_size; + for chunk_size in [1, 100, max_output_chunk_size] { + // Test fallback to transaction syncing + for fallback_to_transactions in [false, true] { + // Create test data + let highest_version = 5060; + let highest_epoch = 30; + let lowest_version = 101; + let peer_version = highest_version - chunk_size; + let highest_ledger_info = + create_test_ledger_info_with_sigs(highest_epoch, highest_version); + let output_list_with_proof = + create_output_list_with_proof(peer_version + 1, 
highest_version, highest_version); + let transaction_list_with_proof = create_transaction_list_with_proof( + highest_version, + highest_version, + highest_version, + false, + ); // Creates a small transaction list + + // Create the mock db reader + let mut db_reader = + create_mock_db_for_subscription(highest_ledger_info.clone(), lowest_version); + expect_get_transaction_outputs( + &mut db_reader, + peer_version + 1, + highest_version - peer_version, + highest_version, + output_list_with_proof.clone(), + ); + if fallback_to_transactions { + expect_get_transactions( + &mut db_reader, + peer_version + 1, + highest_version - peer_version, + highest_version, + false, + transaction_list_with_proof.clone(), + ); + } + + // Create the storage client and server + let storage_config = configure_network_chunk_limit( + fallback_to_transactions, + &output_list_with_proof, + &transaction_list_with_proof, + ); + let (mut mock_client, service, mock_time) = + MockClient::new(Some(db_reader), Some(storage_config)); + tokio::spawn(service.start()); + + // Send a request to subscribe to new transactions or outputs + let mut response_receiver = get_new_transactions_or_outputs_with_proof( + &mut mock_client, + peer_version, + highest_epoch, + false, + 0, // Outputs cannot be reduced and will fallback to transactions + ) + .await; + + // Verify no subscription response has been received yet + assert_none!(response_receiver.try_recv().unwrap()); + + // Elapse enough time to force the subscription thread to work + wait_for_subscription_service_to_refresh(&mut mock_client, &mock_time).await; + + // Verify a response is received and that it contains the correct data + if fallback_to_transactions { + verify_new_transactions_or_outputs_with_proof( + &mut mock_client, + response_receiver, + Some(transaction_list_with_proof), + None, + highest_ledger_info, + ) + .await; + } else { + verify_new_transactions_or_outputs_with_proof( + &mut mock_client, + response_receiver, + None, + 
Some(output_list_with_proof), + highest_ledger_info, + ) + .await; + } + } + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_get_new_transactions_or_outputs_epoch_change() { + // Test fallback to transaction syncing + for fallback_to_transactions in [false, true] { + // Create test data + let highest_version = 10000; + let highest_epoch = 10000; + let lowest_version = 0; + let peer_version = highest_version - 1000; + let peer_epoch = highest_epoch - 1000; + let epoch_change_version = peer_version + 1; + let epoch_change_proof = EpochChangeProof { + ledger_info_with_sigs: vec![create_test_ledger_info_with_sigs( + peer_epoch, + epoch_change_version, + )], + more: false, + }; + let output_list_with_proof = create_output_list_with_proof( + peer_version + 1, + epoch_change_version, + epoch_change_version, + ); + let transaction_list_with_proof = create_transaction_list_with_proof( + peer_version + 1, + peer_version + 1, + epoch_change_version, + false, + ); // Creates a small transaction list + + // Create the mock db reader + let mut db_reader = create_mock_db_for_subscription( + create_test_ledger_info_with_sigs(highest_epoch, highest_version), + lowest_version, + ); + expect_get_epoch_ending_ledger_infos( + &mut db_reader, + peer_epoch, + peer_epoch + 1, + epoch_change_proof.clone(), + ); + expect_get_transaction_outputs( + &mut db_reader, + peer_version + 1, + epoch_change_version - peer_version, + epoch_change_version, + output_list_with_proof.clone(), + ); + if fallback_to_transactions { + expect_get_transactions( + &mut db_reader, + peer_version + 1, + epoch_change_version - peer_version, + epoch_change_version, + false, + transaction_list_with_proof.clone(), + ); + } + + // Create the storage client and server + let storage_config = configure_network_chunk_limit( + fallback_to_transactions, + &output_list_with_proof, + &transaction_list_with_proof, + ); + let (mut mock_client, service, mock_time) = + MockClient::new(Some(db_reader), 
Some(storage_config)); + tokio::spawn(service.start()); + + // Send a request to subscribe to new transaction outputs + let response_receiver = get_new_transactions_or_outputs_with_proof( + &mut mock_client, + peer_version, + peer_epoch, + false, + 5, + ) + .await; + + // Elapse enough time to force the subscription thread to work + wait_for_subscription_service_to_refresh(&mut mock_client, &mock_time).await; + + // Verify a response is received and that it contains the correct data + if fallback_to_transactions { + verify_new_transactions_or_outputs_with_proof( + &mut mock_client, + response_receiver, + Some(transaction_list_with_proof), + None, + epoch_change_proof.ledger_info_with_sigs[0].clone(), + ) + .await; + } else { + verify_new_transactions_or_outputs_with_proof( + &mut mock_client, + response_receiver, + None, + Some(output_list_with_proof), + epoch_change_proof.ledger_info_with_sigs[0].clone(), + ) + .await; + } + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_get_new_transactions_or_outputs_max_chunk() { + // Test fallback to transaction syncing + for fallback_to_transactions in [false, true] { + // Create test data + let highest_version = 65660; + let highest_epoch = 30; + let lowest_version = 101; + let max_chunk_size = StorageServiceConfig::default().max_transaction_output_chunk_size; + let requested_chunk_size = max_chunk_size + 1; + let peer_version = highest_version - requested_chunk_size; + let highest_ledger_info = create_test_ledger_info_with_sigs(highest_epoch, highest_version); + let output_list_with_proof = create_output_list_with_proof( + peer_version + 1, + peer_version + requested_chunk_size, + highest_version, + ); + let transaction_list_with_proof = create_transaction_list_with_proof( + peer_version + 1, + peer_version + 1, + peer_version + requested_chunk_size, + false, + ); // Creates a small transaction list + + // Create the mock db reader + let max_num_output_reductions = 5; + let mut db_reader = + 
create_mock_db_for_subscription(highest_ledger_info.clone(), lowest_version); + for i in 0..=max_num_output_reductions { + expect_get_transaction_outputs( + &mut db_reader, + peer_version + 1, + (max_chunk_size as u32 / (u32::pow(2, i as u32))) as u64, + highest_version, + output_list_with_proof.clone(), + ); + } + if fallback_to_transactions { + expect_get_transactions( + &mut db_reader, + peer_version + 1, + max_chunk_size, + highest_version, + false, + transaction_list_with_proof.clone(), + ); + } + + // Create the storage client and server + let storage_config = configure_network_chunk_limit( + fallback_to_transactions, + &output_list_with_proof, + &transaction_list_with_proof, + ); + let (mut mock_client, service, mock_time) = + MockClient::new(Some(db_reader), Some(storage_config)); + tokio::spawn(service.start()); + + // Send a request to subscribe to new transaction outputs + let response_receiver = get_new_transactions_or_outputs_with_proof( + &mut mock_client, + peer_version, + highest_epoch, + false, + max_num_output_reductions, + ) + .await; + + // Elapse enough time to force the subscription thread to work + wait_for_subscription_service_to_refresh(&mut mock_client, &mock_time).await; + + // Verify a response is received and that it contains the correct data + if fallback_to_transactions { + verify_new_transactions_or_outputs_with_proof( + &mut mock_client, + response_receiver, + Some(transaction_list_with_proof), + None, + highest_ledger_info, + ) + .await; + } else { + verify_new_transactions_or_outputs_with_proof( + &mut mock_client, + response_receiver, + None, + Some(output_list_with_proof), + highest_ledger_info, + ) + .await; + } + } +} + #[tokio::test] async fn test_get_number_of_states_at_version() { // Create test data @@ -1182,6 +1478,183 @@ async fn test_get_transaction_outputs_with_proof_invalid() { } } +#[tokio::test] +async fn test_get_transactions_or_outputs_with_proof() { + // Test small and large chunk requests + let 
max_output_chunk_size = StorageServiceConfig::default().max_transaction_output_chunk_size; + for chunk_size in [1, 100, max_output_chunk_size] { + // Test fallback to transaction syncing + for fallback_to_transactions in [false, true] { + // Create test data + let start_version = 0; + let end_version = start_version + chunk_size - 1; + let proof_version = end_version; + let output_list_with_proof = + create_output_list_with_proof(start_version, end_version, proof_version); + let transaction_list_with_proof = create_transaction_list_with_proof( + start_version, + start_version, + proof_version, + false, + ); // Creates a small transaction list + + // Create the mock db reader + let max_num_output_reductions = 5; + let mut db_reader = create_mock_db_reader(); + for i in 0..=max_num_output_reductions { + expect_get_transaction_outputs( + &mut db_reader, + start_version, + (chunk_size as u32 / (u32::pow(2, i as u32))) as u64, + proof_version, + output_list_with_proof.clone(), + ); + } + if fallback_to_transactions { + expect_get_transactions( + &mut db_reader, + start_version, + chunk_size, + proof_version, + false, + transaction_list_with_proof.clone(), + ); + } + + // Create the storage client and server + let storage_config = configure_network_chunk_limit( + fallback_to_transactions, + &output_list_with_proof, + &transaction_list_with_proof, + ); + let (mut mock_client, service, _) = + MockClient::new(Some(db_reader), Some(storage_config)); + tokio::spawn(service.start()); + + // Create a request to fetch transactions or outputs with a proof + let response = get_transactions_or_outputs_with_proof( + &mut mock_client, + start_version, + end_version, + end_version, + false, + max_num_output_reductions, + true, + ) + .await + .unwrap(); + + // Verify the response is correct + verify_transactions_or_output_response( + fallback_to_transactions, + &output_list_with_proof, + &transaction_list_with_proof, + &response, + ); + } + } +} + +#[tokio::test] +async fn 
test_get_transactions_or_outputs_with_proof_network_limit() { + // Test different byte limits + for network_limit_bytes in [1, 2 * 1024, 10 * 1024, 30 * 1024] { + get_transactions_or_outputs_with_proof_network_limit(network_limit_bytes).await; + } +} + +#[tokio::test] +async fn test_get_transactions_or_outputs_with_proof_invalid() { + // Create the storage client and server + let (mut mock_client, service, _) = MockClient::new(None, None); + tokio::spawn(service.start()); + + // Test invalid ranges + let start_version = 1000; + for end_version in [0, 999] { + let response = get_transactions_or_outputs_with_proof( + &mut mock_client, + start_version, + end_version, + end_version, + false, + 3, + true, + ) + .await + .unwrap_err(); + assert_matches!(response, StorageServiceError::InvalidRequest(_)); + } +} + +#[tokio::test] +async fn test_get_transactions_or_outputs_with_proof_chunk_limit() { + // Test fallback to transaction syncing + for fallback_to_transactions in [false, true] { + // Create test data + let max_output_chunk_size = + StorageServiceConfig::default().max_transaction_output_chunk_size; + let chunk_size = max_output_chunk_size * 10; // Set a chunk request larger than the max + let start_version = 0; + let end_version = start_version + max_output_chunk_size - 1; + let proof_version = end_version; + let output_list_with_proof = + create_output_list_with_proof(start_version, end_version, proof_version); + let transaction_list_with_proof = + create_transaction_list_with_proof(start_version, start_version, proof_version, false); // Creates a small transaction list + + // Create the mock db reader + let mut db_reader = create_mock_db_reader(); + expect_get_transaction_outputs( + &mut db_reader, + start_version, + max_output_chunk_size, + proof_version, + output_list_with_proof.clone(), + ); + if fallback_to_transactions { + expect_get_transactions( + &mut db_reader, + start_version, + max_output_chunk_size, + proof_version, + false, + 
+/// A helper method to request transactions or outputs with proof using
+/// the specified network limit.
+async fn get_transactions_or_outputs_with_proof_network_limit(network_limit_bytes: u64) { + for use_compression in [true, false] { + for include_events in [true, false] { + // Create test data + let min_bytes_per_output = 2500; // 2.5 KB + let min_bytes_per_transaction = 499; // 0.5 KB + let start_version = 455; + let proof_version = 1000000; + let max_output_size = StorageServiceConfig::default().max_transaction_output_chunk_size; + let max_transaction_size = StorageServiceConfig::default().max_transaction_chunk_size; + + // Create the mock db reader + let mut db_reader = create_mock_db_reader(); + let mut expectation_sequence = Sequence::new(); + let mut chunk_size = max_output_size; + let mut max_num_output_reductions = 0; + while chunk_size >= 1 { + let output_list_with_proof = + create_output_list_using_sizes(start_version, chunk_size, min_bytes_per_output); + db_reader + .expect_get_transaction_outputs() + .times(1) + .with(eq(start_version), eq(chunk_size), eq(proof_version)) + .in_sequence(&mut expectation_sequence) + .returning(move |_, _, _| Ok(output_list_with_proof.clone())); + chunk_size /= 2; + max_num_output_reductions += 1; + } + let mut chunk_size = max_transaction_size; + while chunk_size >= 1 { + let transaction_list_with_proof = create_transaction_list_using_sizes( + start_version, + chunk_size, + min_bytes_per_transaction, + include_events, + ); + db_reader + .expect_get_transactions() + .times(1) + .with( + eq(start_version), + eq(chunk_size), + eq(proof_version), + eq(include_events), + ) + .in_sequence(&mut expectation_sequence) + .returning(move |_, _, _, _| Ok(transaction_list_with_proof.clone())); + chunk_size /= 2; + } + + // Create the storage client and server + let storage_config = StorageServiceConfig { + max_network_chunk_bytes: network_limit_bytes, + ..Default::default() + }; + let (mut mock_client, service, _) = + MockClient::new(Some(db_reader), Some(storage_config)); + tokio::spawn(service.start()); + + // Process a request to 
fetch transactions or outputs with a proof + let response = get_transactions_or_outputs_with_proof( + &mut mock_client, + start_version, + start_version + (max_output_size * 10), // Request more than the max chunk + proof_version, + include_events, + max_num_output_reductions, + use_compression, + ) + .await + .unwrap(); + + // Verify the response is correct + match response.get_data_response().unwrap() { + DataResponse::TransactionsOrOutputsWithProof( + transactions_or_outputs_with_proof, + ) => { + let (transactions_with_proof, outputs_with_proof) = + transactions_or_outputs_with_proof; + + if let Some(transactions_with_proof) = transactions_with_proof { + let num_response_bytes = + bcs::to_bytes(&transactions_with_proof).unwrap().len() as u64; + let num_transactions = transactions_with_proof.transactions.len() as u64; + if num_response_bytes > network_limit_bytes { + assert_eq!(num_transactions, 1); // Data cannot be reduced more than a single item + } else { + let max_transactions = network_limit_bytes / min_bytes_per_transaction; + assert!(num_transactions <= max_transactions); + } + } else if let Some(outputs_with_proof) = outputs_with_proof { + let num_response_bytes = + bcs::to_bytes(&outputs_with_proof).unwrap().len() as u64; + let num_outputs = outputs_with_proof.transactions_and_outputs.len() as u64; + if num_response_bytes > network_limit_bytes { + assert_eq!(num_outputs, 1); // Data cannot be reduced more than a single item + } else { + let max_outputs = network_limit_bytes / min_bytes_per_output; + assert!(num_outputs <= max_outputs); + } + } else { + panic!("No transactions or outputs were returned!"); + } + } + _ => panic!( + "Expected transactions or outputs with proof but got: {:?}", + response + ), + }; + } + } +} + /// Waits until the storage summary has refreshed for the first time async fn wait_for_storage_to_refresh(mock_client: &mut MockClient, mock_time: &MockTimeService) { let storage_request = 
StorageServiceRequest::new(DataRequest::GetStorageServerSummary, true); @@ -1758,6 +2346,27 @@ async fn get_outputs_with_proof( send_storage_request(mock_client, use_compression, data_request).await } +/// Sends a transaction or outputs with proof request and processes the response +async fn get_transactions_or_outputs_with_proof( + mock_client: &mut MockClient, + start_version: u64, + end_version: u64, + proof_version: u64, + include_events: bool, + max_num_output_reductions: u64, + use_compression: bool, +) -> Result { + let data_request = + DataRequest::GetTransactionsOrOutputsWithProof(TransactionsOrOutputsWithProofRequest { + proof_version, + start_version, + end_version, + include_events, + max_num_output_reductions, + }); + send_storage_request(mock_client, use_compression, data_request).await +} + /// Sends a state values with proof request and processes the response async fn get_state_values_with_proof( mock_client: &mut MockClient, @@ -1835,6 +2444,26 @@ async fn get_new_transactions_with_proof( mock_client.send_request(storage_request).await } +/// Creates and sends a request for new transactions or outputs +async fn get_new_transactions_or_outputs_with_proof( + mock_client: &mut MockClient, + known_version: u64, + known_epoch: u64, + include_events: bool, + max_num_output_reductions: u64, +) -> Receiver> { + let data_request = DataRequest::GetNewTransactionsOrOutputsWithProof( + NewTransactionsOrOutputsWithProofRequest { + known_version, + known_epoch, + include_events, + max_num_output_reductions, + }, + ); + let storage_request = StorageServiceRequest::new(data_request, true); + mock_client.send_request(storage_request).await +} + /// Sends the given storage request to the given client async fn send_storage_request( mock_client: &mut MockClient, @@ -1938,6 +2567,27 @@ fn expect_get_state_values_with_proof( .returning(move |_, _, _| Ok(state_value_chunk_with_proof.clone())); } +/// Creates a new storage service config with the limit +/// configured to 
be the size of an output list or transaction +/// list (depending on if `fallback_to_transactions` is set). +fn configure_network_chunk_limit( + fallback_to_transactions: bool, + output_list_with_proof: &TransactionOutputListWithProof, + transaction_list_with_proof: &TransactionListWithProof, +) -> StorageServiceConfig { + let max_network_chunk_bytes = if fallback_to_transactions { + // Network limit is only big enough for the transaction list + bcs::to_bytes(&transaction_list_with_proof).unwrap().len() as u64 + 1 + } else { + // Network limit is big enough for the output list + bcs::to_bytes(&output_list_with_proof).unwrap().len() as u64 + 1 + }; + StorageServiceConfig { + max_network_chunk_bytes, + ..Default::default() + } +} + /// Creates a test epoch change proof fn create_epoch_ending_ledger_infos( start_epoch: Epoch, @@ -2209,6 +2859,69 @@ async fn verify_new_transactions_with_proof( }; } +/// Verifies that a new transactions or outputs with proof response is received +/// and that the response contains the correct data. 
"Expected new transactions or outputs with proof but got: {:?}",
+fn verify_transactions_or_output_response( + fallback_to_transactions: bool, + output_list_with_proof: &TransactionOutputListWithProof, + transaction_list_with_proof: &TransactionListWithProof, + response: &StorageServiceResponse, +) { + match response.get_data_response().unwrap() { + DataResponse::TransactionsOrOutputsWithProof(transactions_or_outputs_with_proof) => { + let (transactions_with_proof, outputs_with_proof) = transactions_or_outputs_with_proof; + if fallback_to_transactions { + assert_eq!( + transactions_with_proof.unwrap(), + transaction_list_with_proof.clone() + ); + } else { + assert_eq!(outputs_with_proof.unwrap(), output_list_with_proof.clone()); + } + } + _ => panic!( + "Expected transactions or outputs with proof but got: {:?}", + response + ), + }; +} + /// Initializes the Aptos logger for tests pub fn initialize_logger() { aptos_logger::Logger::builder() diff --git a/state-sync/storage-service/types/src/requests.rs b/state-sync/storage-service/types/src/requests.rs index 5f8d498395b78..8c8cf03638ad4 100644 --- a/state-sync/storage-service/types/src/requests.rs +++ b/state-sync/storage-service/types/src/requests.rs @@ -42,6 +42,8 @@ pub enum DataRequest { GetStorageServerSummary, // Fetches a summary of the storage server state GetTransactionOutputsWithProof(TransactionOutputsWithProofRequest), // Fetches a list of transaction outputs with a proof GetTransactionsWithProof(TransactionsWithProofRequest), // Fetches a list of transactions with a proof + GetNewTransactionsOrOutputsWithProof(NewTransactionsOrOutputsWithProofRequest), // Subscribes to new transactions or outputs with a proof + GetTransactionsOrOutputsWithProof(TransactionsOrOutputsWithProofRequest), // Fetches a list of transactions or outputs with a proof } impl DataRequest { @@ -57,6 +59,10 @@ impl DataRequest { Self::GetStorageServerSummary => "get_storage_server_summary", Self::GetTransactionOutputsWithProof(_) => "get_transaction_outputs_with_proof", 
Self::GetTransactionsWithProof(_) => "get_transactions_with_proof", + Self::GetNewTransactionsOrOutputsWithProof(_) => { + "get_new_transactions_or_outputs_with_proof" + } + Self::GetTransactionsOrOutputsWithProof(_) => "get_transactions_or_outputs_with_proof", } } @@ -67,6 +73,7 @@ impl DataRequest { pub fn is_data_subscription_request(&self) -> bool { matches!(self, &Self::GetNewTransactionOutputsWithProof(_)) || matches!(self, &Self::GetNewTransactionsWithProof(_)) + || matches!(self, Self::GetNewTransactionsOrOutputsWithProof(_)) } pub fn is_protocol_version_request(&self) -> bool { @@ -125,3 +132,24 @@ pub struct TransactionsWithProofRequest { pub end_version: u64, // The ending version of the transaction list (inclusive) pub include_events: bool, // Whether or not to include events in the response } + +/// A storage service request for fetching a new transaction or output list +/// beyond the already known version and epoch. +#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] +pub struct NewTransactionsOrOutputsWithProofRequest { + pub known_version: u64, // The highest known version + pub known_epoch: u64, // The highest known epoch + pub include_events: bool, // Whether or not to include events in the response + pub max_num_output_reductions: u64, // The max num of output reductions before transactions are returned +} + +/// A storage service request for fetching a transaction list with a +/// corresponding proof or an output list with a corresponding proof. 
+#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] +pub struct TransactionsOrOutputsWithProofRequest { + pub proof_version: u64, // The version the proof should be relative to + pub start_version: u64, // The starting version of the transaction/output list + pub end_version: u64, // The ending version of the transaction/output list (inclusive) + pub include_events: bool, // Whether or not to include events (if transactions are returned) + pub max_num_output_reductions: u64, // The max num of output reductions before transactions are returned +} diff --git a/state-sync/storage-service/types/src/responses.rs b/state-sync/storage-service/types/src/responses.rs index 326e0d9a55049..de4f00f82ec19 100644 --- a/state-sync/storage-service/types/src/responses.rs +++ b/state-sync/storage-service/types/src/responses.rs @@ -2,9 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use crate::requests::DataRequest::{ - GetEpochEndingLedgerInfos, GetNewTransactionOutputsWithProof, GetNewTransactionsWithProof, - GetNumberOfStatesAtVersion, GetServerProtocolVersion, GetStateValuesWithProof, - GetStorageServerSummary, GetTransactionOutputsWithProof, GetTransactionsWithProof, + GetEpochEndingLedgerInfos, GetNewTransactionOutputsWithProof, + GetNewTransactionsOrOutputsWithProof, GetNewTransactionsWithProof, GetNumberOfStatesAtVersion, + GetServerProtocolVersion, GetStateValuesWithProof, GetStorageServerSummary, + GetTransactionOutputsWithProof, GetTransactionsOrOutputsWithProof, GetTransactionsWithProof, }; use crate::responses::Error::DegenerateRangeError; use crate::{Epoch, StorageServiceRequest, COMPRESSION_SUFFIX_LABEL}; @@ -106,6 +107,12 @@ impl StorageServiceResponse { } } +/// A useful type to hold optional transaction data +pub type TransactionOrOutputListWithProof = ( + Option, + Option, +); + /// A single data response. 
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] #[allow(clippy::large_enum_variant)] @@ -119,6 +126,8 @@ pub enum DataResponse { StorageServerSummary(StorageServerSummary), TransactionOutputsWithProof(TransactionOutputListWithProof), TransactionsWithProof(TransactionListWithProof), + NewTransactionsOrOutputsWithProof((TransactionOrOutputListWithProof, LedgerInfoWithSignatures)), + TransactionsOrOutputsWithProof(TransactionOrOutputListWithProof), } impl DataResponse { @@ -134,6 +143,8 @@ impl DataResponse { Self::StorageServerSummary(_) => "storage_server_summary", Self::TransactionOutputsWithProof(_) => "transaction_outputs_with_proof", Self::TransactionsWithProof(_) => "transactions_with_proof", + Self::NewTransactionsOrOutputsWithProof(_) => "new_transactions_or_outputs_with_proof", + Self::TransactionsOrOutputsWithProof(_) => "transactions_or_outputs_with_proof", } } } @@ -284,6 +295,36 @@ impl TryFrom for TransactionListWithProof { } } +impl TryFrom + for (TransactionOrOutputListWithProof, LedgerInfoWithSignatures) +{ + type Error = crate::responses::Error; + fn try_from(response: StorageServiceResponse) -> crate::Result { + let data_response = response.get_data_response()?; + match data_response { + DataResponse::NewTransactionsOrOutputsWithProof(inner) => Ok(inner), + _ => Err(Error::UnexpectedResponseError(format!( + "expected new_transactions_or_outputs_with_proof, found {}", + data_response.get_label() + ))), + } + } +} + +impl TryFrom for TransactionOrOutputListWithProof { + type Error = crate::responses::Error; + fn try_from(response: StorageServiceResponse) -> crate::Result { + let data_response = response.get_data_response()?; + match data_response { + DataResponse::TransactionsOrOutputsWithProof(inner) => Ok(inner), + _ => Err(Error::UnexpectedResponseError(format!( + "expected transactions_or_outputs_with_proof, found {}", + data_response.get_label() + ))), + } + } +} + /// The protocol version run by this server. 
Clients request this first to /// identify what API calls and data requests the server supports. #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] @@ -322,6 +363,7 @@ impl ProtocolMetadata { match &request.data_request { GetNewTransactionsWithProof(_) | GetNewTransactionOutputsWithProof(_) + | GetNewTransactionsOrOutputsWithProof(_) | GetNumberOfStatesAtVersion(_) | GetServerProtocolVersion | GetStorageServerSummary => true, @@ -361,6 +403,16 @@ impl ProtocolMetadata { self.max_transaction_chunk_size >= chunk_size }) }), + GetTransactionsOrOutputsWithProof(request) => CompleteDataRange::new( + request.start_version, + request.end_version, + ) + .map_or(false, |range| { + range.len().map_or(false, |chunk_size| { + self.max_transaction_chunk_size >= chunk_size + && self.max_transaction_output_chunk_size >= chunk_size + }) + }), } } } @@ -481,6 +533,34 @@ impl DataSummary { can_serve_txns && can_create_proof } + GetNewTransactionsOrOutputsWithProof(request) => { + self.can_service_optimistic_request(request.known_version) + } + GetTransactionsOrOutputsWithProof(request) => { + let desired_range = + match CompleteDataRange::new(request.start_version, request.end_version) { + Ok(desired_range) => desired_range, + Err(_) => return false, + }; + + let can_serve_txns = self + .transactions + .map(|range| range.superset_of(&desired_range)) + .unwrap_or(false); + + let can_serve_outputs = self + .transaction_outputs + .map(|range| range.superset_of(&desired_range)) + .unwrap_or(false); + + let can_create_proof = self + .synced_ledger_info + .as_ref() + .map(|li| li.ledger_info().version() >= request.proof_version) + .unwrap_or(false); + + can_serve_txns && can_serve_outputs && can_create_proof + } } } diff --git a/state-sync/storage-service/types/src/tests.rs b/state-sync/storage-service/types/src/tests.rs index 2eacfc20af89f..dda1e5f177db6 100644 --- a/state-sync/storage-service/types/src/tests.rs +++ b/state-sync/storage-service/types/src/tests.rs @@ -1,6 +1,7 
@@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 +use crate::requests::TransactionsOrOutputsWithProofRequest; use crate::{ requests::{ DataRequest, EpochEndingLedgerInfoRequest, StateValuesWithProofRequest, @@ -136,6 +137,54 @@ fn test_data_summary_can_service_txn_outputs_request() { } } +#[test] +fn test_data_summary_can_service_txns_or_outputs_request() { + let summary = DataSummary { + synced_ledger_info: Some(create_mock_ledger_info(250)), + transactions: Some(create_range(50, 200)), + transaction_outputs: Some(create_range(100, 250)), + ..Default::default() + }; + + for compression in [true, false] { + // in range (for txns and outputs) and can provide proof => can service + assert!(summary.can_service(&txns_or_outputs_request(225, 100, 200, compression))); + assert!(summary.can_service(&txns_or_outputs_request(225, 125, 175, compression))); + assert!(summary.can_service(&txns_or_outputs_request(225, 100, 100, compression))); + assert!(summary.can_service(&txns_or_outputs_request(225, 150, 150, compression))); + assert!(summary.can_service(&txns_or_outputs_request(225, 200, 200, compression))); + assert!(summary.can_service(&txns_or_outputs_request(250, 200, 200, compression))); + + // in range (for txns but not outputs) and can provide proof => cannot service + assert!(!summary.can_service(&txns_or_outputs_request(225, 51, 200, compression))); + assert!(!summary.can_service(&txns_or_outputs_request(225, 99, 100, compression))); + assert!(!summary.can_service(&txns_or_outputs_request(225, 51, 71, compression))); + + // in range (for outputs but not txns) and can provide proof => cannot service + assert!(!summary.can_service(&txns_or_outputs_request(225, 200, 202, compression))); + assert!(!summary.can_service(&txns_or_outputs_request(225, 150, 201, compression))); + assert!(!summary.can_service(&txns_or_outputs_request(225, 201, 225, compression))); + + // can provide proof, but out of range => cannot service + 
assert!(!summary.can_service(&txns_or_outputs_request(225, 175, 125, compression)));