Skip to content

Commit

Permalink
[sharded] use internal indexer ledger version in API context and add …
Browse files Browse the repository at this point in the history
…state indices restore (#13883)

* [sharded] use internal indexer ledger version in API context

* add restore state indices

* Fix progress.

---------

Co-authored-by: Guoteng Rao <[email protected]>
  • Loading branch information
areshand and grao1991 authored Jul 25, 2024
1 parent 8e75c7c commit 589066d
Show file tree
Hide file tree
Showing 48 changed files with 486 additions and 183 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 20 additions & 8 deletions api/src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl AccountsApi {

let context = self.context.clone();
api_spawn_blocking(move || {
let account = Account::new(context, address.0, ledger_version.0, None, None)?;
let account = Account::new(context, address.0, ledger_version.0, None, None, false)?;
account.account(&accept_type)
})
.await
Expand Down Expand Up @@ -118,6 +118,7 @@ impl AccountsApi {
ledger_version.0,
start.0.map(StateKey::from),
limit.0,
true,
)?;
account.resources(&accept_type)
})
Expand Down Expand Up @@ -170,6 +171,7 @@ impl AccountsApi {
ledger_version.0,
start.0.map(StateKey::from),
limit.0,
true,
)?;
account.modules(&accept_type)
})
Expand All @@ -193,25 +195,35 @@ pub struct Account {
}

