Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[State Sync] Add server side implementation of semi-intelligent syncing mode. #5742

Merged
merged 1 commit into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/src/config/state_sync_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ impl Default for DataStreamingServiceConfig {
pub struct AptosDataClientConfig {
pub max_num_in_flight_priority_polls: u64, // Max num of in-flight polls for priority peers
pub max_num_in_flight_regular_polls: u64, // Max num of in-flight polls for regular peers
pub max_num_output_reductions: u64, // The max num of output reductions before transactions are returned
pub max_response_timeout_ms: u64, // Max timeout (in ms) when waiting for a response (after exponential increases)
pub response_timeout_ms: u64, // First timeout (in ms) when waiting for a response
pub subscription_timeout_ms: u64, // Timeout (in ms) when waiting for a subscription response
Expand All @@ -182,6 +183,7 @@ impl Default for AptosDataClientConfig {
Self {
max_num_in_flight_priority_polls: 10,
max_num_in_flight_regular_polls: 10,
max_num_output_reductions: 2,
max_response_timeout_ms: 60000, // 60 seconds
response_timeout_ms: 10000, // 10 seconds
subscription_timeout_ms: 5000, // 5 seconds
Expand Down
90 changes: 73 additions & 17 deletions state-sync/aptos-data-client/src/aptosnet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,13 @@ use std::{convert::TryFrom, fmt, sync::Arc, time::Duration};
use storage_service_client::StorageServiceClient;
use storage_service_types::requests::{
DataRequest, EpochEndingLedgerInfoRequest, NewTransactionOutputsWithProofRequest,
NewTransactionsWithProofRequest, StateValuesWithProofRequest, StorageServiceRequest,
TransactionOutputsWithProofRequest, TransactionsWithProofRequest,
NewTransactionsOrOutputsWithProofRequest, NewTransactionsWithProofRequest,
StateValuesWithProofRequest, StorageServiceRequest, TransactionOutputsWithProofRequest,
TransactionsOrOutputsWithProofRequest, TransactionsWithProofRequest,
};
use storage_service_types::responses::{
StorageServerSummary, StorageServiceResponse, TransactionOrOutputListWithProof,
};
use storage_service_types::responses::{StorageServerSummary, StorageServiceResponse};
use storage_service_types::Epoch;
use tokio::{runtime::Handle, task::JoinHandle};

Expand Down Expand Up @@ -128,6 +131,11 @@ impl AptosNetDataClient {
self.data_client_config.use_compression
}

/// Returns the max number of output reductions as defined by the config
fn get_max_num_output_reductions(&self) -> u64 {
self.data_client_config.max_num_output_reductions
}

/// Generates a new response id
fn next_response_id(&self) -> u64 {
self.response_id_generator.next()
Expand Down Expand Up @@ -468,6 +476,22 @@ impl AptosNetDataClient {
.write()
.update_score_error(peer, error_type);
}

/// Creates a storage service request using the given data request
/// and sends it across the network
async fn create_and_send_storage_request<T, E>(
&self,
request_timeout_ms: u64,
data_request: DataRequest,
) -> Result<Response<T>>
where
T: TryFrom<StorageServiceResponse, Error = E>,
E: Into<Error>,
{
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request, request_timeout_ms)
.await
}
}

#[async_trait]
Expand All @@ -486,9 +510,8 @@ impl AptosDataClient for AptosNetDataClient {
start_epoch,
expected_end_epoch,
});
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
let response: Response<EpochChangeProof> = self
.send_request_and_decode(storage_request, request_timeout_ms)
.create_and_send_storage_request(request_timeout_ms, data_request)
.await?;
Ok(response.map(|epoch_change| epoch_change.ledger_info_with_sigs))
}
Expand All @@ -504,8 +527,7 @@ impl AptosDataClient for AptosNetDataClient {
known_version,
known_epoch,
});
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request, request_timeout_ms)
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

