diff --git a/aptos-node/src/lib.rs b/aptos-node/src/lib.rs index 2f7acacc8c2f1..ca7123467609b 100644 --- a/aptos-node/src/lib.rs +++ b/aptos-node/src/lib.rs @@ -8,10 +8,11 @@ mod log_build_information; use anyhow::{anyhow, Context}; use aptos_api::bootstrap as bootstrap_api; use aptos_build_info::build_information; +use aptos_config::config::StateSyncConfig; use aptos_config::{ config::{ - AptosDataClientConfig, BaseConfig, DataStreamingServiceConfig, NetworkConfig, NodeConfig, - PersistableConfig, StorageServiceConfig, + AptosDataClientConfig, BaseConfig, NetworkConfig, NodeConfig, PersistableConfig, + StorageServiceConfig, }, network_id::NetworkId, utils::get_genesis_txn, @@ -412,10 +413,8 @@ fn create_state_sync_runtimes( )?; // Start the data streaming service - let (streaming_service_client, streaming_service_runtime) = setup_data_streaming_service( - node_config.state_sync.data_streaming_service, - aptos_data_client.clone(), - )?; + let (streaming_service_client, streaming_service_runtime) = + setup_data_streaming_service(node_config.state_sync.clone(), aptos_data_client.clone())?; // Create the chunk executor and persistent storage let chunk_executor = Arc::new(ChunkExecutor::::new(db_rw.clone())); @@ -446,14 +445,18 @@ fn create_state_sync_runtimes( } fn setup_data_streaming_service( - config: DataStreamingServiceConfig, + state_sync_config: StateSyncConfig, aptos_data_client: AptosNetDataClient, ) -> anyhow::Result<(StreamingServiceClient, Runtime)> { // Create the data streaming service let (streaming_service_client, streaming_service_listener) = new_streaming_service_client_listener_pair(); - let data_streaming_service = - DataStreamingService::new(config, aptos_data_client, streaming_service_listener); + let data_streaming_service = DataStreamingService::new( + state_sync_config.aptos_data_client, + state_sync_config.data_streaming_service, + aptos_data_client, + streaming_service_listener, + ); // Start the data streaming service let 
streaming_service_runtime = Builder::new_multi_thread() diff --git a/config/src/config/state_sync_config.rs b/config/src/config/state_sync_config.rs index eca0da935ec1d..b59474a2bf4af 100644 --- a/config/src/config/state_sync_config.rs +++ b/config/src/config/state_sync_config.rs @@ -76,13 +76,13 @@ pub struct StateSyncDriverConfig { impl Default for StateSyncDriverConfig { fn default() -> Self { Self { - bootstrapping_mode: BootstrappingMode::ExecuteTransactionsFromGenesis, + bootstrapping_mode: BootstrappingMode::ApplyTransactionOutputsFromGenesis, commit_notification_timeout_ms: 5000, - continuous_syncing_mode: ContinuousSyncingMode::ExecuteTransactions, + continuous_syncing_mode: ContinuousSyncingMode::ApplyTransactionOutputs, progress_check_interval_ms: 100, max_connection_deadline_secs: 10, max_consecutive_stream_notifications: 10, - max_num_stream_timeouts: 6, + max_num_stream_timeouts: 12, max_pending_data_chunks: 100, max_stream_wait_time_ms: 5000, num_versions_to_skip_snapshot_sync: 100_000_000, // At 5k TPS, this allows a node to fail for about 6 hours. 
@@ -170,9 +170,11 @@ impl Default for DataStreamingServiceConfig { pub struct AptosDataClientConfig { pub max_num_in_flight_priority_polls: u64, // Max num of in-flight polls for priority peers pub max_num_in_flight_regular_polls: u64, // Max num of in-flight polls for regular peers - pub response_timeout_ms: u64, // Timeout (in milliseconds) when waiting for a response - pub summary_poll_interval_ms: u64, // Interval (in milliseconds) between data summary polls - pub use_compression: bool, // Whether or not to request compression for incoming data + pub max_response_timeout_ms: u64, // Max timeout (in ms) when waiting for a response (after exponential increases) + pub response_timeout_ms: u64, // First timeout (in ms) when waiting for a response + pub subscription_timeout_ms: u64, // Timeout (in ms) when waiting for a subscription response + pub summary_poll_interval_ms: u64, // Interval (in ms) between data summary polls + pub use_compression: bool, // Whether or not to request compression for incoming data } impl Default for AptosDataClientConfig { @@ -180,7 +182,9 @@ impl Default for AptosDataClientConfig { Self { max_num_in_flight_priority_polls: 10, max_num_in_flight_regular_polls: 10, - response_timeout_ms: 20000, // 20 seconds + max_response_timeout_ms: 60000, // 60 seconds + response_timeout_ms: 10000, // 10 seconds + subscription_timeout_ms: 5000, // 5 seconds summary_poll_interval_ms: 200, use_compression: true, } diff --git a/network/src/protocols/rpc/mod.rs b/network/src/protocols/rpc/mod.rs index 6237fcd420cac..1f2ef749d89f5 100644 --- a/network/src/protocols/rpc/mod.rs +++ b/network/src/protocols/rpc/mod.rs @@ -548,21 +548,23 @@ impl OutboundRpcs { latency, ); } - Err(err) => { - if let RpcError::UnexpectedResponseChannelCancel = err { + Err(error) => { + if let RpcError::UnexpectedResponseChannelCancel = error { + // We don't log when the application has dropped the RPC + // response channel because this is often expected (e.g., + // on state sync 
subscription requests that timeout). counters::rpc_messages(network_context, REQUEST_LABEL, CANCELED_LABEL).inc(); } else { counters::rpc_messages(network_context, REQUEST_LABEL, FAILED_LABEL).inc(); + warn!( + NetworkSchema::new(network_context).remote_peer(peer_id), + "{} Error making outbound RPC request to {} (request_id {}). Error: {}", + network_context, + peer_id.short_str(), + request_id, + error + ); } - - warn!( - NetworkSchema::new(network_context).remote_peer(peer_id), - "{} Error making outbound rpc request with request_id {} to {}: {}", - network_context, - request_id, - peer_id.short_str(), - err - ); } } } diff --git a/state-sync/aptos-data-client/src/aptosnet/mod.rs b/state-sync/aptos-data-client/src/aptosnet/mod.rs index aef3026fc5881..a1c3aa12ebb43 100644 --- a/state-sync/aptos-data-client/src/aptosnet/mod.rs +++ b/state-sync/aptos-data-client/src/aptosnet/mod.rs @@ -296,6 +296,7 @@ impl AptosNetDataClient { async fn send_request_and_decode( &self, request: StorageServiceRequest, + request_timeout_ms: u64, ) -> Result> where T: TryFrom, @@ -311,7 +312,8 @@ impl AptosNetDataClient { error })?; let _timer = start_request_timer(&metrics::REQUEST_LATENCIES, &request.get_label(), peer); - self.send_request_to_peer_and_decode(peer, request).await + self.send_request_to_peer_and_decode(peer, request, request_timeout_ms) + .await } /// Sends a request to a specific peer and decodes the response @@ -319,12 +321,15 @@ impl AptosNetDataClient { &self, peer: PeerNetworkId, request: StorageServiceRequest, + request_timeout_ms: u64, ) -> Result> where T: TryFrom, E: Into, { - let response = self.send_request_to_peer(peer, request.clone()).await?; + let response = self + .send_request_to_peer(peer, request.clone(), request_timeout_ms) + .await?; let (context, storage_response) = response.into_parts(); @@ -359,9 +364,9 @@ impl AptosNetDataClient { &self, peer: PeerNetworkId, request: StorageServiceRequest, + request_timeout_ms: u64, ) -> Result, Error> { let id 
= self.next_response_id(); - trace!( (LogSchema::new(LogEntry::StorageServiceRequest) .event(LogEvent::SendRequest) @@ -370,18 +375,17 @@ impl AptosNetDataClient { .peer(&peer) .request_data(&request)) ); - increment_request_counter(&metrics::SENT_REQUESTS, &request.get_label(), peer); + // Send the request and process the result let result = self .network_client .send_request( peer, request.clone(), - Duration::from_millis(self.data_client_config.response_timeout_ms), + Duration::from_millis(request_timeout_ms), ) .await; - match result { Ok(response) => { trace!( @@ -476,14 +480,16 @@ impl AptosDataClient for AptosNetDataClient { &self, start_epoch: Epoch, expected_end_epoch: Epoch, + request_timeout_ms: u64, ) -> Result>> { let data_request = DataRequest::GetEpochEndingLedgerInfos(EpochEndingLedgerInfoRequest { start_epoch, expected_end_epoch, }); let storage_request = StorageServiceRequest::new(data_request, self.use_compression()); - let response: Response = - self.send_request_and_decode(storage_request).await?; + let response: Response = self + .send_request_and_decode(storage_request, request_timeout_ms) + .await?; Ok(response.map(|epoch_change| epoch_change.ledger_info_with_sigs)) } @@ -491,6 +497,7 @@ impl AptosDataClient for AptosNetDataClient { &self, known_version: Version, known_epoch: Epoch, + request_timeout_ms: u64, ) -> Result> { let data_request = DataRequest::GetNewTransactionOutputsWithProof(NewTransactionOutputsWithProofRequest { @@ -498,7 +505,8 @@ impl AptosDataClient for AptosNetDataClient { known_epoch, }); let storage_request = StorageServiceRequest::new(data_request, self.use_compression()); - self.send_request_and_decode(storage_request).await + self.send_request_and_decode(storage_request, request_timeout_ms) + .await } async fn get_new_transactions_with_proof( @@ -506,6 +514,7 @@ impl AptosDataClient for AptosNetDataClient { known_version: Version, known_epoch: Epoch, include_events: bool, + request_timeout_ms: u64, ) -> Result> { let 
data_request = DataRequest::GetNewTransactionsWithProof(NewTransactionsWithProofRequest { @@ -514,13 +523,19 @@ impl AptosDataClient for AptosNetDataClient { include_events, }); let storage_request = StorageServiceRequest::new(data_request, self.use_compression()); - self.send_request_and_decode(storage_request).await + self.send_request_and_decode(storage_request, request_timeout_ms) + .await } - async fn get_number_of_states(&self, version: Version) -> Result> { + async fn get_number_of_states( + &self, + version: Version, + request_timeout_ms: u64, + ) -> Result> { let data_request = DataRequest::GetNumberOfStatesAtVersion(version); let storage_request = StorageServiceRequest::new(data_request, self.use_compression()); - self.send_request_and_decode(storage_request).await + self.send_request_and_decode(storage_request, request_timeout_ms) + .await } async fn get_state_values_with_proof( @@ -528,6 +543,7 @@ impl AptosDataClient for AptosNetDataClient { version: u64, start_index: u64, end_index: u64, + request_timeout_ms: u64, ) -> Result> { let data_request = DataRequest::GetStateValuesWithProof(StateValuesWithProofRequest { version, @@ -535,7 +551,8 @@ impl AptosDataClient for AptosNetDataClient { end_index, }); let storage_request = StorageServiceRequest::new(data_request, self.use_compression()); - self.send_request_and_decode(storage_request).await + self.send_request_and_decode(storage_request, request_timeout_ms) + .await } async fn get_transaction_outputs_with_proof( @@ -543,6 +560,7 @@ impl AptosDataClient for AptosNetDataClient { proof_version: Version, start_version: Version, end_version: Version, + request_timeout_ms: u64, ) -> Result> { let data_request = DataRequest::GetTransactionOutputsWithProof(TransactionOutputsWithProofRequest { @@ -551,7 +569,8 @@ impl AptosDataClient for AptosNetDataClient { end_version, }); let storage_request = StorageServiceRequest::new(data_request, self.use_compression()); - 
self.send_request_and_decode(storage_request).await + self.send_request_and_decode(storage_request, request_timeout_ms) + .await } async fn get_transactions_with_proof( @@ -560,6 +579,7 @@ impl AptosDataClient for AptosNetDataClient { start_version: Version, end_version: Version, include_events: bool, + request_timeout_ms: u64, ) -> Result> { let data_request = DataRequest::GetTransactionsWithProof(TransactionsWithProofRequest { proof_version, @@ -568,7 +588,8 @@ impl AptosDataClient for AptosNetDataClient { include_events, }); let storage_request = StorageServiceRequest::new(data_request, self.use_compression()); - self.send_request_and_decode(storage_request).await + self.send_request_and_decode(storage_request, request_timeout_ms) + .await } } @@ -763,6 +784,7 @@ pub(crate) fn poll_peer( let data_request = DataRequest::GetStorageServerSummary; let storage_request = StorageServiceRequest::new(data_request, data_client.use_compression()); + let request_timeout = data_client.data_client_config.response_timeout_ms; // Start the peer polling timer let timer = start_request_timer( @@ -773,7 +795,7 @@ pub(crate) fn poll_peer( // Fetch the storage summary for the peer and stop the timer let result: Result = data_client - .send_request_to_peer_and_decode(peer, storage_request) + .send_request_to_peer_and_decode(peer, storage_request, request_timeout) .await .map(Response::into_payload); drop(timer); diff --git a/state-sync/aptos-data-client/src/aptosnet/tests.rs b/state-sync/aptos-data-client/src/aptosnet/tests.rs index 1f7d2cdb2bfbb..f0c770664545b 100644 --- a/state-sync/aptos-data-client/src/aptosnet/tests.rs +++ b/state-sync/aptos-data-client/src/aptosnet/tests.rs @@ -208,8 +208,9 @@ async fn request_works_only_when_data_available() { tokio::spawn(poller.start_poller()); // This request should fail because no peers are currently connected + let request_timeout = client.data_client_config.response_timeout_ms; let error = client - .get_transactions_with_proof(100, 50, 
100, false) + .get_transactions_with_proof(100, 50, 100, false, request_timeout) .await .unwrap_err(); assert_matches!(error, Error::DataIsUnavailable(_)); @@ -220,7 +221,7 @@ async fn request_works_only_when_data_available() { // Requesting some txns now will still fail since no peers are advertising // availability for the desired range. let error = client - .get_transactions_with_proof(100, 50, 100, false) + .get_transactions_with_proof(100, 50, 100, false, request_timeout) .await .unwrap_err(); assert_matches!(error, Error::DataIsUnavailable(_)); @@ -268,7 +269,7 @@ async fn request_works_only_when_data_available() { // The client's request should succeed since a peer finally has advertised // data for this range. let response = client - .get_transactions_with_proof(100, 50, 100, false) + .get_transactions_with_proof(100, 50, 100, false, request_timeout) .await .unwrap(); assert_eq!(response.payload, TransactionListWithProof::new_empty()); @@ -992,9 +993,10 @@ async fn bad_peer_is_eventually_banned_internal() { let mut seen_data_unavailable_err = false; // Sending a bunch of requests to the bad peer's upper range will fail. + let request_timeout = client.data_client_config.response_timeout_ms; for _ in 0..20 { let result = client - .get_transactions_with_proof(200, 200, 200, false) + .get_transactions_with_proof(200, 200, 200, false, request_timeout) .await; // While the score is still decreasing, we should see a bunch of @@ -1024,7 +1026,7 @@ async fn bad_peer_is_eventually_banned_internal() { // We should still be able to send the good peer a request. 
let response = client - .get_transactions_with_proof(100, 50, 100, false) + .get_transactions_with_proof(100, 50, 100, false, request_timeout) .await .unwrap(); assert_eq!(response.payload, TransactionListWithProof::new_empty()); @@ -1054,9 +1056,10 @@ async fn bad_peer_is_eventually_banned_callback() { let mut seen_data_unavailable_err = false; // Sending a bunch of requests to the bad peer (that we later decide are bad). + let request_timeout = client.data_client_config.response_timeout_ms; for _ in 0..20 { let result = client - .get_transactions_with_proof(200, 200, 200, false) + .get_transactions_with_proof(200, 200, 200, false, request_timeout) .await; // While the score is still decreasing, we should see a bunch of @@ -1137,8 +1140,9 @@ async fn compression_mismatch_disabled() { }); // The client should receive a compressed response and return an error + let request_timeout = client.data_client_config.response_timeout_ms; let response = client - .get_transactions_with_proof(100, 50, 100, false) + .get_transactions_with_proof(100, 50, 100, false, request_timeout) .await .unwrap_err(); assert_matches!(response, Error::InvalidResponse(_)); @@ -1186,8 +1190,9 @@ async fn compression_mismatch_enabled() { }); // The client should receive a compressed response and return an error + let request_timeout = client.data_client_config.response_timeout_ms; let response = client - .get_transactions_with_proof(100, 50, 100, false) + .get_transactions_with_proof(100, 50, 100, false, request_timeout) .await .unwrap_err(); assert_matches!(response, Error::InvalidResponse(_)); @@ -1255,8 +1260,9 @@ async fn disable_compression() { // The client's request should succeed since a peer finally has advertised // data for this range. 
+ let request_timeout = client.data_client_config.response_timeout_ms; let response = client - .get_transactions_with_proof(100, 50, 100, false) + .get_transactions_with_proof(100, 50, 100, false, request_timeout) .await .unwrap(); assert_eq!(response.payload, TransactionListWithProof::new_empty()); @@ -1313,8 +1319,11 @@ async fn bad_peer_is_eventually_added_back() { // Keep decreasing this peer's score by considering its responses bad. // Eventually its score drops below IGNORE_PEER_THRESHOLD. + let request_timeout = client.data_client_config.response_timeout_ms; for _ in 0..20 { - let result = client.get_transactions_with_proof(200, 0, 200, false).await; + let result = client + .get_transactions_with_proof(200, 0, 200, false, request_timeout) + .await; if let Ok(response) = result { response diff --git a/state-sync/aptos-data-client/src/lib.rs b/state-sync/aptos-data-client/src/lib.rs index ee7e4ba3a2044..e95f68513713f 100644 --- a/state-sync/aptos-data-client/src/lib.rs +++ b/state-sync/aptos-data-client/src/lib.rs @@ -76,6 +76,7 @@ pub trait AptosDataClient { &self, start_epoch: Epoch, expected_end_epoch: Epoch, + request_timeout_ms: u64, ) -> Result>>; /// Fetches a new transaction output list with proof. Versions start at @@ -86,6 +87,7 @@ pub trait AptosDataClient { &self, known_version: Version, known_epoch: Epoch, + request_timeout_ms: u64, ) -> Result>; /// Fetches a new transaction list with proof. Versions start at @@ -97,10 +99,15 @@ pub trait AptosDataClient { known_version: Version, known_epoch: Epoch, include_events: bool, + request_timeout_ms: u64, ) -> Result>; /// Fetches the number of states at the specified version. - async fn get_number_of_states(&self, version: Version) -> Result>; + async fn get_number_of_states( + &self, + version: Version, + request_timeout_ms: u64, + ) -> Result>; /// Fetches a single state value chunk with proof, containing the values /// from start to end index (inclusive) at the specified version. 
The proof @@ -112,6 +119,7 @@ pub trait AptosDataClient { version: u64, start_index: u64, end_index: u64, + request_timeout_ms: u64, ) -> Result>; /// Fetches a transaction output list with proof, with transaction @@ -124,6 +132,7 @@ pub trait AptosDataClient { proof_version: Version, start_version: Version, end_version: Version, + request_timeout_ms: u64, ) -> Result>; /// Fetches a transaction list with proof, with transactions from @@ -138,6 +147,7 @@ pub trait AptosDataClient { start_version: Version, end_version: Version, include_events: bool, + request_timeout_ms: u64, ) -> Result>; } diff --git a/state-sync/state-sync-v2/data-streaming-service/src/data_stream.rs b/state-sync/state-sync-v2/data-streaming-service/src/data_stream.rs index f003fb7d542ab..81052c2700164 100644 --- a/state-sync/state-sync-v2/data-streaming-service/src/data_stream.rs +++ b/state-sync/state-sync-v2/data-streaming-service/src/data_stream.rs @@ -1,6 +1,7 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 +use crate::metrics::increment_counter_multiple; use crate::{ data_notification, data_notification::{ @@ -16,7 +17,7 @@ use crate::{ stream_engine::{DataStreamEngine, StreamEngine}, streaming_client::{NotificationFeedback, StreamRequest}, }; -use aptos_config::config::DataStreamingServiceConfig; +use aptos_config::config::{AptosDataClientConfig, DataStreamingServiceConfig}; use aptos_data_client::{ AdvertisedData, AptosDataClient, GlobalDataSummary, Response, ResponseContext, ResponseError, ResponsePayload, @@ -26,6 +27,7 @@ use aptos_infallible::Mutex; use aptos_logger::prelude::*; use futures::channel::mpsc; use futures::{stream::FusedStream, SinkExt, Stream}; +use std::cmp::min; use std::{ collections::{BTreeMap, VecDeque}, pin::Pin, @@ -54,8 +56,11 @@ pub type PendingClientResponse = Arc { - // The configuration for this data stream - config: DataStreamingServiceConfig, + // The configuration for the data client + data_client_config: AptosDataClientConfig, + + // The 
configuration for the streaming service + streaming_service_config: DataStreamingServiceConfig, // The unique ID for this data stream. This is useful for logging. data_stream_id: DataStreamId, @@ -100,7 +105,8 @@ pub struct DataStream { impl DataStream { pub fn new( - config: DataStreamingServiceConfig, + data_client_config: AptosDataClientConfig, + data_stream_config: DataStreamingServiceConfig, data_stream_id: DataStreamId, stream_request: &StreamRequest, aptos_data_client: T, @@ -109,7 +115,7 @@ impl DataStream { ) -> Result<(Self, DataStreamListener), Error> { // Create a new data stream listener let (notification_sender, notification_receiver) = - mpsc::channel(config.max_data_stream_channel_sizes as usize); + mpsc::channel(data_stream_config.max_data_stream_channel_sizes as usize); let data_stream_listener = DataStreamListener::new(data_stream_id, notification_receiver); // Create a new stream engine @@ -117,7 +123,8 @@ impl DataStream { // Create a new data stream let data_stream = Self { - config, + data_client_config, + streaming_service_config: data_stream_config, data_stream_id, aptos_data_client, stream_engine, @@ -200,8 +207,10 @@ impl DataStream { /// at any given time. 
fn get_max_concurrent_requests(&self) -> u64 { match self.stream_engine { - StreamEngine::StateStreamEngine(_) => self.config.max_concurrent_state_requests, - _ => self.config.max_concurrent_requests, + StreamEngine::StateStreamEngine(_) => { + self.streaming_service_config.max_concurrent_state_requests + } + _ => self.streaming_service_config.max_concurrent_requests, } } @@ -225,7 +234,8 @@ impl DataStream { .create_data_client_requests(max_num_requests_to_send, global_data_summary)?; for client_request in &client_requests { // Send the client request - let pending_client_response = self.send_client_request(client_request.clone()); + let pending_client_response = + self.send_client_request(false, client_request.clone()); // Enqueue the pending response self.get_sent_data_requests() @@ -249,9 +259,11 @@ impl DataStream { } /// Sends a given request to the data client to be forwarded to the network - /// and returns a pending client response. + /// and returns a pending client response. If `request_retry` is true + /// exponential backoff takes effect (i.e., to increase the request timeout). fn send_client_request( &mut self, + request_retry: bool, data_client_request: DataClientRequest, ) -> PendingClientResponse { // Create a new pending client response @@ -262,11 +274,48 @@ impl DataStream { }, ))); + // Calculate the request timeout to use, based on the + // request type and the number of previous failures. + let request_timeout_ms = if is_subscription_request(&data_client_request) { + self.data_client_config.subscription_timeout_ms + } else if !request_retry { + self.data_client_config.response_timeout_ms + } else { + let response_timeout_ms = self.data_client_config.response_timeout_ms; + let max_response_timeout_ms = self.data_client_config.max_response_timeout_ms; + + // Exponentially increase the timeout based on the number of + // previous failures (but bounded by the max timeout). 
+ let request_timeout_ms = min( + max_response_timeout_ms, + response_timeout_ms * (u32::pow(2, self.request_failure_count as u32) as u64), + ); + + // Update the retry counter and log the request + increment_counter_multiple( + &metrics::RETRIED_DATA_REQUESTS, + data_client_request.get_label(), + &request_timeout_ms.to_string(), + ); + info!( + (LogSchema::new(LogEntry::RetryDataRequest) + .stream_id(self.data_stream_id) + .message(&format!( + "Retrying data request type: {:?}, with new timeout: {:?} (ms)", + data_client_request.get_label(), + request_timeout_ms.to_string() + ))) + ); + + request_timeout_ms + }; + // Send the request to the network let join_handle = spawn_request_task( data_client_request, self.aptos_data_client.clone(), pending_client_response.clone(), + request_timeout_ms, ); self.spawned_tasks.push(join_handle); @@ -326,7 +375,7 @@ impl DataStream { global_data_summary: GlobalDataSummary, ) -> Result<(), Error> { if self.stream_engine.is_stream_complete() - || self.request_failure_count >= self.config.max_request_retry + || self.request_failure_count >= self.streaming_service_config.max_request_retry || self.send_failure { if !self.send_failure && self.stream_end_notification_id.is_none() { @@ -359,7 +408,18 @@ impl DataStream { } } Err(error) => { - self.handle_data_client_error(client_request, &error)?; + // If the error was a timeout and the request was a subscription request + // we need to notify the stream engine and not retry the request. 
+ if matches!( + error, + aptos_data_client::Error::TimeoutWaitingForResponse(_) + ) && is_subscription_request(client_request) + { + self.stream_engine + .notify_subscription_timeout(client_request)?; + } else { + self.handle_data_client_error(client_request, &error)?; + }; break; } } @@ -430,7 +490,7 @@ impl DataStream { self.request_failure_count += 1; // Resend the client request - let pending_client_response = self.send_client_request(data_client_request.clone()); + let pending_client_response = self.send_client_request(true, data_client_request.clone()); // Push the pending response to the head of the sent requests queue self.get_sent_data_requests() @@ -519,7 +579,8 @@ impl DataStream { } fn garbage_collect_notification_response_map(&mut self) -> Result<(), Error> { - let max_notification_id_mappings = self.config.max_notification_id_mappings; + let max_notification_id_mappings = + self.streaming_service_config.max_notification_id_mappings; let map_length = self.notifications_to_responses.len() as u64; if map_length > max_notification_id_mappings { let num_entries_to_remove = map_length @@ -714,6 +775,7 @@ fn spawn_request_task( data_client_request: DataClientRequest, aptos_data_client: T, pending_response: PendingClientResponse, + request_timeout_ms: u64, ) -> JoinHandle<()> { // Update the requests sent counter increment_counter( @@ -732,25 +794,32 @@ fn spawn_request_task( // Fetch the client response let client_response = match data_client_request { DataClientRequest::EpochEndingLedgerInfos(request) => { - get_epoch_ending_ledger_infos(aptos_data_client, request).await + get_epoch_ending_ledger_infos(aptos_data_client, request, request_timeout_ms).await } DataClientRequest::NewTransactionsWithProof(request) => { - get_new_transactions_with_proof(aptos_data_client, request).await + get_new_transactions_with_proof(aptos_data_client, request, request_timeout_ms) + .await } DataClientRequest::NewTransactionOutputsWithProof(request) => { - 
get_new_transaction_outputs_with_proof(aptos_data_client, request).await + get_new_transaction_outputs_with_proof( + aptos_data_client, + request, + request_timeout_ms, + ) + .await } DataClientRequest::NumberOfStates(request) => { - get_number_of_states(aptos_data_client, request).await + get_number_of_states(aptos_data_client, request, request_timeout_ms).await } DataClientRequest::StateValuesWithProof(request) => { - get_states_values_with_proof(aptos_data_client, request).await + get_states_values_with_proof(aptos_data_client, request, request_timeout_ms).await } DataClientRequest::TransactionOutputsWithProof(request) => { - get_transaction_outputs_with_proof(aptos_data_client, request).await + get_transaction_outputs_with_proof(aptos_data_client, request, request_timeout_ms) + .await } DataClientRequest::TransactionsWithProof(request) => { - get_transactions_with_proof(aptos_data_client, request).await + get_transactions_with_proof(aptos_data_client, request, request_timeout_ms).await } }; @@ -775,11 +844,13 @@ fn spawn_request_task( async fn get_states_values_with_proof( aptos_data_client: T, request: StateValuesWithProofRequest, + request_timeout_ms: u64, ) -> Result, aptos_data_client::Error> { let client_response = aptos_data_client.get_state_values_with_proof( request.version, request.start_index, request.end_index, + request_timeout_ms, ); client_response .await @@ -789,9 +860,13 @@ async fn get_states_values_with_proof( aptos_data_client: T, request: EpochEndingLedgerInfosRequest, + request_timeout_ms: u64, ) -> Result, aptos_data_client::Error> { - let client_response = - aptos_data_client.get_epoch_ending_ledger_infos(request.start_epoch, request.end_epoch); + let client_response = aptos_data_client.get_epoch_ending_ledger_infos( + request.start_epoch, + request.end_epoch, + request_timeout_ms, + ); client_response .await .map(|response| response.map(ResponsePayload::from)) @@ -800,9 +875,13 @@ async fn get_epoch_ending_ledger_infos( 
aptos_data_client: T, request: NewTransactionOutputsWithProofRequest, + request_timeout_ms: u64, ) -> Result, aptos_data_client::Error> { - let client_response = aptos_data_client - .get_new_transaction_outputs_with_proof(request.known_version, request.known_epoch); + let client_response = aptos_data_client.get_new_transaction_outputs_with_proof( + request.known_version, + request.known_epoch, + request_timeout_ms, + ); client_response .await .map(|response| response.map(ResponsePayload::from)) @@ -811,11 +890,13 @@ async fn get_new_transaction_outputs_with_proof( aptos_data_client: T, request: NewTransactionsWithProofRequest, + request_timeout_ms: u64, ) -> Result, aptos_data_client::Error> { let client_response = aptos_data_client.get_new_transactions_with_proof( request.known_version, request.known_epoch, request.include_events, + request_timeout_ms, ); client_response .await @@ -825,8 +906,10 @@ async fn get_new_transactions_with_proof( aptos_data_client: T, request: NumberOfStatesRequest, + request_timeout_ms: u64, ) -> Result, aptos_data_client::Error> { - let client_response = aptos_data_client.get_number_of_states(request.version); + let client_response = + aptos_data_client.get_number_of_states(request.version, request_timeout_ms); client_response .await .map(|response| response.map(ResponsePayload::from)) @@ -835,11 +918,13 @@ async fn get_number_of_states( async fn get_transaction_outputs_with_proof( aptos_data_client: T, request: TransactionOutputsWithProofRequest, + request_timeout_ms: u64, ) -> Result, aptos_data_client::Error> { let client_response = aptos_data_client.get_transaction_outputs_with_proof( request.proof_version, request.start_version, request.end_version, + request_timeout_ms, ); client_response .await @@ -849,14 +934,25 @@ async fn get_transaction_outputs_with_proof( aptos_data_client: T, request: TransactionsWithProofRequest, + request_timeout_ms: u64, ) -> Result, aptos_data_client::Error> { let client_response = 
aptos_data_client.get_transactions_with_proof( request.proof_version, request.start_version, request.end_version, request.include_events, + request_timeout_ms, ); client_response .await .map(|response| response.map(ResponsePayload::from)) } + +/// Returns true iff the given request is a subscription request +fn is_subscription_request(request: &DataClientRequest) -> bool { + matches!(request, DataClientRequest::NewTransactionsWithProof(_)) + || matches!( + request, + DataClientRequest::NewTransactionOutputsWithProof(_) + ) +} diff --git a/state-sync/state-sync-v2/data-streaming-service/src/logging.rs b/state-sync/state-sync-v2/data-streaming-service/src/logging.rs index 692aa01cb4c0e..bd89f7c86740a 100644 --- a/state-sync/state-sync-v2/data-streaming-service/src/logging.rs +++ b/state-sync/state-sync-v2/data-streaming-service/src/logging.rs @@ -37,7 +37,9 @@ pub enum LogEntry { InitializeStream, ReceivedDataResponse, RefreshGlobalData, + RequestTimeout, RespondToStreamRequest, + RetryDataRequest, SendDataRequests, StreamNotification, TerminateStream, diff --git a/state-sync/state-sync-v2/data-streaming-service/src/metrics.rs b/state-sync/state-sync-v2/data-streaming-service/src/metrics.rs index 90a37d3f9b6a3..c09bf6bb6ba2b 100644 --- a/state-sync/state-sync-v2/data-streaming-service/src/metrics.rs +++ b/state-sync/state-sync-v2/data-streaming-service/src/metrics.rs @@ -75,6 +75,17 @@ pub static SENT_DATA_REQUESTS: Lazy = Lazy::new(|| { .unwrap() }); +/// Counter for tracking data requests that were retried (including +/// the new timeouts). 
+pub static RETRIED_DATA_REQUESTS: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "aptos_data_streaming_service_retried_data_requests", + "Counters related to retried data requests", + &["request_type", "request_timeout"] + ) + .unwrap() +}); + /// Counter for tracking received data responses pub static RECEIVED_DATA_RESPONSE: Lazy = Lazy::new(|| { register_int_counter_vec!( @@ -105,11 +116,22 @@ pub static DATA_REQUEST_PROCESSING_LATENCY: Lazy = Lazy::new(|| { .unwrap() }); -/// Increments the given counter with the provided label values. +/// Increments the given counter with the single label value. pub fn increment_counter(counter: &Lazy, label: &str) { counter.with_label_values(&[label]).inc(); } +/// Increments the given counter with two label values. +pub fn increment_counter_multiple( + counter: &Lazy, + first_label: &str, + second_label: &str, +) { + counter + .with_label_values(&[first_label, second_label]) + .inc(); +} + /// Sets the number of active data streams pub fn set_active_data_streams(value: usize) { ACTIVE_DATA_STREAMS.set(value as i64); diff --git a/state-sync/state-sync-v2/data-streaming-service/src/stream_engine.rs b/state-sync/state-sync-v2/data-streaming-service/src/stream_engine.rs index d83c8c36d733d..62a268761b9c0 100644 --- a/state-sync/state-sync-v2/data-streaming-service/src/stream_engine.rs +++ b/state-sync/state-sync-v2/data-streaming-service/src/stream_engine.rs @@ -72,6 +72,19 @@ pub trait DataStreamEngine { /// Returns true iff the stream has sent all data to the stream listener. fn is_stream_complete(&self) -> bool; + /// Notifies the data stream engine that a timeout was encountered when + /// trying to send the subscription request. + /// + /// Note: Most engines shouldn't process these notifications, so a default + /// implementation that returns an error is provided. If a subscription request + /// does exist, there should only ever be a single request in-flight. 
+ fn notify_subscription_timeout( + &mut self, + client_request: &DataClientRequest, + ) -> Result<(), Error> { + Err(Error::UnexpectedErrorEncountered(format!("Received a subscription request timeout but no subscription request was sent! Reported request: {:?}", client_request))) + } + /// Transforms a given data client response (for the previously sent /// request) into a data notification to be sent along the data stream. /// Note: this call may return `None`, in which case, no notification needs @@ -798,6 +811,48 @@ impl DataStreamEngine for ContinuousTransactionStreamEngine { self.stream_is_complete } + fn notify_subscription_timeout( + &mut self, + client_request: &DataClientRequest, + ) -> Result<(), Error> { + if !self.subscription_requested { + return Err(Error::UnexpectedErrorEncountered(format!( + "Received a subscription timeout but no request is in-flight! Request: {:?}", + client_request + ))); + } + + // Reset the subscription request and handle the timeout + self.subscription_requested = false; + if matches!( + self.request, + StreamRequest::ContinuouslyStreamTransactions(_) + ) && matches!( + client_request, + DataClientRequest::NewTransactionsWithProof(_) + ) { + info!( + (LogSchema::new(LogEntry::RequestTimeout) + .message("Subscription request for new transactions timed out!")) + ); + } else if matches!( + self.request, + StreamRequest::ContinuouslyStreamTransactionOutputs(_) + ) && matches!( + client_request, + DataClientRequest::NewTransactionOutputsWithProof(_) + ) { + info!( + (LogSchema::new(LogEntry::RequestTimeout) + .message("Subscription request for new transaction outputs timed out!")) + ); + } else { + return Err(Error::UnexpectedErrorEncountered(format!("Received a subscription request timeout but the request did not match the expected type for the stream! 
Request: {:?}, Stream: {:?}", client_request, self.request))); + } + + Ok(()) + } + fn transform_client_response_into_notification( &mut self, client_request: &DataClientRequest, diff --git a/state-sync/state-sync-v2/data-streaming-service/src/streaming_service.rs b/state-sync/state-sync-v2/data-streaming-service/src/streaming_service.rs index a19d19a446f60..d564f5844587a 100644 --- a/state-sync/state-sync-v2/data-streaming-service/src/streaming_service.rs +++ b/state-sync/state-sync-v2/data-streaming-service/src/streaming_service.rs @@ -10,7 +10,7 @@ use crate::{ StreamRequest, StreamRequestMessage, StreamingServiceListener, TerminateStreamRequest, }, }; -use aptos_config::config::DataStreamingServiceConfig; +use aptos_config::config::{AptosDataClientConfig, DataStreamingServiceConfig}; use aptos_data_client::{AptosDataClient, GlobalDataSummary, OptimalChunkSizes}; use aptos_id_generator::{IdGenerator, U64IdGenerator}; use aptos_logger::prelude::*; @@ -27,8 +27,11 @@ const TERMINATE_NO_FEEDBACK: &str = "no_feedback"; /// The data streaming service that responds to data stream requests. pub struct DataStreamingService { - // The configuration for this streaming service. 
- config: DataStreamingServiceConfig, + // The configuration for the data client + data_client_config: AptosDataClientConfig, + + // The configuration for the streaming service + streaming_service_config: DataStreamingServiceConfig, // The data client through which to fetch data from the Aptos network aptos_data_client: T, @@ -49,12 +52,14 @@ pub struct DataStreamingService { impl DataStreamingService { pub fn new( - config: DataStreamingServiceConfig, + data_client_config: AptosDataClientConfig, + streaming_service_config: DataStreamingServiceConfig, aptos_data_client: T, stream_requests: StreamingServiceListener, ) -> Self { Self { - config, + data_client_config, + streaming_service_config, aptos_data_client, global_data_summary: GlobalDataSummary::empty(), data_streams: HashMap::new(), @@ -67,11 +72,12 @@ impl DataStreamingService { /// Starts the dedicated streaming service pub async fn start_service(mut self) { let mut data_refresh_interval = IntervalStream::new(interval(Duration::from_millis( - self.config.global_summary_refresh_interval_ms, + self.streaming_service_config + .global_summary_refresh_interval_ms, ))) .fuse(); let mut progress_check_interval = IntervalStream::new(interval(Duration::from_millis( - self.config.progress_check_interval_ms, + self.streaming_service_config.progress_check_interval_ms, ))) .fuse(); @@ -195,7 +201,8 @@ impl DataStreamingService { // Create a new data stream let stream_id = self.stream_id_generator.next(); let (data_stream, stream_listener) = DataStream::new( - self.config, + self.data_client_config, + self.streaming_service_config, stream_id, &request_message.stream_request, self.aptos_data_client.clone(), diff --git a/state-sync/state-sync-v2/data-streaming-service/src/tests/data_stream.rs b/state-sync/state-sync-v2/data-streaming-service/src/tests/data_stream.rs index 9e75de8ee41f4..9b9f904bf7aff 100644 --- a/state-sync/state-sync-v2/data-streaming-service/src/tests/data_stream.rs +++ 
b/state-sync/state-sync-v2/data-streaming-service/src/tests/data_stream.rs @@ -1,6 +1,18 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 +use crate::data_notification::{ + NewTransactionOutputsWithProofRequest, NewTransactionsWithProofRequest, + TransactionOutputsWithProofRequest, TransactionsWithProofRequest, +}; +use crate::streaming_client::{ + ContinuouslyStreamTransactionOutputsRequest, ContinuouslyStreamTransactionsRequest, + GetAllTransactionOutputsRequest, +}; +use crate::tests::utils::{ + create_output_list_with_proof, MAX_ADVERTISED_TRANSACTION, MAX_NOTIFICATION_TIMEOUT_SECS, + MIN_ADVERTISED_TRANSACTION, +}; use crate::{ data_notification::{ DataClientRequest, DataPayload, EpochEndingLedgerInfosRequest, PendingClientResponse, @@ -14,11 +26,11 @@ use crate::{ create_data_client_response, create_ledger_info, create_random_u64, create_transaction_list_with_proof, get_data_notification, initialize_logger, MockAptosDataClient, NoopResponseCallback, MAX_ADVERTISED_EPOCH_END, MAX_ADVERTISED_STATES, - MAX_ADVERTISED_TRANSACTION_OUTPUT, MAX_NOTIFICATION_TIMEOUT_SECS, MIN_ADVERTISED_EPOCH_END, - MIN_ADVERTISED_STATES, MIN_ADVERTISED_TRANSACTION_OUTPUT, + MAX_ADVERTISED_TRANSACTION_OUTPUT, MIN_ADVERTISED_EPOCH_END, MIN_ADVERTISED_STATES, + MIN_ADVERTISED_TRANSACTION_OUTPUT, }, }; -use aptos_config::config::DataStreamingServiceConfig; +use aptos_config::config::{AptosDataClientConfig, DataStreamingServiceConfig}; use aptos_data_client::{ AdvertisedData, GlobalDataSummary, OptimalChunkSizes, Response, ResponseContext, ResponsePayload, @@ -39,14 +51,15 @@ use tokio::time::timeout; async fn test_stream_blocked() { // Create a state value stream let streaming_service_config = DataStreamingServiceConfig::default(); - let (mut data_stream, mut stream_listener) = - create_state_value_stream(streaming_service_config, MIN_ADVERTISED_STATES); + let (mut data_stream, mut stream_listener) = create_state_value_stream( + AptosDataClientConfig::default(), + 
streaming_service_config, + MIN_ADVERTISED_STATES, + ); // Initialize the data stream let global_data_summary = create_global_data_summary(100); - data_stream - .initialize_data_requests(global_data_summary.clone()) - .unwrap(); + initialize_data_requests(&mut data_stream, &global_data_summary); let mut number_of_refetches = 0; loop { @@ -70,10 +83,7 @@ async fn test_stream_blocked() { insert_response_into_pending_queue(&mut data_stream, pending_response); // Process the data responses and force a data re-fetch - data_stream - .process_data_responses(global_data_summary.clone()) - .await - .unwrap(); + process_data_responses(&mut data_stream, &global_data_summary).await; // If we're sent a data notification, verify it's an end of stream notification! if let Ok(data_notification) = timeout( @@ -114,6 +124,7 @@ async fn test_stream_garbage_collection() { // Create a transaction stream let streaming_service_config = DataStreamingServiceConfig::default(); let (mut data_stream, mut stream_listener) = create_transaction_stream( + AptosDataClientConfig::default(), streaming_service_config, MIN_ADVERTISED_TRANSACTION_OUTPUT, MAX_ADVERTISED_TRANSACTION_OUTPUT, @@ -121,19 +132,14 @@ async fn test_stream_garbage_collection() { // Initialize the data stream let global_data_summary = create_global_data_summary(1); - data_stream - .initialize_data_requests(global_data_summary.clone()) - .unwrap(); + initialize_data_requests(&mut data_stream, &global_data_summary); loop { // Insert a transaction response into the queue set_transaction_response_at_queue_head(&mut data_stream); // Process the data response - data_stream - .process_data_responses(global_data_summary.clone()) - .await - .unwrap(); + process_data_responses(&mut data_stream, &global_data_summary).await; // Process the data response let data_notification = get_data_notification(&mut stream_listener).await.unwrap(); @@ -154,16 +160,18 @@ async fn test_stream_garbage_collection() { async fn test_stream_initialization() { 
// Create an epoch ending data stream let streaming_service_config = DataStreamingServiceConfig::default(); - let (mut data_stream, _) = - create_epoch_ending_stream(streaming_service_config, MIN_ADVERTISED_EPOCH_END); + let (mut data_stream, _) = create_epoch_ending_stream( + AptosDataClientConfig::default(), + streaming_service_config, + MIN_ADVERTISED_EPOCH_END, + ); // Verify the data stream is not initialized assert!(!data_stream.data_requests_initialized()); // Initialize the data stream - data_stream - .initialize_data_requests(create_global_data_summary(100)) - .unwrap(); + let global_data_summary = create_global_data_summary(1); + initialize_data_requests(&mut data_stream, &global_data_summary); // Verify the data stream is now initialized assert!(data_stream.data_requests_initialized()); @@ -177,14 +185,15 @@ async fn test_stream_initialization() { async fn test_stream_data_error() { // Create an epoch ending data stream let streaming_service_config = DataStreamingServiceConfig::default(); - let (mut data_stream, mut stream_listener) = - create_epoch_ending_stream(streaming_service_config, MIN_ADVERTISED_EPOCH_END); + let (mut data_stream, mut stream_listener) = create_epoch_ending_stream( + AptosDataClientConfig::default(), + streaming_service_config, + MIN_ADVERTISED_EPOCH_END, + ); // Initialize the data stream let global_data_summary = create_global_data_summary(100); - data_stream - .initialize_data_requests(global_data_summary.clone()) - .unwrap(); + initialize_data_requests(&mut data_stream, &global_data_summary); // Clear the pending queue and insert an error response let client_request = DataClientRequest::EpochEndingLedgerInfos(EpochEndingLedgerInfosRequest { @@ -200,10 +209,7 @@ async fn test_stream_data_error() { insert_response_into_pending_queue(&mut data_stream, pending_response); // Process the responses and verify the data client request was resent to the network - data_stream - .process_data_responses(global_data_summary) - .await - 
.unwrap(); + process_data_responses(&mut data_stream, &global_data_summary).await; assert_none!(stream_listener.select_next_some().now_or_never()); verify_client_request_resubmitted(&mut data_stream, client_request); } @@ -212,14 +218,15 @@ async fn test_stream_data_error() { async fn test_stream_invalid_response() { // Create an epoch ending data stream let streaming_service_config = DataStreamingServiceConfig::default(); - let (mut data_stream, mut stream_listener) = - create_epoch_ending_stream(streaming_service_config, MIN_ADVERTISED_EPOCH_END); + let (mut data_stream, mut stream_listener) = create_epoch_ending_stream( + AptosDataClientConfig::default(), + streaming_service_config, + MIN_ADVERTISED_EPOCH_END, + ); // Initialize the data stream let global_data_summary = create_global_data_summary(100); - data_stream - .initialize_data_requests(global_data_summary.clone()) - .unwrap(); + initialize_data_requests(&mut data_stream, &global_data_summary); // Clear the pending queue and insert a response with an invalid type let client_request = DataClientRequest::EpochEndingLedgerInfos(EpochEndingLedgerInfosRequest { @@ -238,10 +245,7 @@ async fn test_stream_invalid_response() { insert_response_into_pending_queue(&mut data_stream, pending_response); // Process the responses and verify the data client request was resent to the network - data_stream - .process_data_responses(global_data_summary) - .await - .unwrap(); + process_data_responses(&mut data_stream, &global_data_summary).await; assert_none!(stream_listener.select_next_some().now_or_never()); verify_client_request_resubmitted(&mut data_stream, client_request); } @@ -255,14 +259,15 @@ async fn test_epoch_stream_out_of_order_responses() { max_concurrent_state_requests: 1, ..Default::default() }; - let (mut data_stream, mut stream_listener) = - create_epoch_ending_stream(streaming_service_config, MIN_ADVERTISED_EPOCH_END); + let (mut data_stream, mut stream_listener) = create_epoch_ending_stream( + 
AptosDataClientConfig::default(), + streaming_service_config, + MIN_ADVERTISED_EPOCH_END, + ); // Initialize the data stream let global_data_summary = create_global_data_summary(1); - data_stream - .initialize_data_requests(global_data_summary.clone()) - .unwrap(); + initialize_data_requests(&mut data_stream, &global_data_summary); // Verify at least three requests have been made let (sent_requests, _) = data_stream.get_sent_requests_and_notifications(); @@ -272,19 +277,13 @@ async fn test_epoch_stream_out_of_order_responses() { ); // Set a response for the second request and verify no notifications - set_epoch_ending_response_in_queue(&mut data_stream, 1); - data_stream - .process_data_responses(global_data_summary.clone()) - .await - .unwrap(); + set_epoch_ending_response_in_queue(&mut data_stream, 1, 0); + process_data_responses(&mut data_stream, &global_data_summary).await; assert_none!(stream_listener.select_next_some().now_or_never()); // Set a response for the first request and verify two notifications - set_epoch_ending_response_in_queue(&mut data_stream, 0); - data_stream - .process_data_responses(global_data_summary.clone()) - .await - .unwrap(); + set_epoch_ending_response_in_queue(&mut data_stream, 0, 0); + process_data_responses(&mut data_stream, &global_data_summary).await; for _ in 0..2 { verify_epoch_ending_notification( &mut stream_listener, @@ -295,12 +294,9 @@ async fn test_epoch_stream_out_of_order_responses() { assert_none!(stream_listener.select_next_some().now_or_never()); // Set the response for the first and third request and verify one notification sent - set_epoch_ending_response_in_queue(&mut data_stream, 0); - set_epoch_ending_response_in_queue(&mut data_stream, 2); - data_stream - .process_data_responses(global_data_summary.clone()) - .await - .unwrap(); + set_epoch_ending_response_in_queue(&mut data_stream, 0, 0); + set_epoch_ending_response_in_queue(&mut data_stream, 2, 0); + process_data_responses(&mut data_stream, 
&global_data_summary).await; verify_epoch_ending_notification( &mut stream_listener, create_ledger_info(0, MIN_ADVERTISED_EPOCH_END, true), @@ -309,12 +305,9 @@ async fn test_epoch_stream_out_of_order_responses() { assert_none!(stream_listener.select_next_some().now_or_never()); // Set the response for the first and third request and verify three notifications sent - set_epoch_ending_response_in_queue(&mut data_stream, 0); - set_epoch_ending_response_in_queue(&mut data_stream, 2); - data_stream - .process_data_responses(global_data_summary.clone()) - .await - .unwrap(); + set_epoch_ending_response_in_queue(&mut data_stream, 0, 0); + set_epoch_ending_response_in_queue(&mut data_stream, 2, 0); + process_data_responses(&mut data_stream, &global_data_summary).await; for _ in 0..3 { verify_epoch_ending_notification( &mut stream_listener, @@ -334,14 +327,15 @@ async fn test_state_stream_out_of_order_responses() { max_concurrent_state_requests, ..Default::default() }; - let (mut data_stream, mut stream_listener) = - create_state_value_stream(streaming_service_config, MIN_ADVERTISED_STATES); + let (mut data_stream, mut stream_listener) = create_state_value_stream( + AptosDataClientConfig::default(), + streaming_service_config, + MIN_ADVERTISED_STATES, + ); // Initialize the data stream let global_data_summary = create_global_data_summary(1); - data_stream - .initialize_data_requests(global_data_summary.clone()) - .unwrap(); + initialize_data_requests(&mut data_stream, &global_data_summary); // Verify a single request is made (to fetch the number of state values) let (sent_requests, _) = data_stream.get_sent_requests_and_notifications(); @@ -349,10 +343,7 @@ async fn test_state_stream_out_of_order_responses() { // Set a response for the number of state values set_num_state_values_response_in_queue(&mut data_stream, 0); - data_stream - .process_data_responses(global_data_summary.clone()) - .await - .unwrap(); + process_data_responses(&mut data_stream, 
&global_data_summary).await; // Verify at least six requests have been made let (sent_requests, _) = data_stream.get_sent_requests_and_notifications(); @@ -363,18 +354,12 @@ async fn test_state_stream_out_of_order_responses() { // Set a response for the second request and verify no notifications set_state_value_response_in_queue(&mut data_stream, 1); - data_stream - .process_data_responses(global_data_summary.clone()) - .await - .unwrap(); + process_data_responses(&mut data_stream, &global_data_summary).await; assert_none!(stream_listener.select_next_some().now_or_never()); // Set a response for the first request and verify two notifications set_state_value_response_in_queue(&mut data_stream, 0); - data_stream - .process_data_responses(global_data_summary.clone()) - .await - .unwrap(); + process_data_responses(&mut data_stream, &global_data_summary).await; for _ in 0..2 { let data_notification = get_data_notification(&mut stream_listener).await.unwrap(); assert_matches!( @@ -387,10 +372,7 @@ async fn test_state_stream_out_of_order_responses() { // Set the response for the first and third request and verify one notification sent set_state_value_response_in_queue(&mut data_stream, 0); set_state_value_response_in_queue(&mut data_stream, 2); - data_stream - .process_data_responses(global_data_summary.clone()) - .await - .unwrap(); + process_data_responses(&mut data_stream, &global_data_summary).await; let data_notification = get_data_notification(&mut stream_listener).await.unwrap(); assert_matches!( data_notification.data_payload, @@ -401,10 +383,7 @@ async fn test_state_stream_out_of_order_responses() { // Set the response for the first and third request and verify three notifications sent set_state_value_response_in_queue(&mut data_stream, 0); set_state_value_response_in_queue(&mut data_stream, 2); - data_stream - .process_data_responses(global_data_summary.clone()) - .await - .unwrap(); + process_data_responses(&mut data_stream, &global_data_summary).await; for _ 
in 0..3 { let data_notification = get_data_notification(&mut stream_listener).await.unwrap(); assert_matches!( @@ -415,6 +394,381 @@ async fn test_state_stream_out_of_order_responses() { assert_none!(stream_listener.select_next_some().now_or_never()); } +#[tokio::test] +async fn test_continuous_stream_epoch_change_retry() { + // Create a test streaming service config + let max_request_retry = 10; + let max_concurrent_requests = 3; + let streaming_service_config = DataStreamingServiceConfig { + max_concurrent_requests, + max_request_retry, + ..Default::default() + }; + + // Test both types of continuous data streams + let (data_stream_1, _stream_listener_1) = create_continuous_transaction_stream( + AptosDataClientConfig::default(), + streaming_service_config, + MIN_ADVERTISED_TRANSACTION, + MIN_ADVERTISED_EPOCH_END, + ); + let (data_stream_2, _stream_listener_2) = create_continuous_transaction_output_stream( + AptosDataClientConfig::default(), + streaming_service_config, + MIN_ADVERTISED_TRANSACTION_OUTPUT, + MIN_ADVERTISED_EPOCH_END, + ); + for mut data_stream in [data_stream_1, data_stream_2] { + // Initialize the data stream and drive progress + let global_data_summary = create_global_data_summary(1); + initialize_data_requests(&mut data_stream, &global_data_summary); + + // Verify a single request is made + let (sent_requests, _) = data_stream.get_sent_requests_and_notifications(); + assert_eq!(sent_requests.as_ref().unwrap().len(), 1); + + // Verify the request is for an epoch ending ledger info + let client_request = get_pending_client_request(&mut data_stream, 0); + let epoch_ending_request = + DataClientRequest::EpochEndingLedgerInfos(EpochEndingLedgerInfosRequest { + start_epoch: MIN_ADVERTISED_EPOCH_END, + end_epoch: MIN_ADVERTISED_EPOCH_END, + }); + assert_eq!(client_request, epoch_ending_request); + + // Handle multiple timeouts and retries + for _ in 0..max_request_retry - 1 { + // Set a timeout response for the epoch ending ledger info and process it + 
set_timeout_response_in_queue(&mut data_stream, 0); + process_data_responses(&mut data_stream, &global_data_summary).await; + + // Verify the data client request was resent to the network (retried) + let client_request = get_pending_client_request(&mut data_stream, 0); + assert_eq!(client_request, epoch_ending_request); + } + + // Set an epoch ending response in the queue and process it + set_epoch_ending_response_in_queue(&mut data_stream, 0, MIN_ADVERTISED_TRANSACTION + 100); + process_data_responses(&mut data_stream, &global_data_summary).await; + + // Verify the correct number of data requests are now pending, + // i.e., a target has been found and we're fetching data up to it. + let (sent_requests, _) = data_stream.get_sent_requests_and_notifications(); + assert_eq!(sent_requests.as_ref().unwrap().len(), 3); + } +} + +#[tokio::test] +async fn test_continuous_stream_subscription_retry() { + // Create a test streaming service config + let max_request_retry = 3; + let max_concurrent_requests = 3; + let streaming_service_config = DataStreamingServiceConfig { + max_concurrent_requests, + max_request_retry, + ..Default::default() + }; + + // Test both types of continuous data streams + let (data_stream_1, stream_listener_1) = create_continuous_transaction_stream( + AptosDataClientConfig::default(), + streaming_service_config, + MAX_ADVERTISED_TRANSACTION, + MAX_ADVERTISED_EPOCH_END, + ); + let (data_stream_2, stream_listener_2) = create_continuous_transaction_output_stream( + AptosDataClientConfig::default(), + streaming_service_config, + MAX_ADVERTISED_TRANSACTION_OUTPUT, + MAX_ADVERTISED_EPOCH_END, + ); + for (mut data_stream, mut stream_listener, transactions_only) in [ + (data_stream_1, stream_listener_1, true), + (data_stream_2, stream_listener_2, false), + ] { + // Initialize the data stream + let global_data_summary = create_global_data_summary(1); + initialize_data_requests(&mut data_stream, &global_data_summary); + + // Verify a single request is made + let 
(sent_requests, _) = data_stream.get_sent_requests_and_notifications(); + assert_eq!(sent_requests.as_ref().unwrap().len(), 1); + + // Verify the request is for new transaction data + let client_request = get_pending_client_request(&mut data_stream, 0); + let expected_request = if transactions_only { + DataClientRequest::NewTransactionsWithProof(NewTransactionsWithProofRequest { + known_version: MAX_ADVERTISED_TRANSACTION, + known_epoch: MAX_ADVERTISED_EPOCH_END, + include_events: false, + }) + } else { + DataClientRequest::NewTransactionOutputsWithProof( + NewTransactionOutputsWithProofRequest { + known_version: MAX_ADVERTISED_TRANSACTION_OUTPUT, + known_epoch: MAX_ADVERTISED_EPOCH_END, + }, + ) + }; + assert_eq!(client_request, expected_request); + + // Set a timeout response for the subscription request and process it + set_timeout_response_in_queue(&mut data_stream, 0); + process_data_responses(&mut data_stream, &global_data_summary).await; + assert_none!(stream_listener.select_next_some().now_or_never()); + + // Handle multiple timeouts and retries because no new data is known + // about, so the best we can do is send subscriptions + for _ in 0..max_request_retry * 3 { + // Set a timeout response for the request and process it + set_timeout_response_in_queue(&mut data_stream, 0); + process_data_responses(&mut data_stream, &global_data_summary).await; + + // Verify the same subscription request was resent to the network + let client_request = get_pending_client_request(&mut data_stream, 0); + assert_eq!(client_request, expected_request); + } + + // Set a subscription response in the queue and process it + set_subscription_response_in_queue( + &mut data_stream, + 0, + MAX_ADVERTISED_TRANSACTION + 1, + transactions_only, + ); + process_data_responses(&mut data_stream, &global_data_summary).await; + + // Verify another subscription request is now sent (for data beyond the previous target) + let (sent_requests, _) = 
data_stream.get_sent_requests_and_notifications(); + assert_eq!(sent_requests.as_ref().unwrap().len(), 1); + let client_request = get_pending_client_request(&mut data_stream, 0); + let expected_request = if transactions_only { + DataClientRequest::NewTransactionsWithProof(NewTransactionsWithProofRequest { + known_version: MAX_ADVERTISED_TRANSACTION + 1, + known_epoch: MAX_ADVERTISED_EPOCH_END, + include_events: false, + }) + } else { + DataClientRequest::NewTransactionOutputsWithProof( + NewTransactionOutputsWithProofRequest { + known_version: MAX_ADVERTISED_TRANSACTION_OUTPUT + 1, + known_epoch: MAX_ADVERTISED_EPOCH_END, + }, + ) + }; + assert_eq!(client_request, expected_request); + + // Set a timeout response for the subscription request and process it. + // This will cause the same request to be re-sent. + set_timeout_response_in_queue(&mut data_stream, 0); + process_data_responses(&mut data_stream, &global_data_summary).await; + + // Set a timeout response for the subscription request and process it, + // but this time the node knows about new data to fetch. 
+ set_timeout_response_in_queue(&mut data_stream, 0); + let mut new_global_data_summary = global_data_summary.clone(); + let new_highest_synced_version = MAX_ADVERTISED_TRANSACTION + 1000; + new_global_data_summary.advertised_data.synced_ledger_infos = vec![create_ledger_info( + new_highest_synced_version, + MAX_ADVERTISED_EPOCH_END, + false, + )]; + process_data_responses(&mut data_stream, &new_global_data_summary).await; + + // Verify multiple data requests have now been sent to fetch the missing data + let (sent_requests, _) = data_stream.get_sent_requests_and_notifications(); + assert_eq!(sent_requests.as_ref().unwrap().len(), 3); + for i in 0..3 { + let expected_version = MAX_ADVERTISED_TRANSACTION + 2 + i as u64; + let client_request = get_pending_client_request(&mut data_stream, i); + let expected_request = if transactions_only { + DataClientRequest::TransactionsWithProof(TransactionsWithProofRequest { + start_version: expected_version, + end_version: expected_version, + proof_version: new_highest_synced_version, + include_events: false, + }) + } else { + DataClientRequest::TransactionOutputsWithProof(TransactionOutputsWithProofRequest { + start_version: expected_version, + end_version: expected_version, + proof_version: new_highest_synced_version, + }) + }; + assert_eq!(client_request, expected_request); + } + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_continuous_stream_subscription_timeout() { + // Create a test data client config + let subscription_timeout_ms = 2022; + let data_client_config = AptosDataClientConfig { + subscription_timeout_ms, + ..Default::default() + }; + + // Test both types of continuous data streams + let (data_stream_1, stream_listener_1) = create_continuous_transaction_stream( + data_client_config, + DataStreamingServiceConfig::default(), + MAX_ADVERTISED_TRANSACTION, + MAX_ADVERTISED_EPOCH_END, + ); + let (data_stream_2, stream_listener_2) = create_continuous_transaction_output_stream( + data_client_config, + 
DataStreamingServiceConfig::default(), + MAX_ADVERTISED_TRANSACTION_OUTPUT, + MAX_ADVERTISED_EPOCH_END, + ); + for (mut data_stream, mut stream_listener, transactions_only) in [ + (data_stream_1, stream_listener_1, true), + (data_stream_2, stream_listener_2, false), + ] { + // Initialize the data stream + let global_data_summary = create_global_data_summary(1); + initialize_data_requests(&mut data_stream, &global_data_summary); + + // Verify a single request is made + let (sent_requests, _) = data_stream.get_sent_requests_and_notifications(); + assert_eq!(sent_requests.as_ref().unwrap().len(), 1); + + // Wait until a notification is sent. The mock data client + // will verify the timeout. + wait_for_notification_and_verify( + &mut data_stream, + &mut stream_listener, + transactions_only, + true, + &global_data_summary, + ) + .await; + + // Handle multiple timeouts and retries because no new data is known + // about, so the best we can do is send subscriptions + for _ in 0..3 { + set_timeout_response_in_queue(&mut data_stream, 0); + process_data_responses(&mut data_stream, &global_data_summary).await; + } + + // Wait until a notification is sent. The mock data client + // will verify the timeout. 
+ wait_for_notification_and_verify( + &mut data_stream, + &mut stream_listener, + transactions_only, + true, + &global_data_summary, + ) + .await; + } +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_transaction_and_output_stream_timeout() { + // Create a test data client config + let max_response_timeout_ms = 85; + let response_timeout_ms = 7; + let data_client_config = AptosDataClientConfig { + max_response_timeout_ms, + response_timeout_ms, + ..Default::default() + }; + + // Create a test streaming service config + let max_concurrent_requests = 3; + let max_request_retry = 10; + let streaming_service_config = DataStreamingServiceConfig { + max_concurrent_requests, + max_request_retry, + ..Default::default() + }; + + // Test both types of data streams + let (data_stream_1, stream_listener_1) = create_transaction_stream( + data_client_config, + streaming_service_config, + MIN_ADVERTISED_TRANSACTION, + MAX_ADVERTISED_TRANSACTION, + ); + let (data_stream_2, stream_listener_2) = create_output_stream( + data_client_config, + streaming_service_config, + MIN_ADVERTISED_TRANSACTION_OUTPUT, + MAX_ADVERTISED_TRANSACTION_OUTPUT, + ); + for (mut data_stream, mut stream_listener, transactions_only) in [ + (data_stream_1, stream_listener_1, true), + (data_stream_2, stream_listener_2, false), + ] { + // Initialize the data stream + let global_data_summary = create_global_data_summary(1); + initialize_data_requests(&mut data_stream, &global_data_summary); + + // Verify the correct number of requests are made + let (sent_requests, _) = data_stream.get_sent_requests_and_notifications(); + assert_eq!( + sent_requests.as_ref().unwrap().len(), + max_concurrent_requests as usize + ); + + // Wait for the data client to satisfy all requests + for i in 0..max_concurrent_requests as usize { + wait_for_data_client_to_respond(&mut data_stream, i).await; + } + + // Handle multiple timeouts and retries on the first request + for _ in 0..max_request_retry / 2 { + 
set_timeout_response_in_queue(&mut data_stream, 0); + process_data_responses(&mut data_stream, &global_data_summary).await; + wait_for_data_client_to_respond(&mut data_stream, 0).await; + } + + // Wait until a notification is finally sent along the stream + wait_for_notification_and_verify( + &mut data_stream, + &mut stream_listener, + transactions_only, + false, + &global_data_summary, + ) + .await; + + // Wait for the data client to satisfy all requests + for i in 0..max_concurrent_requests as usize { + wait_for_data_client_to_respond(&mut data_stream, i).await; + } + + // Set a timeout on the second request + set_timeout_response_in_queue(&mut data_stream, 1); + + // Handle multiple invalid type responses on the first request + for _ in 0..max_request_retry / 2 { + set_state_value_response_in_queue(&mut data_stream, 0); + process_data_responses(&mut data_stream, &global_data_summary).await; + wait_for_data_client_to_respond(&mut data_stream, 0).await; + } + + // Handle multiple invalid type responses on the third request + for _ in 0..max_request_retry / 2 { + set_state_value_response_in_queue(&mut data_stream, 2); + process_data_responses(&mut data_stream, &global_data_summary).await; + wait_for_data_client_to_respond(&mut data_stream, 2).await; + } + + // Wait until a notification is finally sent along the stream + wait_for_notification_and_verify( + &mut data_stream, + &mut stream_listener, + transactions_only, + false, + &global_data_summary, + ) + .await; + } +} + #[tokio::test] async fn test_stream_listener_dropped() { // Create an epoch ending data stream @@ -423,14 +777,15 @@ async fn test_stream_listener_dropped() { max_concurrent_requests, ..Default::default() }; - let (mut data_stream, mut stream_listener) = - create_epoch_ending_stream(streaming_service_config, MIN_ADVERTISED_EPOCH_END); + let (mut data_stream, mut stream_listener) = create_epoch_ending_stream( + AptosDataClientConfig::default(), + streaming_service_config, + 
MIN_ADVERTISED_EPOCH_END, + ); // Initialize the data stream let global_data_summary = create_global_data_summary(1); - data_stream - .initialize_data_requests(global_data_summary.clone()) - .unwrap(); + initialize_data_requests(&mut data_stream, &global_data_summary); // Verify no notifications have been sent yet let (sent_requests, sent_notifications) = data_stream.get_sent_requests_and_notifications(); @@ -441,11 +796,8 @@ async fn test_stream_listener_dropped() { assert_eq!(sent_notifications.len(), 0); // Set a response for the first request and verify a notification is sent - set_epoch_ending_response_in_queue(&mut data_stream, 0); - data_stream - .process_data_responses(global_data_summary.clone()) - .await - .unwrap(); + set_epoch_ending_response_in_queue(&mut data_stream, 0, 0); + process_data_responses(&mut data_stream, &global_data_summary).await; verify_epoch_ending_notification( &mut stream_listener, create_ledger_info(0, MIN_ADVERTISED_EPOCH_END, true), @@ -461,7 +813,7 @@ async fn test_stream_listener_dropped() { // Set a response for the first request and verify an error is returned // when the notification is sent. - set_epoch_ending_response_in_queue(&mut data_stream, 0); + set_epoch_ending_response_in_queue(&mut data_stream, 0, 0); data_stream .process_data_responses(global_data_summary.clone()) .await @@ -470,17 +822,15 @@ async fn test_stream_listener_dropped() { assert_eq!(sent_notifications.len(), 2); // Set a response for the first request and verify no notifications are sent - set_epoch_ending_response_in_queue(&mut data_stream, 0); - data_stream - .process_data_responses(global_data_summary.clone()) - .await - .unwrap(); + set_epoch_ending_response_in_queue(&mut data_stream, 0, 0); + process_data_responses(&mut data_stream, &global_data_summary).await; let (_, sent_notifications) = data_stream.get_sent_requests_and_notifications(); assert_eq!(sent_notifications.len(), 2); } /// Creates a state value stream for the given `version`. 
fn create_state_value_stream( + data_client_config: AptosDataClientConfig, streaming_service_config: DataStreamingServiceConfig, version: Version, ) -> (DataStream, DataStreamListener) { @@ -489,11 +839,12 @@ fn create_state_value_stream( version, start_index: 0, }); - create_data_stream(streaming_service_config, stream_request) + create_data_stream(data_client_config, streaming_service_config, stream_request) } /// Creates an epoch ending stream starting at `start_epoch` fn create_epoch_ending_stream( + data_client_config: AptosDataClientConfig, streaming_service_config: DataStreamingServiceConfig, start_epoch: u64, ) -> (DataStream, DataStreamListener) { @@ -502,54 +853,95 @@ fn create_epoch_ending_stream( StreamRequest::GetAllEpochEndingLedgerInfos(GetAllEpochEndingLedgerInfosRequest { start_epoch, }); - create_data_stream(streaming_service_config, stream_request) + create_data_stream(data_client_config, streaming_service_config, stream_request) +} + +/// Creates a continuous transaction output stream for the given `version`. +fn create_continuous_transaction_output_stream( + data_client_config: AptosDataClientConfig, + streaming_service_config: DataStreamingServiceConfig, + known_version: Version, + known_epoch: Version, +) -> (DataStream, DataStreamListener) { + // Create a continuous transaction output stream request + let stream_request = StreamRequest::ContinuouslyStreamTransactionOutputs( + ContinuouslyStreamTransactionOutputsRequest { + known_version, + known_epoch, + target: None, + }, + ); + create_data_stream(data_client_config, streaming_service_config, stream_request) +} + +/// Creates a continuous transaction stream for the given `version`. 
+fn create_continuous_transaction_stream( + data_client_config: AptosDataClientConfig, + streaming_service_config: DataStreamingServiceConfig, + known_version: Version, + known_epoch: Version, +) -> (DataStream, DataStreamListener) { + // Create a continuous transaction stream request + let stream_request = + StreamRequest::ContinuouslyStreamTransactions(ContinuouslyStreamTransactionsRequest { + known_version, + known_epoch, + include_events: false, + target: None, + }); + create_data_stream(data_client_config, streaming_service_config, stream_request) } -/// Creates a transaction output stream for the given `version`. +/// Creates a transaction stream for the given `version`. fn create_transaction_stream( + data_client_config: AptosDataClientConfig, streaming_service_config: DataStreamingServiceConfig, start_version: Version, end_version: Version, ) -> (DataStream, DataStreamListener) { - // Create a transaction output stream + // Create a transaction stream request let stream_request = StreamRequest::GetAllTransactions(GetAllTransactionsRequest { start_version, end_version, proof_version: end_version, include_events: false, }); - create_data_stream(streaming_service_config, stream_request) + create_data_stream(data_client_config, streaming_service_config, stream_request) +} + +/// Creates an output stream for the given `version`. 
+fn create_output_stream( + data_client_config: AptosDataClientConfig, + streaming_service_config: DataStreamingServiceConfig, + start_version: Version, + end_version: Version, +) -> (DataStream, DataStreamListener) { + // Create an output stream request + let stream_request = StreamRequest::GetAllTransactionOutputs(GetAllTransactionOutputsRequest { + start_version, + end_version, + proof_version: end_version, + }); + create_data_stream(data_client_config, streaming_service_config, stream_request) } fn create_data_stream( + data_client_config: AptosDataClientConfig, streaming_service_config: DataStreamingServiceConfig, stream_request: StreamRequest, ) -> (DataStream, DataStreamListener) { initialize_logger(); // Create an advertised data - let advertised_data = AdvertisedData { - states: vec![CompleteDataRange::new(MIN_ADVERTISED_STATES, MAX_ADVERTISED_STATES).unwrap()], - epoch_ending_ledger_infos: vec![CompleteDataRange::new( - MIN_ADVERTISED_EPOCH_END, - MAX_ADVERTISED_EPOCH_END, - ) - .unwrap()], - synced_ledger_infos: vec![], - transactions: vec![], - transaction_outputs: vec![CompleteDataRange::new( - MIN_ADVERTISED_TRANSACTION_OUTPUT, - MAX_ADVERTISED_TRANSACTION_OUTPUT, - ) - .unwrap()], - }; + let advertised_data = create_advertised_data(); // Create an aptos data client mock and notification generator - let aptos_data_client = MockAptosDataClient::new(false, false, false); + let aptos_data_client = MockAptosDataClient::new(data_client_config, true, false, true, false); let notification_generator = Arc::new(U64IdGenerator::new()); // Return the data stream and listener pair DataStream::new( + data_client_config, streaming_service_config, create_random_u64(10000), &stream_request, @@ -560,9 +952,36 @@ fn create_data_stream( .unwrap() } +fn create_advertised_data() -> AdvertisedData { + AdvertisedData { + states: vec![CompleteDataRange::new(MIN_ADVERTISED_STATES, MAX_ADVERTISED_STATES).unwrap()], + epoch_ending_ledger_infos: vec![CompleteDataRange::new( + 
MIN_ADVERTISED_EPOCH_END, + MAX_ADVERTISED_EPOCH_END, + ) + .unwrap()], + synced_ledger_infos: vec![create_ledger_info( + MAX_ADVERTISED_TRANSACTION, + MAX_ADVERTISED_EPOCH_END, + true, + )], + transactions: vec![CompleteDataRange::new( + MIN_ADVERTISED_TRANSACTION, + MAX_ADVERTISED_TRANSACTION, + ) + .unwrap()], + transaction_outputs: vec![CompleteDataRange::new( + MIN_ADVERTISED_TRANSACTION_OUTPUT, + MAX_ADVERTISED_TRANSACTION_OUTPUT, + ) + .unwrap()], + } +} + fn create_global_data_summary(chunk_sizes: u64) -> GlobalDataSummary { let mut global_data_summary = GlobalDataSummary::empty(); global_data_summary.optimal_chunk_sizes = create_optimal_chunk_sizes(chunk_sizes); + global_data_summary.advertised_data = create_advertised_data(); global_data_summary } @@ -580,13 +999,13 @@ fn create_optimal_chunk_sizes(chunk_sizes: u64) -> OptimalChunkSizes { fn set_epoch_ending_response_in_queue( data_stream: &mut DataStream, index: usize, + version: u64, ) { - // Set the response at the specified index let (sent_requests, _) = data_stream.get_sent_requests_and_notifications(); let pending_response = sent_requests.as_mut().unwrap().get_mut(index).unwrap(); let client_response = Some(Ok(create_data_client_response( ResponsePayload::EpochEndingLedgerInfos(vec![create_ledger_info( - 0, + version, MIN_ADVERTISED_EPOCH_END, true, )]), @@ -630,10 +1049,70 @@ fn set_state_value_response_in_queue( pending_response.lock().client_response = client_response; } +/// Sets the client response at the index in the pending queue to contain a +/// subscription data response. 
+fn set_subscription_response_in_queue( + data_stream: &mut DataStream, + index: usize, + single_data_version: u64, + transactions_only: bool, +) { + let (sent_requests, _) = data_stream.get_sent_requests_and_notifications(); + let pending_response = sent_requests.as_mut().unwrap().get_mut(index).unwrap(); + let client_response = if transactions_only { + Some(Ok(create_data_client_response( + ResponsePayload::NewTransactionsWithProof(( + create_transaction_list_with_proof(single_data_version, single_data_version, false), + create_ledger_info(single_data_version, MAX_ADVERTISED_EPOCH_END, false), + )), + ))) + } else { + Some(Ok(create_data_client_response( + ResponsePayload::NewTransactionOutputsWithProof(( + create_output_list_with_proof(single_data_version, single_data_version), + create_ledger_info(single_data_version, MAX_ADVERTISED_EPOCH_END, false), + )), + ))) + }; + pending_response.lock().client_response = client_response; +} + +/// Sets the client response at the index in the pending queue to contain a +/// timeout response. +fn set_timeout_response_in_queue(data_stream: &mut DataStream, index: usize) { + let (sent_requests, _) = data_stream.get_sent_requests_and_notifications(); + let pending_response = sent_requests.as_mut().unwrap().get_mut(index).unwrap(); + let client_response = Some(Err(aptos_data_client::Error::TimeoutWaitingForResponse( + "Timed out!".into(), + ))); + pending_response.lock().client_response = client_response; +} + +/// Waits for the data client to set the response at the index in the +/// pending queue. 
+async fn wait_for_data_client_to_respond( + data_stream: &mut DataStream, + index: usize, +) { + let (sent_requests, _) = data_stream.get_sent_requests_and_notifications(); + let pending_response = sent_requests.as_mut().unwrap().get_mut(index).unwrap(); + + loop { + if let Some(client_response) = &pending_response.lock().client_response { + if !matches!( + client_response, + Err(aptos_data_client::Error::TimeoutWaitingForResponse(_)) + ) { + return; + } + } + tokio::time::sleep(Duration::from_millis(100)).await; + } +} + /// Sets the client response at the head of the pending queue to contain an /// transaction response. fn set_transaction_response_at_queue_head(data_stream: &mut DataStream) { - // Set the response at the specified index let (sent_requests, _) = data_stream.get_sent_requests_and_notifications(); if !sent_requests.as_mut().unwrap().is_empty() { let pending_response = sent_requests.as_mut().unwrap().get_mut(0).unwrap(); @@ -687,3 +1166,95 @@ async fn verify_epoch_ending_notification( ); } } + +/// Helper function to initialize the data requests +fn initialize_data_requests( + data_stream: &mut DataStream, + global_data_summary: &GlobalDataSummary, +) { + data_stream + .initialize_data_requests(global_data_summary.clone()) + .unwrap(); +} + +/// Helper function to process data responses on the given data stream +async fn process_data_responses( + data_stream: &mut DataStream, + global_data_summary: &GlobalDataSummary, +) { + data_stream + .process_data_responses(global_data_summary.clone()) + .await + .unwrap(); +} + +/// Helper function to get the pending client request at +/// the specified index. 
+fn get_pending_client_request( + data_stream: &mut DataStream, + index: usize, +) -> DataClientRequest { + let (sent_requests, _) = data_stream.get_sent_requests_and_notifications(); + let pending_response = sent_requests.as_ref().unwrap().get(index).unwrap(); + let client_request = pending_response.lock().client_request.clone(); + client_request +} + +/// Waits for a subscription notification along the given +/// listener and continues to drive progress until one is received. +/// Verifies the notification when it is received. +async fn wait_for_notification_and_verify( + data_stream: &mut DataStream, + stream_listener: &mut DataStreamListener, + transactions_only: bool, + subscription_notification: bool, + global_data_summary: &GlobalDataSummary, +) { + loop { + if let Ok(data_notification) = + timeout(Duration::from_secs(1), stream_listener.select_next_some()).await + { + if subscription_notification { + // Verify we got the correct subscription data + if transactions_only { + if !matches!( + data_notification.data_payload, + DataPayload::ContinuousTransactionsWithProof(..) + ) { + panic!( + "Invalid data notification found: {:?}", + data_notification.data_payload + ); + } + } else if !matches!( + data_notification.data_payload, + DataPayload::ContinuousTransactionOutputsWithProof(..) + ) { + panic!("Invalid data notification found: {:?}", data_notification); + } + } else { + // Verify we got the correct transaction data + if transactions_only { + if !matches!( + data_notification.data_payload, + DataPayload::TransactionsWithProof(..) + ) { + panic!( + "Invalid data notification found: {:?}", + data_notification.data_payload + ); + } + } else if !matches!( + data_notification.data_payload, + DataPayload::TransactionOutputsWithProof(..) 
+ ) { + panic!("Invalid data notification found: {:?}", data_notification); + } + } + + break; + } else { + process_data_responses(data_stream, global_data_summary).await; + } + } +} diff --git a/state-sync/state-sync-v2/data-streaming-service/src/tests/streaming_service.rs b/state-sync/state-sync-v2/data-streaming-service/src/tests/streaming_service.rs index 9b95a9d3fb04c..0cb27b7219640 100644 --- a/state-sync/state-sync-v2/data-streaming-service/src/tests/streaming_service.rs +++ b/state-sync/state-sync-v2/data-streaming-service/src/tests/streaming_service.rs @@ -17,7 +17,7 @@ use crate::{ MIN_ADVERTISED_TRANSACTION, MIN_ADVERTISED_TRANSACTION_OUTPUT, TOTAL_NUM_STATE_VALUES, }, }; -use aptos_config::config::DataStreamingServiceConfig; +use aptos_config::config::{AptosDataClientConfig, DataStreamingServiceConfig}; use claims::{assert_le, assert_matches, assert_ok, assert_some}; macro_rules! unexpected_payload_type { @@ -1520,10 +1520,13 @@ pub fn create_streaming_client_and_server( new_streaming_service_client_listener_pair(); // Create a mock data client + let aptos_data_client_config = AptosDataClientConfig::default(); let aptos_data_client = MockAptosDataClient::new( + aptos_data_client_config, data_beyond_highest_advertised, limit_chunk_sizes, skip_emulate_network_latencies, + true, ); // Create the data streaming service config @@ -1535,6 +1538,7 @@ pub fn create_streaming_client_and_server( // Create the streaming service and connect it to the listener let streaming_service = DataStreamingService::new( + aptos_data_client_config, data_streaming_service_config, aptos_data_client, streaming_service_listener, diff --git a/state-sync/state-sync-v2/data-streaming-service/src/tests/utils.rs b/state-sync/state-sync-v2/data-streaming-service/src/tests/utils.rs index f573ee534b38d..53181618ca769 100644 --- a/state-sync/state-sync-v2/data-streaming-service/src/tests/utils.rs +++ b/state-sync/state-sync-v2/data-streaming-service/src/tests/utils.rs @@ -2,11 +2,13 @@ // 
SPDX-License-Identifier: Apache-2.0 use crate::{data_notification::DataNotification, data_stream::DataStreamListener, error::Error}; +use aptos_config::config::AptosDataClientConfig; use aptos_crypto::{ed25519::Ed25519PrivateKey, HashValue, PrivateKey, SigningKey, Uniform}; use aptos_data_client::{ AdvertisedData, AptosDataClient, GlobalDataSummary, OptimalChunkSizes, Response, ResponseCallback, ResponseContext, ResponseError, }; +use aptos_infallible::Mutex; use aptos_logger::Level; use aptos_types::aggregate_signature::AggregateSignature; use aptos_types::{ @@ -30,11 +32,22 @@ use aptos_types::{ use async_trait::async_trait; use futures::StreamExt; use rand::{rngs::OsRng, Rng}; +use std::cmp::min; +use std::ops::DerefMut; +use std::sync::Arc; use std::{collections::HashMap, thread, time::Duration}; +use storage_service_types::requests::{ + DataRequest, EpochEndingLedgerInfoRequest, NewTransactionOutputsWithProofRequest, + NewTransactionsWithProofRequest, StateValuesWithProofRequest, + TransactionOutputsWithProofRequest, TransactionsWithProofRequest, +}; use storage_service_types::responses::CompleteDataRange; use storage_service_types::Epoch; use tokio::time::timeout; +// TODO(joshlind): provide a better way to mock the data client. +// Especially around verifying timeouts! 
+ /// The number of state values held at any version pub const TOTAL_NUM_STATE_VALUES: u64 = 2000; @@ -47,8 +60,8 @@ pub const MIN_ADVERTISED_TRANSACTION: u64 = 1000; pub const MAX_ADVERTISED_TRANSACTION: u64 = 10000; pub const MIN_ADVERTISED_TRANSACTION_OUTPUT: u64 = 1000; pub const MAX_ADVERTISED_TRANSACTION_OUTPUT: u64 = 10000; -pub const MAX_REAL_EPOCH_END: u64 = MAX_ADVERTISED_EPOCH_END + 5; -pub const MAX_REAL_TRANSACTION: u64 = MAX_ADVERTISED_TRANSACTION + 5000; +pub const MAX_REAL_EPOCH_END: u64 = MAX_ADVERTISED_EPOCH_END + 2; +pub const MAX_REAL_TRANSACTION: u64 = MAX_ADVERTISED_TRANSACTION + 10; pub const MAX_REAL_TRANSACTION_OUTPUT: u64 = MAX_REAL_TRANSACTION; pub const MAX_RESPONSE_ID: u64 = 100000; @@ -58,19 +71,24 @@ pub const MAX_NOTIFICATION_TIMEOUT_SECS: u64 = 40; /// A simple mock of the Aptos Data Client #[derive(Clone, Debug)] pub struct MockAptosDataClient { + pub aptos_data_client_config: AptosDataClientConfig, pub advertised_epoch_ending_ledger_infos: HashMap, pub advertised_synced_ledger_infos: Vec, pub data_beyond_highest_advertised: bool, // If true, data exists beyond the highest advertised + pub data_request_counter: Arc>>, // Tracks the number of times the same data request was made pub highest_epoch_ending_ledger_infos: HashMap, pub limit_chunk_sizes: bool, // If true, responses will be truncated to emulate chunk and network limits pub skip_emulate_network_latencies: bool, // If true, skips network latency emulation + pub skip_timeout_verification: bool, // If true, skips timeout verification for incoming requests } impl MockAptosDataClient { pub fn new( + aptos_data_client_config: AptosDataClientConfig, data_beyond_highest_advertised: bool, limit_chunk_sizes: bool, skip_emulate_network_latencies: bool, + skip_timeout_verification: bool, ) -> Self { // Create the advertised data let advertised_epoch_ending_ledger_infos = create_epoch_ending_ledger_infos( @@ -95,13 +113,19 @@ impl MockAptosDataClient { MAX_REAL_TRANSACTION, ); + // 
Create the data request counter + let data_request_counter = Arc::new(Mutex::new(HashMap::new())); + Self { + aptos_data_client_config, advertised_epoch_ending_ledger_infos, advertised_synced_ledger_infos, data_beyond_highest_advertised, + data_request_counter, highest_epoch_ending_ledger_infos, limit_chunk_sizes, skip_emulate_network_latencies, + skip_timeout_verification, } } @@ -133,6 +157,43 @@ impl MockAptosDataClient { end_index // No need to limit the chunk } } + + fn verify_request_timeout( + &self, + request_timeout_ms: u64, + is_subscription_request: bool, + data_request: DataRequest, + ) { + if self.skip_timeout_verification { + return; + } + + // Verify the given timeout for the request + let expected_timeout = if is_subscription_request { + self.aptos_data_client_config.subscription_timeout_ms + } else { + let min_timeout = self.aptos_data_client_config.response_timeout_ms; + let max_timeout = self.aptos_data_client_config.max_response_timeout_ms; + + // Check how many times the given request has been made + // and update the request counter. 
+ let mut data_request_counter_lock = self.data_request_counter.lock(); + let num_times_requested = *data_request_counter_lock.get(&data_request).unwrap_or(&0); + let _ = data_request_counter_lock + .deref_mut() + .insert(data_request, num_times_requested + 1); + drop(data_request_counter_lock); + + // Calculate the expected timeout based on exponential backoff + min( + max_timeout, + min_timeout * (u32::pow(2, num_times_requested as u32) as u64), + ) + }; + + // Verify the request timeouts match + assert_eq!(request_timeout_ms, expected_timeout); + } } #[async_trait] @@ -176,10 +237,20 @@ impl AptosDataClient for MockAptosDataClient { async fn get_state_values_with_proof( &self, - _version: Version, + version: Version, start_index: u64, end_index: u64, + request_timeout_ms: u64, ) -> Result, aptos_data_client::Error> { + self.verify_request_timeout( + request_timeout_ms, + false, + DataRequest::GetStateValuesWithProof(StateValuesWithProofRequest { + version, + start_index, + end_index, + }), + ); self.emulate_network_latencies(); // Calculate the last index based on if we should limit the chunk size @@ -211,7 +282,16 @@ impl AptosDataClient for MockAptosDataClient { &self, start_epoch: Epoch, end_epoch: Epoch, + request_timeout_ms: u64, ) -> Result>, aptos_data_client::Error> { + self.verify_request_timeout( + request_timeout_ms, + false, + DataRequest::GetEpochEndingLedgerInfos(EpochEndingLedgerInfoRequest { + start_epoch, + expected_end_epoch: end_epoch, + }), + ); self.emulate_network_latencies(); // Calculate the last epoch based on if we should limit the chunk size @@ -236,17 +316,31 @@ impl AptosDataClient for MockAptosDataClient { &self, known_version: Version, known_epoch: Epoch, + request_timeout_ms: u64, ) -> Result< Response<(TransactionOutputListWithProof, LedgerInfoWithSignatures)>, aptos_data_client::Error, > { + self.verify_request_timeout( + request_timeout_ms, + true, + 
DataRequest::GetNewTransactionOutputsWithProof(NewTransactionOutputsWithProofRequest { + known_version, + known_epoch, + }), + ); self.emulate_network_latencies(); // Attempt to fetch the new data if self.data_beyond_highest_advertised && known_version < MAX_REAL_TRANSACTION_OUTPUT { + // Create a mock data client without timeout verification (to handle the internal requests) + let mut aptos_data_client = self.clone(); + aptos_data_client.skip_timeout_verification = true; + let target_ledger_info = if known_epoch <= MAX_REAL_EPOCH_END { // Fetch the epoch ending ledger info - self.get_epoch_ending_ledger_infos(known_epoch, known_epoch) + aptos_data_client + .get_epoch_ending_ledger_infos(known_epoch, known_epoch, request_timeout_ms) .await .unwrap() .payload[0] @@ -257,12 +351,12 @@ impl AptosDataClient for MockAptosDataClient { }; // Fetch the new transaction outputs - let target_ledger_version = target_ledger_info.ledger_info().version(); - let outputs_with_proof = self + let outputs_with_proof = aptos_data_client .get_transaction_outputs_with_proof( - target_ledger_version, known_version + 1, - target_ledger_version, + known_version + 1, + known_version + 1, + self.aptos_data_client_config.response_timeout_ms, ) .await .unwrap() @@ -283,18 +377,32 @@ impl AptosDataClient for MockAptosDataClient { known_version: Version, known_epoch: Epoch, include_events: bool, + request_timeout_ms: u64, ) -> Result< Response<(TransactionListWithProof, LedgerInfoWithSignatures)>, aptos_data_client::Error, > { + self.verify_request_timeout( + request_timeout_ms, + true, + DataRequest::GetNewTransactionsWithProof(NewTransactionsWithProofRequest { + known_version, + known_epoch, + include_events, + }), + ); self.emulate_network_latencies(); // Attempt to fetch the new data if self.data_beyond_highest_advertised && known_version < MAX_REAL_TRANSACTION { - self.emulate_network_latencies(); + // Create a mock data client without timeout verification (to handle the internal requests) + 
let mut aptos_data_client = self.clone(); + aptos_data_client.skip_timeout_verification = true; + let target_ledger_info = if known_epoch <= MAX_REAL_EPOCH_END { // Fetch the epoch ending ledger info - self.get_epoch_ending_ledger_infos(known_epoch, known_epoch) + aptos_data_client + .get_epoch_ending_ledger_infos(known_epoch, known_epoch, request_timeout_ms) .await .unwrap() .payload[0] @@ -305,13 +413,13 @@ impl AptosDataClient for MockAptosDataClient { }; // Fetch the new transactions - let target_ledger_version = target_ledger_info.ledger_info().version(); - let transactions_with_proof = self + let transactions_with_proof = aptos_data_client .get_transactions_with_proof( - target_ledger_version, known_version + 1, - target_ledger_version, + known_version + 1, + known_version + 1, include_events, + self.aptos_data_client_config.response_timeout_ms, ) .await .unwrap() @@ -329,17 +437,35 @@ impl AptosDataClient for MockAptosDataClient { async fn get_number_of_states( &self, - _version: Version, + version: Version, + request_timeout_ms: u64, ) -> Result, aptos_data_client::Error> { + self.verify_request_timeout( + request_timeout_ms, + false, + DataRequest::GetNumberOfStatesAtVersion(version), + ); + self.emulate_network_latencies(); + Ok(create_data_client_response(TOTAL_NUM_STATE_VALUES)) } async fn get_transaction_outputs_with_proof( &self, - _proof_version: Version, + proof_version: Version, start_version: Version, end_version: Version, + request_timeout_ms: u64, ) -> Result, aptos_data_client::Error> { + self.verify_request_timeout( + request_timeout_ms, + false, + DataRequest::GetTransactionOutputsWithProof(TransactionOutputsWithProofRequest { + proof_version, + start_version, + end_version, + }), + ); self.emulate_network_latencies(); // Calculate the last version based on if we should limit the chunk size @@ -360,11 +486,22 @@ impl AptosDataClient for MockAptosDataClient { async fn get_transactions_with_proof( &self, - _proof_version: Version, + 
proof_version: Version, start_version: Version, end_version: Version, include_events: bool, + request_timeout_ms: u64, ) -> Result, aptos_data_client::Error> { + self.verify_request_timeout( + request_timeout_ms, + false, + DataRequest::GetTransactionsWithProof(TransactionsWithProofRequest { + proof_version, + start_version, + end_version, + include_events, + }), + ); self.emulate_network_latencies(); // Calculate the last version based on if we should limit the chunk size @@ -569,6 +706,7 @@ pub async fn get_data_notification( } } +/// Creates a transaction list with proof for testing pub fn create_transaction_list_with_proof( start_version: u64, end_version: u64, @@ -591,3 +729,23 @@ pub fn create_transaction_list_with_proof( transaction_list_with_proof } + +/// Creates an output list with proof for testing +pub fn create_output_list_with_proof( + start_version: u64, + end_version: u64, +) -> TransactionOutputListWithProof { + let transaction_list_with_proof = + create_transaction_list_with_proof(start_version, end_version, false); + let transactions_and_outputs = transaction_list_with_proof + .transactions + .iter() + .map(|txn| (txn.clone(), create_transaction_output())) + .collect(); + + TransactionOutputListWithProof::new( + transactions_and_outputs, + Some(start_version), + transaction_list_with_proof.proof, + ) +} diff --git a/state-sync/state-sync-v2/state-sync-driver/src/tests/bootstrapper.rs b/state-sync/state-sync-v2/state-sync-driver/src/tests/bootstrapper.rs index 9c45375961b22..a17f83b007b38 100644 --- a/state-sync/state-sync-v2/state-sync-driver/src/tests/bootstrapper.rs +++ b/state-sync/state-sync-v2/state-sync-driver/src/tests/bootstrapper.rs @@ -138,6 +138,7 @@ async fn test_critical_timeout() { // Create a driver configuration with a genesis waypoint and a stream timeout of 1 second let mut driver_configuration = create_full_node_driver_configuration(); driver_configuration.config.max_stream_wait_time_ms = 1000; + 
driver_configuration.config.max_num_stream_timeouts = 6; // Create the mock streaming client let mut mock_streaming_client = create_mock_streaming_client(); diff --git a/testsuite/smoke-test/src/state_sync.rs b/testsuite/smoke-test/src/state_sync.rs index 64a77f179acea..d35d5e5ad15a6 100644 --- a/testsuite/smoke-test/src/state_sync.rs +++ b/testsuite/smoke-test/src/state_sync.rs @@ -95,6 +95,28 @@ async fn test_full_node_bootstrap_outputs_no_compression() { test_full_node_sync(vfn_peer_id, &mut swarm, true).await; } +#[tokio::test] +async fn test_full_node_bootstrap_outputs_exponential_backoff() { + // Create a validator swarm of 1 validator node + let mut swarm = new_local_swarm_with_aptos(1).await; + + // Create a fullnode config that uses transaction outputs to sync with a small timeout + let mut vfn_config = NodeConfig::default_for_validator_full_node(); + vfn_config.state_sync.state_sync_driver.bootstrapping_mode = + BootstrappingMode::ApplyTransactionOutputsFromGenesis; + vfn_config + .state_sync + .state_sync_driver + .continuous_syncing_mode = ContinuousSyncingMode::ApplyTransactionOutputs; + vfn_config.state_sync.aptos_data_client.response_timeout_ms = 1; + + // Create the fullnode + let vfn_peer_id = create_full_node(vfn_config, &mut swarm).await; + + // Test the ability of the fullnode to sync + test_full_node_sync(vfn_peer_id, &mut swarm, true).await; +} + #[tokio::test] async fn test_full_node_bootstrap_transactions() { // Create a validator swarm of 1 validator node @@ -356,6 +378,26 @@ async fn test_validator_bootstrap_state_snapshot_network_limit_tiny() { test_validator_sync(&mut swarm, 1).await; } +#[tokio::test] +async fn test_validator_bootstrap_state_snapshot_exponential_backoff() { + // Create a swarm of 4 validators using state snapshot syncing and a small response timeout + let mut swarm = SwarmBuilder::new_local(4) + .with_aptos() + .with_init_config(Arc::new(|_, config, _| { + config.state_sync.state_sync_driver.bootstrapping_mode = + 
BootstrappingMode::DownloadLatestStates; + config.state_sync.state_sync_driver.continuous_syncing_mode = + ContinuousSyncingMode::ApplyTransactionOutputs; + config.state_sync.aptos_data_client.use_compression = false; + config.state_sync.aptos_data_client.response_timeout_ms = 1; + })) + .build() + .await; + + // Test the ability of the validators to sync + test_validator_sync(&mut swarm, 1).await; +} + #[tokio::test] async fn test_validator_bootstrap_transactions() { // Create a swarm of 4 validators using transaction syncing @@ -415,6 +457,25 @@ async fn test_validator_bootstrap_transactions_network_limit_tiny() { test_validator_sync(&mut swarm, 1).await; } +#[tokio::test] +async fn test_validator_bootstrap_outputs_network_exponential_backoff() { + // Create a swarm of 4 validators using output syncing and a small response timeout + let mut swarm = SwarmBuilder::new_local(4) + .with_aptos() + .with_init_config(Arc::new(|_, config, _| { + config.state_sync.state_sync_driver.bootstrapping_mode = + BootstrappingMode::ApplyTransactionOutputsFromGenesis; + config.state_sync.state_sync_driver.continuous_syncing_mode = + ContinuousSyncingMode::ApplyTransactionOutputs; + config.state_sync.aptos_data_client.response_timeout_ms = 1; + })) + .build() + .await; + + // Test the ability of the validators to sync + test_validator_sync(&mut swarm, 1).await; +} + #[tokio::test] async fn test_validator_bootstrap_transactions_no_compression() { // Create a swarm of 4 validators using transaction syncing and no compression