Skip to content

Commit

Permalink
[State sync] Add client-side of new state syncing mode.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Dec 17, 2022
1 parent abf10b4 commit 1e9c313
Show file tree
Hide file tree
Showing 22 changed files with 1,752 additions and 272 deletions.
1 change: 1 addition & 0 deletions aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ fn create_state_sync_runtimes<M: MempoolNotificationSender + 'static>(
event_subscription_service,
aptos_data_client,
streaming_service_client,
TimeService::real(),
);

// Create and return the new state sync handle
Expand Down
12 changes: 10 additions & 2 deletions config/src/config/state_sync_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub enum BootstrappingMode {
ApplyTransactionOutputsFromGenesis, // Applies transaction outputs (starting at genesis)
DownloadLatestStates, // Downloads the state keys and values (at the latest version)
ExecuteTransactionsFromGenesis, // Executes transactions (starting at genesis)
ExecuteOrApplyFromGenesis, // Executes transactions or applies outputs from genesis (whichever is faster)
}

impl BootstrappingMode {
Expand All @@ -34,6 +35,7 @@ impl BootstrappingMode {
BootstrappingMode::ExecuteTransactionsFromGenesis => {
"execute_transactions_from_genesis"
}
BootstrappingMode::ExecuteOrApplyFromGenesis => "execute_or_apply_from_genesis",
}
}
}
Expand All @@ -45,13 +47,17 @@ impl BootstrappingMode {
pub enum ContinuousSyncingMode {
ApplyTransactionOutputs, // Applies transaction outputs to stay up-to-date
ExecuteTransactions, // Executes transactions to stay up-to-date
ExecuteTransactionsOrApplyOutputs, // Executes transactions or applies outputs to stay up-to-date (whichever is faster)
}

impl ContinuousSyncingMode {
pub fn to_label(&self) -> &'static str {
match self {
ContinuousSyncingMode::ApplyTransactionOutputs => "apply_transaction_outputs",
ContinuousSyncingMode::ExecuteTransactions => "execute_transactions",
ContinuousSyncingMode::ExecuteTransactionsOrApplyOutputs => {
"execute_transactions_or_apply_outputs"
}
}
}
}
Expand All @@ -62,6 +68,7 @@ pub struct StateSyncDriverConfig {
pub bootstrapping_mode: BootstrappingMode, // The mode by which to bootstrap
pub commit_notification_timeout_ms: u64, // The max time taken to process a commit notification
pub continuous_syncing_mode: ContinuousSyncingMode, // The mode by which to sync after bootstrapping
pub fallback_to_output_syncing_secs: u64, // The duration to fallback to output syncing after an execution failure
pub progress_check_interval_ms: u64, // The interval (ms) at which to check state sync progress
pub max_connection_deadline_secs: u64, // The max time (secs) to wait for connections from peers
pub max_consecutive_stream_notifications: u64, // The max number of notifications to process per driver loop
Expand All @@ -79,6 +86,7 @@ impl Default for StateSyncDriverConfig {
bootstrapping_mode: BootstrappingMode::ApplyTransactionOutputsFromGenesis,
commit_notification_timeout_ms: 5000,
continuous_syncing_mode: ContinuousSyncingMode::ApplyTransactionOutputs,
fallback_to_output_syncing_secs: 120, // 2 minutes
progress_check_interval_ms: 100,
max_connection_deadline_secs: 10,
max_consecutive_stream_notifications: 10,
Expand Down Expand Up @@ -116,7 +124,7 @@ impl Default for StorageServiceConfig {
max_state_chunk_size: 2000,
max_subscription_period_ms: 5000,
max_transaction_chunk_size: 2000,
max_transaction_output_chunk_size: 2000,
max_transaction_output_chunk_size: 1000,
storage_summary_refresh_interval_ms: 50,
}
}
Expand Down Expand Up @@ -183,7 +191,7 @@ impl Default for AptosDataClientConfig {
Self {
max_num_in_flight_priority_polls: 10,
max_num_in_flight_regular_polls: 10,
max_num_output_reductions: 2,
max_num_output_reductions: 0,
max_response_timeout_ms: 60000, // 60 seconds
response_timeout_ms: 10000, // 10 seconds
subscription_timeout_ms: 5000, // 5 seconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub enum DataClientRequest {
StateValuesWithProof(StateValuesWithProofRequest),
TransactionsWithProof(TransactionsWithProofRequest),
TransactionOutputsWithProof(TransactionOutputsWithProofRequest),
NewTransactionsOrOutputsWithProof(NewTransactionsOrOutputsWithProofRequest),
TransactionsOrOutputsWithProof(TransactionsOrOutputsWithProofRequest),
}

impl DataClientRequest {
Expand All @@ -56,6 +58,8 @@ impl DataClientRequest {
Self::StateValuesWithProof(_) => "state_values_with_proof",
Self::TransactionsWithProof(_) => "transactions_with_proof",
Self::TransactionOutputsWithProof(_) => "transaction_outputs_with_proof",
Self::NewTransactionsOrOutputsWithProof(_) => "new_transactions_or_outputs_with_proof",
Self::TransactionsOrOutputsWithProof(_) => "transactions_or_outputs_with_proof",
}
}
}
Expand Down Expand Up @@ -83,6 +87,14 @@ pub struct NewTransactionsWithProofRequest {
pub include_events: bool,
}

/// A client request for fetching new transactions or outputs with proofs.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NewTransactionsOrOutputsWithProofRequest {
pub known_version: Version,
pub known_epoch: Epoch,
pub include_events: bool,
}

