Skip to content

Commit

Permalink
[State Sync] Add server side implementation of intelligent syncing mode.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Dec 6, 2022
1 parent 4bc0030 commit 76e743c
Show file tree
Hide file tree
Showing 9 changed files with 1,201 additions and 28 deletions.
2 changes: 2 additions & 0 deletions config/src/config/state_sync_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ impl Default for DataStreamingServiceConfig {
pub struct AptosDataClientConfig {
pub max_num_in_flight_priority_polls: u64, // Max num of in-flight polls for priority peers
pub max_num_in_flight_regular_polls: u64, // Max num of in-flight polls for regular peers
pub max_num_output_reductions: u64, // The max num of output reductions before transactions are returned
pub max_response_timeout_ms: u64, // Max timeout (in ms) when waiting for a response (after exponential increases)
pub response_timeout_ms: u64, // First timeout (in ms) when waiting for a response
pub subscription_timeout_ms: u64, // Timeout (in ms) when waiting for a subscription response
Expand All @@ -182,6 +183,7 @@ impl Default for AptosDataClientConfig {
Self {
max_num_in_flight_priority_polls: 10,
max_num_in_flight_regular_polls: 10,
max_num_output_reductions: 2,
max_response_timeout_ms: 60000, // 60 seconds
response_timeout_ms: 10000, // 10 seconds
subscription_timeout_ms: 5000, // 5 seconds
Expand Down
90 changes: 73 additions & 17 deletions state-sync/aptos-data-client/src/aptosnet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,13 @@ use std::{convert::TryFrom, fmt, sync::Arc, time::Duration};
use storage_service_client::StorageServiceClient;
use storage_service_types::requests::{
DataRequest, EpochEndingLedgerInfoRequest, NewTransactionOutputsWithProofRequest,
NewTransactionsWithProofRequest, StateValuesWithProofRequest, StorageServiceRequest,
TransactionOutputsWithProofRequest, TransactionsWithProofRequest,
NewTransactionsOrOutputsWithProofRequest, NewTransactionsWithProofRequest,
StateValuesWithProofRequest, StorageServiceRequest, TransactionOutputsWithProofRequest,
TransactionsOrOutputsWithProofRequest, TransactionsWithProofRequest,
};
use storage_service_types::responses::{
StorageServerSummary, StorageServiceResponse, TransactionOrOutputListWithProof,
};
use storage_service_types::responses::{StorageServerSummary, StorageServiceResponse};
use storage_service_types::Epoch;
use tokio::{runtime::Handle, task::JoinHandle};

Expand Down Expand Up @@ -128,6 +131,11 @@ impl AptosNetDataClient {
self.data_client_config.use_compression
}

/// Returns the max number of output reductions as defined by the config
fn get_max_num_output_reductions(&self) -> u64 {
self.data_client_config.max_num_output_reductions
}

/// Generates a new response id
fn next_response_id(&self) -> u64 {
self.response_id_generator.next()
Expand Down Expand Up @@ -468,6 +476,22 @@ impl AptosNetDataClient {
.write()
.update_score_error(peer, error_type);
}

/// Creates a storage service request using the given data request
/// and sends it across the network
async fn create_and_send_storage_request<T, E>(
&self,
request_timeout_ms: u64,
data_request: DataRequest,
) -> Result<Response<T>>
where
T: TryFrom<StorageServiceResponse, Error = E>,
E: Into<Error>,
{
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request, request_timeout_ms)
.await
}
}

#[async_trait]
Expand All @@ -486,9 +510,8 @@ impl AptosDataClient for AptosNetDataClient {
start_epoch,
expected_end_epoch,
});
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
let response: Response<EpochChangeProof> = self
.send_request_and_decode(storage_request, request_timeout_ms)
.create_and_send_storage_request(request_timeout_ms, data_request)
.await?;
Ok(response.map(|epoch_change| epoch_change.ledger_info_with_sigs))
}
Expand All @@ -504,8 +527,7 @@ impl AptosDataClient for AptosNetDataClient {
known_version,
known_epoch,
});
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request, request_timeout_ms)
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

