Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sharded] use internal indexer ledger version in API context and add state indices restore #13883

Merged
merged 3 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 20 additions & 8 deletions api/src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl AccountsApi {

let context = self.context.clone();
api_spawn_blocking(move || {
let account = Account::new(context, address.0, ledger_version.0, None, None)?;
let account = Account::new(context, address.0, ledger_version.0, None, None, false)?;
account.account(&accept_type)
})
.await
Expand Down Expand Up @@ -118,6 +118,7 @@ impl AccountsApi {
ledger_version.0,
start.0.map(StateKey::from),
limit.0,
true,
)?;
account.resources(&accept_type)
})
Expand Down Expand Up @@ -170,6 +171,7 @@ impl AccountsApi {
ledger_version.0,
start.0.map(StateKey::from),
limit.0,
true,
)?;
account.modules(&accept_type)
})
Expand All @@ -193,25 +195,35 @@ pub struct Account {
}

impl Account {
/// Creates a new account struct and determines the current ledger info, and determines the
/// ledger version to query
pub fn new(
context: Arc<Context>,
address: Address,
requested_ledger_version: Option<U64>,
start: Option<StateKey>,
limit: Option<u16>,
require_state_indices: bool,
) -> Result<Self, BasicErrorWith404> {
// Use the latest ledger version, or the requested associated version
let (latest_ledger_info, requested_ledger_version) = context
.get_latest_ledger_info_and_verify_lookup_version(
let sharding_enabled = context
.node_config
.storage
.rocksdb_configs
.enable_storage_sharding;

let (latest_ledger_info, requested_version) = if sharding_enabled && require_state_indices {
context.get_latest_ledger_info_and_verify_internal_indexer_lookup_version(
requested_ledger_version.map(|inner| inner.0),
)?;
)?
} else {
// Use the latest ledger version, or the requested associated version
context.get_latest_ledger_info_and_verify_lookup_version(
requested_ledger_version.map(|inner| inner.0),
)?
};

Ok(Self {
context,
address,
ledger_version: requested_ledger_version,
ledger_version: requested_version,
start,
limit,
latest_ledger_info,
Expand Down
48 changes: 48 additions & 0 deletions api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,35 @@ impl Context {
))
}

pub fn get_latest_ledger_info_and_verify_internal_indexer_lookup_version<E: StdApiError>(
&self,
requested_ledger_version: Option<Version>,
) -> Result<(LedgerInfo, Version), E> {
if self.indexer_reader.is_none() {
return Err(E::internal_with_code_no_info(
"Indexer reader doesn't exist",
AptosErrorCode::InternalError,
));
}

let (latest_ledger_info, latest_internal_indexer_ledger_version) =
self.get_latest_internal_indexer_ledger_version_and_main_db_info()?;
if let Some(version) = requested_ledger_version {
let request_ledger_version = Version::from(version);
if latest_internal_indexer_ledger_version < request_ledger_version {
return Err(version_not_found(
request_ledger_version,
&latest_ledger_info,
));
} else if request_ledger_version < latest_ledger_info.oldest_ledger_version.0 {
return Err(version_pruned(request_ledger_version, &latest_ledger_info));
}
Ok((latest_ledger_info, request_ledger_version))
} else {
Ok((latest_ledger_info, latest_internal_indexer_ledger_version))
}
}

pub fn get_latest_ledger_info_and_verify_lookup_version<E: StdApiError>(
&self,
requested_ledger_version: Option<Version>,
Expand All @@ -277,6 +306,25 @@ impl Context {
Ok((latest_ledger_info, requested_ledger_version))
}

pub fn get_latest_internal_indexer_ledger_version_and_main_db_info<E: StdApiError>(
&self,
) -> Result<(LedgerInfo, Version), E> {
if let Some(indexer_reader) = self.indexer_reader.as_ref() {
if let Some(latest_version) = indexer_reader
.get_latest_internal_indexer_ledger_version()
.map_err(|err| E::internal_with_code_no_info(err, AptosErrorCode::InternalError))?
{
let latest_ledger_info = self.get_latest_ledger_info()?;
return Ok((latest_ledger_info, latest_version));
}
}

Err(E::internal_with_code_no_info(
"Indexer reader doesn't exist, or doesn't have data.",
AptosErrorCode::InternalError,
))
}

pub fn get_latest_ledger_info_with_signatures(&self) -> Result<LedgerInfoWithSignatures> {
Ok(self.db.get_latest_ledger_info()?)
}
Expand Down
4 changes: 2 additions & 2 deletions api/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl EventsApi {
// Ensure that account exists
let api = self.clone();
api_spawn_blocking(move || {
let account = Account::new(api.context.clone(), address.0, None, None, None)?;
let account = Account::new(api.context.clone(), address.0, None, None, None, true)?;
account.verify_account_or_object_resource()?;
api.list(
account.latest_ledger_info,
Expand Down Expand Up @@ -144,7 +144,7 @@ impl EventsApi {

let api = self.clone();
api_spawn_blocking(move || {
let account = Account::new(api.context.clone(), address.0, None, None, None)?;
let account = Account::new(api.context.clone(), address.0, None, None, None, true)?;
let key = account.find_event_key(event_handle.0, field_name.0.into())?;
api.list(account.latest_ledger_info, accept_type, page, key)
})
Expand Down
9 changes: 7 additions & 2 deletions api/src/tests/accounts_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use aptos_api_test_context::{current_function_name, find_value, TestContext};
use aptos_api_types::{MoveModuleBytecode, MoveResource, MoveStructTag, StateKeyWrapper};
use aptos_cached_packages::aptos_stdlib;
use serde_json::json;
use std::str::FromStr;
use std::{str::FromStr, time::Duration};

/* TODO: reactivate once cause of failure for `"8"` vs `8` in the JSON output is known.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand Down Expand Up @@ -114,6 +114,8 @@ async fn test_account_resources_by_ledger_version_with_context(mut context: Test
let txn = context.create_user_account(&account).await;
context.commit_block(&vec![txn.clone()]).await;

tokio::time::sleep(Duration::from_millis(200)).await;

let ledger_version_1_resources = context
.get(&account_resources(
&context.root_account().await.address().to_hex_literal(),
Expand Down Expand Up @@ -148,10 +150,11 @@ async fn test_get_account_resources_by_ledger_version() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_get_account_resources_by_too_large_ledger_version() {
let mut context = new_test_context(current_function_name!());
let account = context.root_account().await;
let resp = context
.expect_status_code(404)
.get(&account_resources_with_ledger_version(
&context.root_account().await.address().to_hex_literal(),
&account.address().to_hex_literal(),
1000000000000000000,
))
.await;
Expand Down Expand Up @@ -180,6 +183,8 @@ async fn test_get_account_modules_by_ledger_version_with_context(mut context: Te
root_account.sign_with_transaction_builder(context.transaction_factory().payload(payload));
context.commit_block(&vec![txn.clone()]).await;

tokio::time::sleep(Duration::from_millis(200)).await;

let modules = context
.get(&account_modules(
&context.root_account().await.address().to_hex_literal(),
Expand Down
2 changes: 1 addition & 1 deletion api/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ impl TransactionsApi {
address: Address,
) -> BasicResultWith404<Vec<Transaction>> {
// Verify the account exists
let account = Account::new(self.context.clone(), address, None, None, None)?;
let account = Account::new(self.context.clone(), address, None, None, None, true)?;
account.get_account_resource()?;

let latest_ledger_info = account.latest_ledger_info;
Expand Down
1 change: 0 additions & 1 deletion aptos-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ aptos-peer-monitoring-service-server = { workspace = true }
aptos-peer-monitoring-service-types = { workspace = true }
aptos-runtimes = { workspace = true }
aptos-safety-rules = { workspace = true }
aptos-schemadb = { workspace = true }
aptos-state-sync-driver = { workspace = true }
aptos-storage-interface = { workspace = true }
aptos-storage-service-client = { workspace = true }
Expand Down
3 changes: 0 additions & 3 deletions aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,9 +672,6 @@ pub fn setup_environment_and_start_node(
genesis_waypoint,
event_subscription_service,
db_rw.clone(),
indexer_db_opt
.as_ref()
.map(|inner| inner.get_inner_db_clone()),
)?;

// Start the node inspection service
Expand Down
3 changes: 0 additions & 3 deletions aptos-node/src/state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use aptos_network::application::{
interface::{NetworkClient, NetworkClientInterface, NetworkServiceEvents},
storage::PeersAndMetadata,
};
use aptos_schemadb::DB;
use aptos_state_sync_driver::{
driver_factory::{DriverFactory, StateSyncRuntimes},
metadata_storage::PersistentMetadataStorage,
Expand Down Expand Up @@ -132,7 +131,6 @@ pub fn start_state_sync_and_get_notification_handles(
waypoint: Waypoint,
event_subscription_service: EventSubscriptionService,
db_rw: DbReaderWriter,
internal_indexer_db: Option<Arc<DB>>,
) -> anyhow::Result<(
AptosDataClient,
StateSyncRuntimes,
Expand Down Expand Up @@ -197,7 +195,6 @@ pub fn start_state_sync_and_get_notification_handles(
aptos_data_client.clone(),
streaming_service_client,
TimeService::real(),
internal_indexer_db,
);

// Create a new state sync runtime handle
Expand Down
1 change: 0 additions & 1 deletion aptos-node/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ pub(crate) fn bootstrap_db(
(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
};
//
Ok((aptos_db_reader, db_rw, backup_service, internal_indexer_db))
}

Expand Down
Loading
Loading