/// A client request for fetching new transaction outputs with proofs.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NewTransactionOutputsWithProofRequest {
Expand Down Expand Up @@ -113,6 +125,15 @@ pub struct TransactionOutputsWithProofRequest {
pub proof_version: Version,
}

/// A client request for fetching transaction or outputs with proofs.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TransactionsOrOutputsWithProofRequest {
pub start_version: Version,
pub end_version: Version,
pub proof_version: Version,
pub include_events: bool,
}

/// A pending client response where data has been requested from the
/// network and will be available in `client_response` when received.
pub struct PendingClientResponse {
Expand Down
72 changes: 72 additions & 0 deletions state-sync/state-sync-v2/data-streaming-service/src/data_stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

use crate::data_notification::{
NewTransactionsOrOutputsWithProofRequest, TransactionsOrOutputsWithProofRequest,
};
use crate::metrics::increment_counter_multiple;
use crate::{
data_notification,
Expand Down Expand Up @@ -728,6 +731,15 @@ fn sanity_check_client_response(
ResponsePayload::NewTransactionsWithProof(_)
)
}
DataClientRequest::NewTransactionsOrOutputsWithProof(_) => {
matches!(
data_client_response.payload,
ResponsePayload::NewTransactionsWithProof(_)
) || matches!(
data_client_response.payload,
ResponsePayload::NewTransactionOutputsWithProof(_)
)
}
DataClientRequest::NumberOfStates(_) => {
matches!(
data_client_response.payload,
Expand All @@ -752,6 +764,15 @@ fn sanity_check_client_response(
ResponsePayload::TransactionOutputsWithProof(_)
)
}
DataClientRequest::TransactionsOrOutputsWithProof(_) => {
matches!(
data_client_response.payload,
ResponsePayload::TransactionsWithProof(_)
) || matches!(
data_client_response.payload,
ResponsePayload::TransactionOutputsWithProof(_)
)
}
}
}

Expand Down Expand Up @@ -808,6 +829,14 @@ fn spawn_request_task<T: AptosDataClient + Send + Clone + 'static>(
)
.await
}
DataClientRequest::NewTransactionsOrOutputsWithProof(request) => {
get_new_transactions_or_outputs_with_proof(
aptos_data_client,
request,
request_timeout_ms,
)
.await
}
DataClientRequest::NumberOfStates(request) => {
get_number_of_states(aptos_data_client, request, request_timeout_ms).await
}
Expand All @@ -821,6 +850,14 @@ fn spawn_request_task<T: AptosDataClient + Send + Clone + 'static>(
DataClientRequest::TransactionsWithProof(request) => {
get_transactions_with_proof(aptos_data_client, request, request_timeout_ms).await
}
DataClientRequest::TransactionsOrOutputsWithProof(request) => {
get_transactions_or_outputs_with_proof(
aptos_data_client,
request,
request_timeout_ms,
)
.await
}
};

// Increment the appropriate counter depending on the response
Expand Down Expand Up @@ -903,6 +940,21 @@ async fn get_new_transactions_with_proof<T: AptosDataClient + Send + Clone + 'st
.map(|response| response.map(ResponsePayload::from))
}

async fn get_new_transactions_or_outputs_with_proof<T: AptosDataClient + Send + Clone + 'static>(
aptos_data_client: T,
request: NewTransactionsOrOutputsWithProofRequest,
request_timeout_ms: u64,
) -> Result<Response<ResponsePayload>, aptos_data_client::Error> {
let client_response = aptos_data_client.get_new_transactions_or_outputs_with_proof(
request.known_version,
request.known_epoch,
request.include_events,
request_timeout_ms,
);
let (context, payload) = client_response.await?.into_parts();
Ok(Response::new(context, ResponsePayload::try_from(payload)?))
}

async fn get_number_of_states<T: AptosDataClient + Send + Clone + 'static>(
aptos_data_client: T,
request: NumberOfStatesRequest,
Expand Down Expand Up @@ -948,11 +1000,31 @@ async fn get_transactions_with_proof<T: AptosDataClient + Send + Clone + 'static
.map(|response| response.map(ResponsePayload::from))
}

async fn get_transactions_or_outputs_with_proof<T: AptosDataClient + Send + Clone + 'static>(
aptos_data_client: T,
request: TransactionsOrOutputsWithProofRequest,
request_timeout_ms: u64,
) -> Result<Response<ResponsePayload>, aptos_data_client::Error> {
let client_response = aptos_data_client.get_transactions_or_outputs_with_proof(
request.proof_version,
request.start_version,
request.end_version,
request.include_events,
request_timeout_ms,
);
let (context, payload) = client_response.await?.into_parts();
Ok(Response::new(context, ResponsePayload::try_from(payload)?))
}

/// Returns true iff the given request is a subscription request
fn is_subscription_request(request: &DataClientRequest) -> bool {
matches!(request, DataClientRequest::NewTransactionsWithProof(_))
|| matches!(
request,
DataClientRequest::NewTransactionOutputsWithProof(_)
)
|| matches!(
request,
DataClientRequest::NewTransactionsOrOutputsWithProof(_)
)
}
Loading

0 comments on commit 1e9c313

Please sign in to comment.