Skip to content

Commit

Permalink
[State Sync] Add custom timeout for subscription requests and backoff.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Nov 24, 2022
1 parent 49d77e6 commit 7e7deef
Show file tree
Hide file tree
Showing 16 changed files with 1,268 additions and 241 deletions.
21 changes: 12 additions & 9 deletions aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ mod log_build_information;
use anyhow::{anyhow, Context};
use aptos_api::bootstrap as bootstrap_api;
use aptos_build_info::build_information;
use aptos_config::config::StateSyncConfig;
use aptos_config::{
config::{
AptosDataClientConfig, BaseConfig, DataStreamingServiceConfig, NetworkConfig, NodeConfig,
PersistableConfig, StorageServiceConfig,
AptosDataClientConfig, BaseConfig, NetworkConfig, NodeConfig, PersistableConfig,
StorageServiceConfig,
},
network_id::NetworkId,
utils::get_genesis_txn,
Expand Down Expand Up @@ -412,10 +413,8 @@ fn create_state_sync_runtimes<M: MempoolNotificationSender + 'static>(
)?;

// Start the data streaming service
let (streaming_service_client, streaming_service_runtime) = setup_data_streaming_service(
node_config.state_sync.data_streaming_service,
aptos_data_client.clone(),
)?;
let (streaming_service_client, streaming_service_runtime) =
setup_data_streaming_service(node_config.state_sync.clone(), aptos_data_client.clone())?;

// Create the chunk executor and persistent storage
let chunk_executor = Arc::new(ChunkExecutor::<AptosVM>::new(db_rw.clone()));
Expand Down Expand Up @@ -446,14 +445,18 @@ fn create_state_sync_runtimes<M: MempoolNotificationSender + 'static>(
}

