diff --git a/api/src/accounts.rs b/api/src/accounts.rs index 1f597ce8552e17..47926689673003 100644 --- a/api/src/accounts.rs +++ b/api/src/accounts.rs @@ -8,8 +8,9 @@ use crate::{ failpoint::fail_point_poem, page::determine_limit, response::{ - account_not_found, resource_not_found, struct_field_not_found, BadRequestError, - BasicErrorWith404, BasicResponse, BasicResponseStatus, BasicResultWith404, InternalError, + account_not_found, resource_not_found, struct_field_not_found, version_not_found, + version_pruned, BadRequestError, BasicErrorWith404, BasicResponse, BasicResponseStatus, + BasicResultWith404, InternalError, NotFoundError, }, ApiTags, }; @@ -22,6 +23,7 @@ use aptos_types::{ account_config::{AccountResource, ObjectGroupResource}, event::{EventHandle, EventKey}, state_store::state_key::StateKey, + transaction::Version, }; use move_core_types::{ identifier::Identifier, language_storage::StructTag, move_resource::MoveStructType, @@ -66,7 +68,7 @@ impl AccountsApi { let context = self.context.clone(); api_spawn_blocking(move || { - let account = Account::new(context, address.0, ledger_version.0, None, None)?; + let account = Account::new(context, address.0, ledger_version.0, None, None, true)?; account.account(&accept_type) }) .await @@ -118,7 +120,19 @@ impl AccountsApi { ledger_version.0, start.0.map(StateKey::from), limit.0, + false, )?; + if let Some(version) = ledger_version.0 { + if account.ledger_version < Version::from(version) { + return Err(BasicErrorWith404::not_found_with_code_no_info( + format!( + "Request version {} larger than latest indexer version {}", + version, account.ledger_version + ), + AptosErrorCode::VersionNotFound, + )); + } + } account.resources(&accept_type) }) .await @@ -170,6 +184,7 @@ impl AccountsApi { ledger_version.0, start.0.map(StateKey::from), limit.0, + false, )?; account.modules(&accept_type) }) @@ -193,25 +208,67 @@ pub struct Account { } impl Account { - /// Creates a new account struct and determines the current ledger info, and determines the - /// ledger version to query pub fn new( context: Arc, address: Address, requested_ledger_version: Option, start: Option, limit: Option, + no_internal_indexer_required: bool, ) -> Result { - // Use the latest ledger version, or the requested associated version - let (latest_ledger_info, requested_ledger_version) = context - .get_latest_ledger_info_and_verify_lookup_version( - requested_ledger_version.map(|inner| inner.0), - )?; + let sharding_enabled = context + .node_config + .storage + .rocksdb_configs + .enable_storage_sharding; + let internal_indexer_enabled = context + .node_config + .indexer_db_config + .is_internal_indexer_db_enabled(); + + let (latest_ledger_info, requested_version) = + if sharding_enabled && !no_internal_indexer_required { + if !internal_indexer_enabled { + return Err(BasicErrorWith404::internal_with_code_no_info( + "Internal indexer is not enabled", + AptosErrorCode::InternalError, + )); + } + let (latest_internal_indexer_ledger_info, latest_internal_indexer_ledger_version) = + context.get_latest_internal_indexer_ledger_version_and_info()?; + if let Some(version) = requested_ledger_version { + let request_ledger_version = Version::from(version); + if latest_internal_indexer_ledger_version < request_ledger_version { + return Err(version_not_found( + request_ledger_version, + &latest_internal_indexer_ledger_info, + )); + } else if request_ledger_version + < latest_internal_indexer_ledger_info.oldest_ledger_version.0 + { + return Err(version_pruned( + request_ledger_version, + &latest_internal_indexer_ledger_info, + )); + } + (latest_internal_indexer_ledger_info, request_ledger_version) + } else { + ( + latest_internal_indexer_ledger_info, + latest_internal_indexer_ledger_version, + ) + } + } else { + // Use the latest ledger version, or the requested associated version + context.get_latest_ledger_info_and_verify_lookup_version( + requested_ledger_version.map(|inner| inner.0), + )? + }; Ok(Self { context, address, - ledger_version: requested_ledger_version, + ledger_version: requested_version, start, limit, latest_ledger_info, diff --git a/api/src/context.rs b/api/src/context.rs index c91a3cf42bc3f5..c915767bcdffa8 100644 --- a/api/src/context.rs +++ b/api/src/context.rs @@ -277,6 +277,26 @@ impl Context { Ok((latest_ledger_info, requested_ledger_version)) } + pub fn get_latest_internal_indexer_ledger_version_and_info( + &self, + ) -> Result<(LedgerInfo, Version), E> { + if self.indexer_reader.is_none() { + return Err(E::internal_with_code_no_info( + "Indexer reader doesn't exist", + AptosErrorCode::InternalError, + )); + } + let latest_version = self + .indexer_reader + .as_ref() + .unwrap() + .get_latest_internal_indexer_ledger_version() + .map_err(|err| E::internal_with_code_no_info(err, AptosErrorCode::InternalError))?; + let mut latest_ledger_info = self.get_latest_ledger_info()?; + latest_ledger_info.ledger_version = aptos_api_types::U64(latest_version); + Ok((latest_ledger_info, latest_version)) + } + pub fn get_latest_ledger_info_with_signatures(&self) -> Result { Ok(self.db.get_latest_ledger_info()?) } diff --git a/api/src/events.rs b/api/src/events.rs index 49c4fad21ce9f2..c9be5dcc95d65c 100644 --- a/api/src/events.rs +++ b/api/src/events.rs @@ -77,7 +77,7 @@ impl EventsApi { // Ensure that account exists let api = self.clone(); api_spawn_blocking(move || { - let account = Account::new(api.context.clone(), address.0, None, None, None)?; + let account = Account::new(api.context.clone(), address.0, None, None, None, false)?; account.verify_account_or_object_resource()?; api.list( account.latest_ledger_info, @@ -144,7 +144,7 @@ impl EventsApi { let api = self.clone(); api_spawn_blocking(move || { - let account = Account::new(api.context.clone(), address.0, None, None, None)?; + let account = Account::new(api.context.clone(), address.0, None, None, None, false)?; let key = account.find_event_key(event_handle.0, field_name.0.into())?; api.list(account.latest_ledger_info, accept_type, page, key) }) diff --git a/api/src/tests/accounts_test.rs b/api/src/tests/accounts_test.rs index 55b40ddef8967e..93634efa1fc6c4 100644 --- a/api/src/tests/accounts_test.rs +++ b/api/src/tests/accounts_test.rs @@ -148,10 +148,11 @@ async fn test_get_account_resources_by_ledger_version() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_get_account_resources_by_too_large_ledger_version() { let mut context = new_test_context(current_function_name!()); + let account = context.root_account().await; let resp = context .expect_status_code(404) .get(&account_resources_with_ledger_version( - &context.root_account().await.address().to_hex_literal(), + &account.address().to_hex_literal(), 1000000000000000000, )) .await; diff --git a/api/src/transactions.rs b/api/src/transactions.rs index b7d2d75f0290f4..1e1139daf81321 100644 --- a/api/src/transactions.rs +++ b/api/src/transactions.rs @@ -986,7 +986,7 @@ impl TransactionsApi { address: Address, ) -> BasicResultWith404> { // Verify the account exists - let account = Account::new(self.context.clone(), address, None, None, None)?; + let account = Account::new(self.context.clone(), address, None, None, None, false)?; account.get_account_resource()?; let latest_ledger_info = account.latest_ledger_info; diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs index 0f430543f81837..7066fdf8b16180 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs @@ -8,7 +8,7 @@ use aptos_db_indexer::{ use aptos_indexer_grpc_utils::counters::{log_grpc_step, IndexerGrpcStep}; use aptos_schemadb::DB; use aptos_storage_interface::DbReader; -use aptos_types::indexer::indexer_db_reader::IndexerReader; +use aptos_types::{indexer::indexer_db_reader::IndexerReader, transaction::Version}; use std::sync::Arc; use tokio::runtime::Handle; @@ -59,8 +59,53 @@ impl InternalIndexerDBService { Arc::clone(&self.db_indexer) } - pub async fn run(&mut self) { - let mut start_version = self.db_indexer.get_persisted_version().unwrap_or(0); + pub fn get_start_version(&self, node_config: &NodeConfig) -> Version { + let indexer_version = self + .db_indexer + .get_persisted_version() + .unwrap_or_else(|err| { + panic!( + "Failed to get persisted version from internal indexer db: {}", + err + ) + }); + let fast_sync_enabled = node_config + .state_sync + .state_sync_driver + .bootstrapping_mode + .is_fast_sync(); + let mut db_min_version = self + .db_indexer + .get_main_db_lowest_viable_version() + .unwrap_or_else(|err| panic!("Failed to get main db min viable version: {}", err)); + + // Wait till fast sync is done + while fast_sync_enabled && db_min_version == 0 { + db_min_version = self + .db_indexer + .get_main_db_lowest_viable_version() + .unwrap_or_else(|err| panic!("Failed to get main db min viable version: {}", err)); + std::thread::sleep(std::time::Duration::from_millis(100)); + } + let fast_sync_version_opt: Option = self.db_indexer.get_fast_sync_version(); + if indexer_version >= db_min_version { + indexer_version + } else if node_config.indexer_db_config.enable_statekeys() { + // if fast sync fill to the min viable version it is Ok, otherwise panic + if let Some(fast_sync_version) = fast_sync_version_opt { + if fast_sync_version >= db_min_version { + return db_min_version; + } + } + panic!("Internal indexer version is lower than main db min viable version. Internal state keys indexer cannot catch up"); + } else { + db_min_version + } + } + + pub async fn run(&mut self, node_config: &NodeConfig) { + let mut start_version = self.get_start_version(node_config); + loop { let start_time: std::time::Instant = std::time::Instant::now(); let next_version = self @@ -111,8 +156,9 @@ impl MockInternalIndexerDBService { let mut internal_indexer_db_service = InternalIndexerDBService::new(db_reader, node_config, db); let db_indexer = internal_indexer_db_service.get_db_indexer(); + let config_clone = node_config.to_owned(); handle.spawn(async move { - internal_indexer_db_service.run().await; + internal_indexer_db_service.run(&config_clone).await; }); Self { indexer_readers: IndexerReaders::new(None, Some(db_indexer)), diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs index 5f70542177f3f7..ed0d863502635e 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs @@ -30,8 +30,9 @@ pub fn bootstrap_internal_indexer_db( InternalIndexerDBService::new(db_rw.reader, config, internal_indexer_db.unwrap()); let db_indexer = indexer_service.get_db_indexer(); // Spawn task for db indexer + let config_clone = config.to_owned(); runtime.spawn(async move { - indexer_service.run().await; + indexer_service.run(&config_clone).await; }); Some((runtime, db_indexer)) diff --git a/state-sync/state-sync-driver/src/storage_synchronizer.rs b/state-sync/state-sync-driver/src/storage_synchronizer.rs index 21b0d6552534e1..470225adc9cfd3 100644 --- a/state-sync/state-sync-driver/src/storage_synchronizer.rs +++ b/state-sync/state-sync-driver/src/storage_synchronizer.rs @@ -15,7 +15,10 @@ use crate::{ }; use aptos_config::config::StateSyncDriverConfig; use aptos_data_streaming_service::data_notification::NotificationId; -use aptos_db_indexer_schemas::schema::state_keys::StateKeysSchema; +use aptos_db_indexer_schemas::{ + metadata::{MetadataKey, MetadataValue}, + schema::{indexer_metadata::InternalIndexerMetadataSchema, state_keys::StateKeysSchema}, +}; use aptos_event_notifications::EventSubscriptionService; use aptos_executor_types::{ChunkCommitNotification, ChunkExecutorTrait}; use aptos_infallible::Mutex; @@ -799,6 +802,8 @@ fn spawn_commit_post_processor< fn write_kv_to_indexer_db( internal_indexer_db: &Option>, kvs: &Vec<(StateKey, StateValue)>, + snapshot_version: Version, + write_version: bool, ) -> aptos_storage_interface::Result<()> { // add state value to internal indexer if let Some(indexer_db) = internal_indexer_db.as_ref() { @@ -806,6 +811,12 @@ fn write_kv_to_indexer_db( for (state_key, _) in kvs { batch.put::(state_key, &())?; } + if write_version { + batch.put::( + &MetadataKey::FastSyncVersion, + &MetadataValue::Version(snapshot_version), + )?; + } indexer_db.write_schemas(batch)?; } Ok(()) @@ -861,8 +872,12 @@ fn spawn_state_snapshot_receiver< let all_states_synced = states_with_proof.is_last_chunk(); let last_committed_state_index = states_with_proof.last_index; let num_state_values = states_with_proof.raw_values.len(); - let indexer_results: Result<(), AptosDbError> = - write_kv_to_indexer_db(&internal_indexer_db, &states_with_proof.raw_values); + let indexer_results: Result<(), AptosDbError> = write_kv_to_indexer_db( + &internal_indexer_db, + &states_with_proof.raw_values, + version, + all_states_synced, + ); let result = state_snapshot_receiver.add_chunk( states_with_proof.raw_values, diff --git a/storage/indexer/src/db_indexer.rs b/storage/indexer/src/db_indexer.rs index 94c29ca553e4ae..7e5c26ce8f3d85 100644 --- a/storage/indexer/src/db_indexer.rs +++ b/storage/indexer/src/db_indexer.rs @@ -111,6 +111,13 @@ impl DBIndexer { } } + pub fn get_main_db_lowest_viable_version(&self) -> Result { + self.main_db_reader + .get_first_txn_version() + .transpose() + .expect("main db lowest viable version doesn't exist") + } + pub fn ensure_cover_ledger_version(&self, ledger_version: Version) -> Result<()> { let indexer_latest_version = self.get_persisted_version()?; ensure!( @@ -458,4 +465,19 @@ impl DBIndexer { ledger_version, ) } + + pub fn get_fast_sync_version(&self) -> Option { + // read the latest key from the db + match self + .db + .get::(&MetadataKey::FastSyncVersion) + { + Ok(Some(metavalue)) => Some(metavalue.expect_version()), + Ok(None) => None, + Err(e) => panic!( + "Failed to get fast sync version from internal indexer db: {}", + e + ), + } + } } diff --git a/storage/indexer/src/indexer_reader.rs b/storage/indexer/src/indexer_reader.rs index ad032ee88dc7dd..e8c8e5237c11c8 100644 --- a/storage/indexer/src/indexer_reader.rs +++ b/storage/indexer/src/indexer_reader.rs @@ -47,6 +47,13 @@ impl IndexerReader for IndexerReaders { anyhow::bail!("Table info reader is not available") } + fn get_latest_internal_indexer_ledger_version(&self) -> anyhow::Result { + if let Some(db_indexer) = &self.db_indexer_reader { + return Ok(db_indexer.get_persisted_version()?); + } + anyhow::bail!("DB indexer reader is not available") + } + fn get_events( &self, event_key: &EventKey, diff --git a/storage/indexer_schemas/src/metadata.rs b/storage/indexer_schemas/src/metadata.rs index f0af9f42b51b61..4fdad2dfc4c412 100644 --- a/storage/indexer_schemas/src/metadata.rs +++ b/storage/indexer_schemas/src/metadata.rs @@ -18,8 +18,9 @@ impl MetadataValue { } } -#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize, Hash, PartialOrd, Ord)] #[cfg_attr(any(test, feature = "fuzzing"), derive(proptest_derive::Arbitrary))] pub enum MetadataKey { LatestVersion, + FastSyncVersion, } diff --git a/testsuite/smoke-test/src/fullnode.rs b/testsuite/smoke-test/src/fullnode.rs index 35b22ec579ab1c..d0e83fb6c08afa 100644 --- a/testsuite/smoke-test/src/fullnode.rs +++ b/testsuite/smoke-test/src/fullnode.rs @@ -10,18 +10,15 @@ use crate::{ use anyhow::bail; use aptos_cached_packages::aptos_stdlib; use aptos_config::config::{BootstrappingMode, NodeConfig, OverrideNodeConfig}; -use aptos_db::AptosDB; -use aptos_db_indexer_schemas::schema::state_keys::StateKeysSchema; +use aptos_db_indexer_schemas::{ + metadata::MetadataKey, + schema::{indexer_metadata::InternalIndexerMetadataSchema, state_keys::StateKeysSchema}, +}; use aptos_forge::{NodeExt, Result, Swarm, SwarmExt}; use aptos_indexer_grpc_table_info::internal_indexer_db_service::InternalIndexerDBService; use aptos_rest_client::Client as RestClient; use aptos_schemadb::DB; -use aptos_storage_interface::DbReader; -use aptos_types::{ - account_address::AccountAddress, - state_store::state_key::{prefix::StateKeyPrefix, StateKey}, - transaction::Version, -}; +use aptos_types::{account_address::AccountAddress, state_store::state_key::StateKey}; use std::{ collections::HashSet, sync::Arc, @@ -104,6 +101,7 @@ async fn wait_for_account(client: &RestClient, address: AccountAddress) -> Resul } fn enable_internal_indexer(node_config: &mut NodeConfig) { + node_config.storage.rocksdb_configs.enable_storage_sharding = true; node_config.indexer_db_config.enable_event = true; node_config.indexer_db_config.enable_transaction = true; node_config.indexer_db_config.enable_statekeys = true; @@ -153,30 +151,14 @@ async fn test_internal_indexer_with_fast_sync() { } fn check_indexer_db(vfn_config: &NodeConfig) { - let aptos_db_dir = vfn_config - .storage - .get_dir_paths() - .default_root_path() - .to_owned(); - let path = aptos_db_dir.as_path(); - let aptos_db = AptosDB::new_for_test(path); - let internal_indexer_db = InternalIndexerDBService::get_indexer_db(vfn_config).unwrap(); - let prefix = StateKeyPrefix::from(AccountAddress::from_hex_literal("0x1").unwrap()); - let main_db_iter = aptos_db - .get_prefixed_state_value_iterator(&prefix, None, Version::MAX) - .unwrap(); - let main_db_keys: HashSet = main_db_iter.map(|iter| iter.unwrap().0).collect(); let indexer_keys: HashSet = get_indexer_db_content::(internal_indexer_db.clone()); - println!( - "Total state keys: {}, {}", - main_db_keys.len(), - indexer_keys.len() + let meta_keys = get_indexer_db_content::( + internal_indexer_db.clone(), ); - assert!(!main_db_keys.is_empty()); - // 0x1 statekeys are synced and is subset of indexer statekeys - assert!(main_db_keys.is_subset(&indexer_keys)); + assert!(meta_keys.contains(&MetadataKey::FastSyncVersion)); + assert!(!indexer_keys.is_empty()); } fn get_indexer_db_content(internal_indexer_db: Arc) -> HashSet diff --git a/types/src/indexer/indexer_db_reader.rs b/types/src/indexer/indexer_db_reader.rs index b98ca4dcbc1dc7..3c8f7b79536d34 100644 --- a/types/src/indexer/indexer_db_reader.rs +++ b/types/src/indexer/indexer_db_reader.rs @@ -56,4 +56,6 @@ pub trait IndexerReader: Send + Sync { cursor: Option<&StateKey>, version: Version, ) -> Result> + '_>>; + + fn get_latest_internal_indexer_ledger_version(&self) -> Result; }