Expand All @@ -522,8 +544,26 @@ impl AptosDataClient for AptosNetDataClient {
known_epoch,
include_events,
});
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request, request_timeout_ms)
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

async fn get_new_transactions_or_outputs_with_proof(
&self,
known_version: Version,
known_epoch: Epoch,
include_events: bool,
request_timeout_ms: u64,
) -> Result<Response<(TransactionOrOutputListWithProof, LedgerInfoWithSignatures)>> {
let data_request = DataRequest::GetNewTransactionsOrOutputsWithProof(
NewTransactionsOrOutputsWithProofRequest {
known_version,
known_epoch,
include_events,
max_num_output_reductions: self.get_max_num_output_reductions(),
},
);
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

Expand All @@ -533,8 +573,7 @@ impl AptosDataClient for AptosNetDataClient {
request_timeout_ms: u64,
) -> Result<Response<u64>> {
let data_request = DataRequest::GetNumberOfStatesAtVersion(version);
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request, request_timeout_ms)
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

Expand All @@ -550,8 +589,7 @@ impl AptosDataClient for AptosNetDataClient {
start_index,
end_index,
});
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request, request_timeout_ms)
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

Expand All @@ -568,8 +606,7 @@ impl AptosDataClient for AptosNetDataClient {
start_version,
end_version,
});
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request, request_timeout_ms)
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

Expand All @@ -587,8 +624,27 @@ impl AptosDataClient for AptosNetDataClient {
end_version,
include_events,
});
let storage_request = StorageServiceRequest::new(data_request, self.use_compression());
self.send_request_and_decode(storage_request, request_timeout_ms)
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}

async fn get_transactions_or_outputs_with_proof(
&self,
proof_version: Version,
start_version: Version,
end_version: Version,
include_events: bool,
request_timeout_ms: u64,
) -> Result<Response<TransactionOrOutputListWithProof>> {
let data_request =
DataRequest::GetTransactionsOrOutputsWithProof(TransactionsOrOutputsWithProofRequest {
proof_version,
start_version,
end_version,
include_events,
max_num_output_reductions: self.get_max_num_output_reductions(),
});
self.create_and_send_storage_request(request_timeout_ms, data_request)
.await
}
}
Expand Down
71 changes: 71 additions & 0 deletions state-sync/aptos-data-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use async_trait::async_trait;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::{fmt, fmt::Display};
use storage_service_types::responses::TransactionOrOutputListWithProof;
use storage_service_types::{responses::CompleteDataRange, Epoch};
use thiserror::Error;

Expand Down Expand Up @@ -102,6 +103,18 @@ pub trait AptosDataClient {
request_timeout_ms: u64,
) -> Result<Response<(TransactionListWithProof, LedgerInfoWithSignatures)>>;

/// Fetches a new transaction or output list with proof. Versions start at
/// `known_version + 1` and `known_epoch` (inclusive). The end version
/// and proof version are specified by the server. If the data cannot be
/// fetched, an error is returned.
async fn get_new_transactions_or_outputs_with_proof(
&self,
known_version: Version,
known_epoch: Epoch,
include_events: bool,
request_timeout_ms: u64,
) -> Result<Response<(TransactionOrOutputListWithProof, LedgerInfoWithSignatures)>>;

/// Fetches the number of states at the specified version.
async fn get_number_of_states(
&self,
Expand Down Expand Up @@ -149,6 +162,21 @@ pub trait AptosDataClient {
include_events: bool,
request_timeout_ms: u64,
) -> Result<Response<TransactionListWithProof>>;

/// Fetches a transaction or output list with proof, with data from
/// start to end versions (inclusive). The proof is relative to the
/// specified `proof_version`. If `include_events` is true, events are
/// included in the proof. In some cases, fewer data items may be returned
/// (e.g., to tolerate network or chunk limits). If the data cannot
/// be fetched, an error is returned.
async fn get_transactions_or_outputs_with_proof(
&self,
proof_version: Version,
start_version: Version,
end_version: Version,
include_events: bool,
request_timeout_ms: u64,
) -> Result<Response<TransactionOrOutputListWithProof>>;
}