Expand All @@ -522,8 +544,26 @@ impl AptosDataClient for AptosNetDataClient {
known_epoch,
include_events,
});
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request, request_timeout_ms)
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

async fn get_new_transactions_or_outputs_with_proof(
&self,
known_version: Version,
known_epoch: Epoch,
include_events: bool,
request_timeout_ms: u64,
) -> Result<Response<(TransactionOrOutputListWithProof, LedgerInfoWithSignatures)>> {
let data_request = DataRequest::GetNewTransactionsOrOutputsWithProof(
NewTransactionsOrOutputsWithProofRequest {
known_version,
known_epoch,
include_events,
max_num_output_reductions: self.get_max_num_output_reductions(),
},
);
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

Expand All @@ -533,8 +573,7 @@ impl AptosDataClient for AptosNetDataClient {
request_timeout_ms: u64,
) -> Result<Response<u64>> {
let data_request = DataRequest::GetNumberOfStatesAtVersion(version);
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request, request_timeout_ms)
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

Expand All @@ -550,8 +589,7 @@ impl AptosDataClient for AptosNetDataClient {
start_index,
end_index,
});
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request, request_timeout_ms)
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

Expand All @@ -568,8 +606,7 @@ impl AptosDataClient for AptosNetDataClient {
start_version,
end_version,
});
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request, request_timeout_ms)
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

Expand All @@ -587,8 +624,27 @@ impl AptosDataClient for AptosNetDataClient {
end_version,
include_events,
});
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request, request_timeout_ms)
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

async fn get_transactions_or_outputs_with_proof(
&self,
proof_version: Version,
start_version: Version,
end_version: Version,
include_events: bool,
request_timeout_ms: u64,
) -> Result<Response<TransactionOrOutputListWithProof>> {
let data_request =
DataRequest::GetTransactionsOrOutputsWithProof(TransactionsOrOutputsWithProofRequest {
proof_version,
start_version,
end_version,
include_events,
max_num_output_reductions: self.get_max_num_output_reductions(),
});
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}
}
Expand Down
71 changes: 71 additions & 0 deletions state-sync/aptos-data-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use async_trait::async_trait;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::{fmt, fmt::Display};
use storage_service_types::responses::TransactionOrOutputListWithProof;
use storage_service_types::{responses::CompleteDataRange, Epoch};
use thiserror::Error;

Expand Down Expand Up @@ -102,6 +103,18 @@ pub trait AptosDataClient {
request_timeout_ms: u64,
) -> Result<Response<(TransactionListWithProof, LedgerInfoWithSignatures)>>;

/// Fetches a new transaction or output list with proof. Versions start at
/// `known_version + 1` and `known_epoch` (inclusive). The end version
/// and proof version are specified by the server. If the data cannot be
/// fetched, an error is returned.
async fn get_new_transactions_or_outputs_with_proof(
&self,
known_version: Version,
known_epoch: Epoch,
include_events: bool,
request_timeout_ms: u64,
) -> Result<Response<(TransactionOrOutputListWithProof, LedgerInfoWithSignatures)>>;

/// Fetches the number of states at the specified version.
async fn get_number_of_states(
&self,
Expand Down Expand Up @@ -149,6 +162,21 @@ pub trait AptosDataClient {
include_events: bool,
request_timeout_ms: u64,
) -> Result<Response<TransactionListWithProof>>;

/// Fetches a transaction or output list with proof, with data from
/// start to end versions (inclusive). The proof is relative to the
/// specified `proof_version`. If `include_events` is true, events are
/// included in the proof. In some cases, fewer data items may be returned
/// (e.g., to tolerate network or chunk limits). If the data cannot
/// be fetched, an error is returned.
async fn get_transactions_or_outputs_with_proof(
&self,
proof_version: Version,
start_version: Version,
end_version: Version,
include_events: bool,
request_timeout_ms: u64,
) -> Result<Response<TransactionOrOutputListWithProof>>;
}