impl Account {
/// Creates a new account struct and determines the current ledger info, and determines the
/// ledger version to query
pub fn new(
context: Arc<Context>,
address: Address,
requested_ledger_version: Option<U64>,
start: Option<StateKey>,
limit: Option<u16>,
require_state_indices: bool,
) -> Result<Self, BasicErrorWith404> {
// Use the latest ledger version, or the requested associated version
let (latest_ledger_info, requested_ledger_version) = context
.get_latest_ledger_info_and_verify_lookup_version(
let sharding_enabled = context
.node_config
.storage
.rocksdb_configs
.enable_storage_sharding;

let (latest_ledger_info, requested_version) = if sharding_enabled && require_state_indices {
context.get_latest_ledger_info_and_verify_internal_indexer_lookup_version(
requested_ledger_version.map(|inner| inner.0),
)?;
)?
} else {
// Use the latest ledger version, or the requested associated version
context.get_latest_ledger_info_and_verify_lookup_version(
requested_ledger_version.map(|inner| inner.0),
)?
};

Ok(Self {
context,
address,
ledger_version: requested_ledger_version,
ledger_version: requested_version,
start,
limit,
latest_ledger_info,
Expand Down
48 changes: 48 additions & 0 deletions api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,35 @@ impl Context {
))
}

pub fn get_latest_ledger_info_and_verify_internal_indexer_lookup_version<E: StdApiError>(
&self,
requested_ledger_version: Option<Version>,
) -> Result<(LedgerInfo, Version), E> {
if self.indexer_reader.is_none() {
return Err(E::internal_with_code_no_info(
"Indexer reader doesn't exist",
AptosErrorCode::InternalError,
));
}

let (latest_ledger_info, latest_internal_indexer_ledger_version) =
self.get_latest_internal_indexer_ledger_version_and_main_db_info()?;
if let Some(version) = requested_ledger_version {
let request_ledger_version = Version::from(version);
if latest_internal_indexer_ledger_version < request_ledger_version {
return Err(version_not_found(
request_ledger_version,
&latest_ledger_info,
));
} else if request_ledger_version < latest_ledger_info.oldest_ledger_version.0 {
return Err(version_pruned(request_ledger_version, &latest_ledger_info));
}
Ok((latest_ledger_info, request_ledger_version))
} else {
Ok((latest_ledger_info, latest_internal_indexer_ledger_version))
}
}

pub fn get_latest_ledger_info_and_verify_lookup_version<E: StdApiError>(
&self,
requested_ledger_version: Option<Version>,
Expand All @@ -277,6 +306,25 @@ impl Context {
Ok((latest_ledger_info, requested_ledger_version))
}

pub fn get_latest_internal_indexer_ledger_version_and_main_db_info<E: StdApiError>(
&self,
) -> Result<(LedgerInfo, Version), E> {
if let Some(indexer_reader) = self.indexer_reader.as_ref() {
if let Some(latest_version) = indexer_reader
.get_latest_internal_indexer_ledger_version()
.map_err(|err| E::internal_with_code_no_info(err, AptosErrorCode::InternalError))?
{
let latest_ledger_info = self.get_latest_ledger_info()?;
return Ok((latest_ledger_info, latest_version));
}
}

Err(E::internal_with_code_no_info(
"Indexer reader doesn't exist, or doesn't have data.",
AptosErrorCode::InternalError,
))
}

pub fn get_latest_ledger_info_with_signatures(&self) -> Result<LedgerInfoWithSignatures> {
Ok(self.db.get_latest_ledger_info()?)
}
Expand Down
4 changes: 2 additions & 2 deletions api/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl EventsApi {
// Ensure that account exists
let api = self.clone();
api_spawn_blocking(move || {
let account = Account::new(api.context.clone(), address.0, None, None, None)?;
let account = Account::new(api.context.clone(), address.0, None, None, None, true)?;
account.verify_account_or_object_resource()?;
api.list(
account.latest_ledger_info,
Expand Down Expand Up @@ -144,7 +144,7 @@ impl EventsApi {

let api = self.clone();
api_spawn_blocking(move || {
let account = Account::new(api.context.clone(), address.0, None, None, None)?;
let account = Account::new(api.context.clone(), address.0, None, None, None, true)?;
let key = account.find_event_key(event_handle.0, field_name.0.into())?;
api.list(account.latest_ledger_info, accept_type, page, key)
})
Expand Down
9 changes: 7 additions & 2 deletions api/src/tests/accounts_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use aptos_api_test_context::{current_function_name, find_value, TestContext};
use aptos_api_types::{MoveModuleBytecode, MoveResource, MoveStructTag, StateKeyWrapper};
use aptos_cached_packages::aptos_stdlib;
use serde_json::json;
use std::str::FromStr;
use std::{str::FromStr, time::Duration};

/* TODO: reactivate once cause of failure for `"8"` vs `8` in the JSON output is known.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand Down Expand Up @@ -114,6 +114,8 @@ async fn test_account_resources_by_ledger_version_with_context(mut context: Test
let txn = context.create_user_account(&account).await;
context.commit_block(&vec![txn.clone()]).await;

tokio::time::sleep(Duration::from_millis(200)).await;

let ledger_version_1_resources = context
.get(&account_resources(
&context.root_account().await.address().to_hex_literal(),
Expand Down Expand Up @@ -148,10 +150,11 @@ async fn test_get_account_resources_by_ledger_version() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_get_account_resources_by_too_large_ledger_version() {
let mut context = new_test_context(current_function_name!());
let account = context.root_account().await;
let resp = context
.expect_status_code(404)
.get(&account_resources_with_ledger_version(
&context.root_account().await.address().to_hex_literal(),
&account.address().to_hex_literal(),
1000000000000000000,
))
.await;
Expand Down Expand Up @@ -180,6 +183,8 @@ async fn test_get_account_modules_by_ledger_version_with_context(mut context: Te
root_account.sign_with_transaction_builder(context.transaction_factory().payload(payload));
context.commit_block(&vec![txn.clone()]).await;

tokio::time::sleep(Duration::from_millis(200)).await;

let modules = context
.get(&account_modules(
&context.root_account().await.address().to_hex_literal(),
Expand Down
2 changes: 1 addition & 1 deletion api/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ impl TransactionsApi {
address: Address,
) -> BasicResultWith404<Vec<Transaction>> {
// Verify the account exists
let account = Account::new(self.context.clone(), address, None, None, None)?;
let account = Account::new(self.context.clone(), address, None, None, None, true)?;
account.get_account_resource()?;

let latest_ledger_info = account.latest_ledger_info;
Expand Down
1 change: 0 additions & 1 deletion aptos-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ aptos-peer-monitoring-service-server = { workspace = true }
aptos-peer-monitoring-service-types = { workspace = true }
aptos-runtimes = { workspace = true }
aptos-safety-rules = { workspace = true }
aptos-schemadb = { workspace = true }
aptos-state-sync-driver = { workspace = true }
aptos-storage-interface = { workspace = true }
aptos-storage-service-client = { workspace = true }
Expand Down
3 changes: 0 additions & 3 deletions aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,9 +672,6 @@ pub fn setup_environment_and_start_node(
genesis_waypoint,
event_subscription_service,
db_rw.clone(),
indexer_db_opt
.as_ref()
.map(|inner| inner.get_inner_db_clone()),
)?;

// Start the node inspection service
Expand Down
3 changes: 0 additions & 3 deletions aptos-node/src/state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use aptos_network::application::{
interface::{NetworkClient, NetworkClientInterface, NetworkServiceEvents},
storage::PeersAndMetadata,
};
use aptos_schemadb::DB;
use aptos_state_sync_driver::{
driver_factory::{DriverFactory, StateSyncRuntimes},
metadata_storage::PersistentMetadataStorage,
Expand Down Expand Up @@ -132,7 +131,6 @@ pub fn start_state_sync_and_get_notification_handles(
waypoint: Waypoint,
event_subscription_service: EventSubscriptionService,
db_rw: DbReaderWriter,
internal_indexer_db: Option<Arc<DB>>,
) -> anyhow::Result<(
AptosDataClient,
StateSyncRuntimes,
Expand Down Expand Up @@ -197,7 +195,6 @@ pub fn start_state_sync_and_get_notification_handles(
aptos_data_client.clone(),
streaming_service_client,
TimeService::real(),
internal_indexer_db,
);

// Create a new state sync runtime handle
Expand Down
1 change: 0 additions & 1 deletion aptos-node/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ pub(crate) fn bootstrap_db(
(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
};
//
Ok((aptos_db_reader, db_rw, backup_service, internal_indexer_db))
}

Expand Down
Loading

0 comments on commit 589066d

Please sign in to comment.