/// A response error that users of the Aptos Data Client can use to notify
Expand Down Expand Up @@ -271,11 +299,37 @@ impl From<(TransactionListWithProof, LedgerInfoWithSignatures)> for ResponsePayl
}
}

impl TryFrom<(TransactionOrOutputListWithProof, LedgerInfoWithSignatures)> for ResponsePayload {
type Error = Error;

fn try_from(
inner: (TransactionOrOutputListWithProof, LedgerInfoWithSignatures),
) -> Result<Self, Error> {
let ((transaction_list, output_list), ledger_info) = inner;
if let Some(transaction_list) = transaction_list {
Ok(Self::NewTransactionsWithProof((
transaction_list,
ledger_info,
)))
} else if let Some(output_list) = output_list {
Ok(Self::NewTransactionOutputsWithProof((
output_list,
ledger_info,
)))
} else {
Err(Error::InvalidResponse(
"Invalid response! No transaction or output list was returned!".into(),
))
}
}
}

impl From<u64> for ResponsePayload {
fn from(inner: u64) -> Self {
Self::NumberOfStates(inner)
}
}

impl From<TransactionOutputListWithProof> for ResponsePayload {
fn from(inner: TransactionOutputListWithProof) -> Self {
Self::TransactionOutputsWithProof(inner)
Expand All @@ -288,6 +342,23 @@ impl From<TransactionListWithProof> for ResponsePayload {
}
}

impl TryFrom<TransactionOrOutputListWithProof> for ResponsePayload {
type Error = Error;

fn try_from(inner: TransactionOrOutputListWithProof) -> Result<Self, Error> {
let (transaction_list, output_list) = inner;
if let Some(transaction_list) = transaction_list {
Ok(Self::TransactionsWithProof(transaction_list))
} else if let Some(output_list) = output_list {
Ok(Self::TransactionOutputsWithProof(output_list))
} else {
Err(Error::InvalidResponse(
"Invalid response! No transaction or output list was returned!".into(),
))
}
}
}

/// A snapshot of the global state of data available in the Aptos network.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GlobalDataSummary {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use storage_service_types::requests::{
NewTransactionsWithProofRequest, StateValuesWithProofRequest,
TransactionOutputsWithProofRequest, TransactionsWithProofRequest,
};
use storage_service_types::responses::CompleteDataRange;
use storage_service_types::responses::{CompleteDataRange, TransactionOrOutputListWithProof};
use storage_service_types::Epoch;
use tokio::time::timeout;

Expand Down Expand Up @@ -435,6 +435,18 @@ impl AptosDataClient for MockAptosDataClient {
}
}

async fn get_new_transactions_or_outputs_with_proof(
&self,
_known_version: Version,
_known_epoch: Epoch,
_include_events: bool,
_request_timeout_ms: u64,
) -> aptos_data_client::Result<
Response<(TransactionOrOutputListWithProof, LedgerInfoWithSignatures)>,
> {
todo!() // Implement when we have a client side implementation
}

async fn get_number_of_states(
&self,
version: Version,
Expand Down Expand Up @@ -513,6 +525,17 @@ impl AptosDataClient for MockAptosDataClient {
// Return the transaction list with proofs
Ok(create_data_client_response(transaction_list_with_proof))
}

async fn get_transactions_or_outputs_with_proof(
&self,
_proof_version: Version,
_start_version: Version,
_end_version: Version,
_include_events: bool,
_request_timeout_ms: u64,
) -> aptos_data_client::Result<Response<TransactionOrOutputListWithProof>> {
todo!() // Implement when we have a client side implementation
}
}

#[derive(Debug)]
Expand Down
Loading