/// A response error that users of the Aptos Data Client can use to notify
Expand Down Expand Up @@ -271,11 +299,37 @@ impl From<(TransactionListWithProof, LedgerInfoWithSignatures)> for ResponsePayl
}
}

impl TryFrom<(TransactionOrOutputListWithProof, LedgerInfoWithSignatures)> for ResponsePayload {
type Error = Error;

fn try_from(
inner: (TransactionOrOutputListWithProof, LedgerInfoWithSignatures),
) -> Result<Self, Error> {
let ((transaction_list, output_list), ledger_info) = inner;
if let Some(transaction_list) = transaction_list {
Ok(Self::NewTransactionsWithProof((
transaction_list,
ledger_info,
)))
} else if let Some(output_list) = output_list {
Ok(Self::NewTransactionOutputsWithProof((
output_list,
ledger_info,
)))
} else {
Err(Error::InvalidResponse(
"Invalid response! No transaction or output list was returned!".into(),
))
}
}
}

impl From<u64> for ResponsePayload {
fn from(inner: u64) -> Self {
Self::NumberOfStates(inner)
}
}

impl From<TransactionOutputListWithProof> for ResponsePayload {
fn from(inner: TransactionOutputListWithProof) -> Self {
Self::TransactionOutputsWithProof(inner)
Expand All @@ -288,6 +342,23 @@ impl From<TransactionListWithProof> for ResponsePayload {
}
}

impl TryFrom<TransactionOrOutputListWithProof> for ResponsePayload {
type Error = Error;

fn try_from(inner: TransactionOrOutputListWithProof) -> Result<Self, Error> {
let (transaction_list, output_list) = inner;
if let Some(transaction_list) = transaction_list {
Ok(Self::TransactionsWithProof(transaction_list))
} else if let Some(output_list) = output_list {
Ok(Self::TransactionOutputsWithProof(output_list))
} else {
Err(Error::InvalidResponse(
"Invalid response! No transaction or output list was returned!".into(),
))
}
}
}

/// A snapshot of the global state of data available in the Aptos network.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GlobalDataSummary {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use storage_service_types::requests::{
NewTransactionsWithProofRequest, StateValuesWithProofRequest,
TransactionOutputsWithProofRequest, TransactionsWithProofRequest,
};
use storage_service_types::responses::CompleteDataRange;
use storage_service_types::responses::{CompleteDataRange, TransactionOrOutputListWithProof};
use storage_service_types::Epoch;
use tokio::time::timeout;

Expand Down Expand Up @@ -435,6 +435,18 @@ impl AptosDataClient for MockAptosDataClient {
}
}

async fn get_new_transactions_or_outputs_with_proof(
&self,
_known_version: Version,
_known_epoch: Epoch,
_include_events: bool,
_request_timeout_ms: u64,
) -> aptos_data_client::Result<
Response<(TransactionOrOutputListWithProof, LedgerInfoWithSignatures)>,
> {
todo!() // Implement when we have a client side implementation
}

async fn get_number_of_states(
&self,
version: Version,
Expand Down Expand Up @@ -513,6 +525,17 @@ impl AptosDataClient for MockAptosDataClient {
// Return the transaction list with proofs
Ok(create_data_client_response(transaction_list_with_proof))
}

async fn get_transactions_or_outputs_with_proof(
&self,
_proof_version: Version,
_start_version: Version,
_end_version: Version,
_include_events: bool,
_request_timeout_ms: u64,
) -> aptos_data_client::Result<Response<TransactionOrOutputListWithProof>> {
todo!() // Implement when we have a client side implementation
}
}

#[derive(Debug)]
Expand Down
Loading

0 comments on commit 76e743c

Please sign in to comment.