fn setup_data_streaming_service(
config: DataStreamingServiceConfig,
state_sync_config: StateSyncConfig,
aptos_data_client: AptosNetDataClient,
) -> anyhow::Result<(StreamingServiceClient, Runtime)> {
// Create the data streaming service
let (streaming_service_client, streaming_service_listener) =
new_streaming_service_client_listener_pair();
let data_streaming_service =
DataStreamingService::new(config, aptos_data_client, streaming_service_listener);
let data_streaming_service = DataStreamingService::new(
state_sync_config.aptos_data_client,
state_sync_config.data_streaming_service,
aptos_data_client,
streaming_service_listener,
);

// Start the data streaming service
let streaming_service_runtime = Builder::new_multi_thread()
Expand Down
18 changes: 11 additions & 7 deletions config/src/config/state_sync_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ pub struct StateSyncDriverConfig {
impl Default for StateSyncDriverConfig {
fn default() -> Self {
Self {
bootstrapping_mode: BootstrappingMode::ExecuteTransactionsFromGenesis,
bootstrapping_mode: BootstrappingMode::ApplyTransactionOutputsFromGenesis,
commit_notification_timeout_ms: 5000,
continuous_syncing_mode: ContinuousSyncingMode::ExecuteTransactions,
continuous_syncing_mode: ContinuousSyncingMode::ApplyTransactionOutputs,
progress_check_interval_ms: 100,
max_connection_deadline_secs: 10,
max_consecutive_stream_notifications: 10,
max_num_stream_timeouts: 6,
max_num_stream_timeouts: 12,
max_pending_data_chunks: 100,
max_stream_wait_time_ms: 5000,
num_versions_to_skip_snapshot_sync: 100_000_000, // At 5k TPS, this allows a node to fail for about 6 hours.
Expand Down Expand Up @@ -170,17 +170,21 @@ impl Default for DataStreamingServiceConfig {
pub struct AptosDataClientConfig {
pub max_num_in_flight_priority_polls: u64, // Max num of in-flight polls for priority peers
pub max_num_in_flight_regular_polls: u64, // Max num of in-flight polls for regular peers
pub response_timeout_ms: u64, // Timeout (in milliseconds) when waiting for a response
pub summary_poll_interval_ms: u64, // Interval (in milliseconds) between data summary polls
pub use_compression: bool, // Whether or not to request compression for incoming data
pub max_response_timeout_ms: u64, // Max timeout (in ms) when waiting for a response (after exponential increases)
pub response_timeout_ms: u64, // First timeout (in ms) when waiting for a response
pub subscription_timeout_ms: u64, // Timeout (in ms) when waiting for a subscription response
pub summary_poll_interval_ms: u64, // Interval (in ms) between data summary polls
pub use_compression: bool, // Whether or not to request compression for incoming data
}

impl Default for AptosDataClientConfig {
fn default() -> Self {
Self {
max_num_in_flight_priority_polls: 10,
max_num_in_flight_regular_polls: 10,
response_timeout_ms: 20000, // 20 seconds
max_response_timeout_ms: 60000, // 60 seconds
response_timeout_ms: 10000, // 10 seconds
subscription_timeout_ms: 5000, // 5 seconds
summary_poll_interval_ms: 200,
use_compression: true,
}
Expand Down
24 changes: 13 additions & 11 deletions network/src/protocols/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,21 +548,23 @@ impl OutboundRpcs {
latency,
);
}
Err(err) => {
if let RpcError::UnexpectedResponseChannelCancel = err {
Err(error) => {
if let RpcError::UnexpectedResponseChannelCancel = error {
// We don't log when the application has dropped the RPC
// response channel because this is often expected (e.g.,
// on state sync subscription requests that timeout).
counters::rpc_messages(network_context, REQUEST_LABEL, CANCELED_LABEL).inc();
} else {
counters::rpc_messages(network_context, REQUEST_LABEL, FAILED_LABEL).inc();
warn!(
NetworkSchema::new(network_context).remote_peer(peer_id),
"{} Error making outbound RPC request to {} (request_id {}). Error: {}",
network_context,
peer_id.short_str(),
request_id,
error
);
}

warn!(
NetworkSchema::new(network_context).remote_peer(peer_id),
"{} Error making outbound rpc request with request_id {} to {}: {}",
network_context,
request_id,
peer_id.short_str(),
err
);
}
}
}
Expand Down
54 changes: 38 additions & 16 deletions state-sync/aptos-data-client/src/aptosnet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ impl AptosNetDataClient {
async fn send_request_and_decode<T, E>(
&self,
request: StorageServiceRequest,
request_timeout_ms: u64,
) -> Result<Response<T>>
where
T: TryFrom<StorageServiceResponse, Error = E>,
Expand All @@ -311,20 +312,24 @@ impl AptosNetDataClient {
error
})?;
let _timer = start_request_timer(&metrics::REQUEST_LATENCIES, &request.get_label(), peer);
self.send_request_to_peer_and_decode(peer, request).await
self.send_request_to_peer_and_decode(peer, request, request_timeout_ms)
.await
}

/// Sends a request to a specific peer and decodes the response
async fn send_request_to_peer_and_decode<T, E>(
&self,
peer: PeerNetworkId,
request: StorageServiceRequest,
request_timeout_ms: u64,
) -> Result<Response<T>>
where
T: TryFrom<StorageServiceResponse, Error = E>,
E: Into<Error>,
{
let response = self.send_request_to_peer(peer, request.clone()).await?;
let response = self
.send_request_to_peer(peer, request.clone(), request_timeout_ms)
.await?;

let (context, storage_response) = response.into_parts();

Expand Down Expand Up @@ -359,9 +364,9 @@ impl AptosNetDataClient {
&self,
peer: PeerNetworkId,
request: StorageServiceRequest,
request_timeout_ms: u64,
) -> Result<Response<StorageServiceResponse>, Error> {
let id = self.next_response_id();

trace!(
(LogSchema::new(LogEntry::StorageServiceRequest)
.event(LogEvent::SendRequest)
Expand All @@ -370,18 +375,17 @@ impl AptosNetDataClient {
.peer(&peer)
.request_data(&request))
);

increment_request_counter(&metrics::SENT_REQUESTS, &request.get_label(), peer);

// Send the request and process the result
let result = self
.network_client
.send_request(
peer,
request.clone(),
Duration::from_millis(self.data_client_config.response_timeout_ms),
Duration::from_millis(request_timeout_ms),
)
.await;

match result {
Ok(response) => {
trace!(
Expand Down Expand Up @@ -476,36 +480,41 @@ impl AptosDataClient for AptosNetDataClient {
&self,
start_epoch: Epoch,
expected_end_epoch: Epoch,
request_timeout_ms: u64,
) -> Result<Response<Vec<LedgerInfoWithSignatures>>> {
let data_request = DataRequest::GetEpochEndingLedgerInfos(EpochEndingLedgerInfoRequest {
start_epoch,
expected_end_epoch,
});
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
let response: Response<EpochChangeProof> =
self.send_request_and_decode(storage_request).await?;
let response: Response<EpochChangeProof> = self
.send_request_and_decode(storage_request, request_timeout_ms)
.await?;
Ok(response.map(|epoch_change| epoch_change.ledger_info_with_sigs))
}

async fn get_new_transaction_outputs_with_proof(
&self,
known_version: Version,
known_epoch: Epoch,
request_timeout_ms: u64,
) -> Result<Response<(TransactionOutputListWithProof, LedgerInfoWithSignatures)>> {
let data_request =
DataRequest::GetNewTransactionOutputsWithProof(NewTransactionOutputsWithProofRequest {
known_version,
known_epoch,
});
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request).await
self.send_request_and_decode(storage_request, request_timeout_ms)
.await
}

async fn get_new_transactions_with_proof(
&self,
known_version: Version,
known_epoch: Epoch,
include_events: bool,
request_timeout_ms: u64,
) -> Result<Response<(TransactionListWithProof, LedgerInfoWithSignatures)>> {
let data_request =
DataRequest::GetNewTransactionsWithProof(NewTransactionsWithProofRequest {
Expand All @@ -514,35 +523,44 @@ impl AptosDataClient for AptosNetDataClient {
include_events,
});
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request).await
self.send_request_and_decode(storage_request, request_timeout_ms)
.await
}

async fn get_number_of_states(&self, version: Version) -> Result<Response<u64>> {
async fn get_number_of_states(
&self,
version: Version,
request_timeout_ms: u64,
) -> Result<Response<u64>> {
let data_request = DataRequest::GetNumberOfStatesAtVersion(version);
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request).await
self.send_request_and_decode(storage_request, request_timeout_ms)
.await
}

async fn get_state_values_with_proof(
&self,
version: u64,
start_index: u64,
end_index: u64,
request_timeout_ms: u64,
) -> Result<Response<StateValueChunkWithProof>> {
let data_request = DataRequest::GetStateValuesWithProof(StateValuesWithProofRequest {
version,
start_index,
end_index,
});
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request).await
self.send_request_and_decode(storage_request, request_timeout_ms)
.await
}

async fn get_transaction_outputs_with_proof(
&self,
proof_version: Version,
start_version: Version,
end_version: Version,
request_timeout_ms: u64,
) -> Result<Response<TransactionOutputListWithProof>> {
let data_request =
DataRequest::GetTransactionOutputsWithProof(TransactionOutputsWithProofRequest {
Expand All @@ -551,7 +569,8 @@ impl AptosDataClient for AptosNetDataClient {
end_version,
});
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request).await
self.send_request_and_decode(storage_request, request_timeout_ms)
.await
}

async fn get_transactions_with_proof(
Expand All @@ -560,6 +579,7 @@ impl AptosDataClient for AptosNetDataClient {
start_version: Version,
end_version: Version,
include_events: bool,
request_timeout_ms: u64,
) -> Result<Response<TransactionListWithProof>> {
let data_request = DataRequest::GetTransactionsWithProof(TransactionsWithProofRequest {
proof_version,
Expand All @@ -568,7 +588,8 @@ impl AptosDataClient for AptosNetDataClient {
include_events,
});
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request).await
self.send_request_and_decode(storage_request, request_timeout_ms)
.await
}
}

Expand Down Expand Up @@ -763,6 +784,7 @@ pub(crate) fn poll_peer(
let data_request = DataRequest::GetStorageServerSummary;
let storage_request =
StorageServiceRequest::new(data_request, data_client.use_compression());
let request_timeout = data_client.data_client_config.response_timeout_ms;

// Start the peer polling timer
let timer = start_request_timer(
Expand All @@ -773,7 +795,7 @@ pub(crate) fn poll_peer(

// Fetch the storage summary for the peer and stop the timer
let result: Result<StorageServerSummary> = data_client
.send_request_to_peer_and_decode(peer, storage_request)
.send_request_to_peer_and_decode(peer, storage_request, request_timeout)
.await
.map(Response::into_payload);
drop(timer);
Expand Down
Loading

0 comments on commit 7e7deef

Please sign in to comment.