diff --git a/Cargo.lock b/Cargo.lock index e62dcf742b86e5..1efaca7cc9d301 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3683,7 +3683,6 @@ dependencies = [ "aptos-data-client", "aptos-data-streaming-service", "aptos-db", - "aptos-db-indexer", "aptos-event-notifications", "aptos-executor", "aptos-executor-test-helpers", diff --git a/api/src/context.rs b/api/src/context.rs index f9897a7e7ee222..aa9e59848544f4 100644 --- a/api/src/context.rs +++ b/api/src/context.rs @@ -309,20 +309,20 @@ impl Context { pub fn get_latest_internal_indexer_ledger_version_and_main_db_info( &self, ) -> Result<(LedgerInfo, Version), E> { - if self.indexer_reader.is_none() { - return Err(E::internal_with_code_no_info( - "Indexer reader doesn't exist", - AptosErrorCode::InternalError, - )); + if let Some(indexer_reader) = self.indexer_reader.as_ref() { + if let Some(latest_version) = indexer_reader + .get_latest_internal_indexer_ledger_version() + .map_err(|err| E::internal_with_code_no_info(err, AptosErrorCode::InternalError))? + { + let latest_ledger_info = self.get_latest_ledger_info()?; + return Ok((latest_ledger_info, latest_version)); + } } - let latest_version = self - .indexer_reader - .as_ref() - .unwrap() - .get_latest_internal_indexer_ledger_version() - .map_err(|err| E::internal_with_code_no_info(err, AptosErrorCode::InternalError))?; - let latest_ledger_info = self.get_latest_ledger_info()?; - Ok((latest_ledger_info, latest_version)) + + Err(E::internal_with_code_no_info( + "Indexer reader doesn't exist, or doesn't have data.", + AptosErrorCode::InternalError, + )) } pub fn get_latest_ledger_info_with_signatures(&self) -> Result { diff --git a/api/src/tests/accounts_test.rs b/api/src/tests/accounts_test.rs index 93634efa1fc6c4..021fa549f3b359 100644 --- a/api/src/tests/accounts_test.rs +++ b/api/src/tests/accounts_test.rs @@ -8,7 +8,7 @@ use aptos_api_test_context::{current_function_name, find_value, TestContext}; use aptos_api_types::{MoveModuleBytecode, MoveResource, MoveStructTag, StateKeyWrapper}; use aptos_cached_packages::aptos_stdlib; use serde_json::json; -use std::str::FromStr; +use std::{str::FromStr, time::Duration}; /* TODO: reactivate once cause of failure for `"8"` vs `8` in the JSON output is known. #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -114,6 +114,8 @@ async fn test_account_resources_by_ledger_version_with_context(mut context: Test let txn = context.create_user_account(&account).await; context.commit_block(&vec![txn.clone()]).await; + tokio::time::sleep(Duration::from_millis(200)).await; + let ledger_version_1_resources = context .get(&account_resources( &context.root_account().await.address().to_hex_literal(), @@ -181,6 +183,8 @@ async fn test_get_account_modules_by_ledger_version_with_context(mut context: Te root_account.sign_with_transaction_builder(context.transaction_factory().payload(payload)); context.commit_block(&vec![txn.clone()]).await; + tokio::time::sleep(Duration::from_millis(200)).await; + let modules = context .get(&account_modules( &context.root_account().await.address().to_hex_literal(), diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs index 2dea7a1c91d393..4fc49928c9a4c0 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs @@ -77,13 +77,11 @@ impl InternalIndexerDBService { } pub async fn get_start_version(&self, node_config: &NodeConfig) -> Result { - let indexer_version = self.db_indexer.indexer_db.get_persisted_version()?; let fast_sync_enabled = node_config .state_sync .state_sync_driver .bootstrapping_mode .is_fast_sync(); - let mut db_min_version = self.db_indexer.get_main_db_lowest_viable_version()?; let mut main_db_synced_version = self.db_indexer.main_db_reader.get_synced_version()?; // Wait till fast sync is done @@ -91,24 +89,47 @@ impl InternalIndexerDBService { tokio::time::sleep(std::time::Duration::from_secs(1)).await; main_db_synced_version = self.db_indexer.main_db_reader.get_synced_version()?; } - - let fast_sync_version_opt = self + + let start_version = self .db_indexer .indexer_db - .get_restore_progress(db_min_version)?; + .get_persisted_version()? + .map_or(0, |v| v + 1); + + if node_config.indexer_db_config.enable_statekeys() { + let state_start_version = self + .db_indexer + .indexer_db + .get_state_version()? + .map_or(0, |v| v + 1); + if start_version != state_start_version { + panic!("Cannot start state indexer because the progress doesn't match."); + } + } - if node_config.indexer_db_config.enable_statekeys() - && fast_sync_enabled - && fast_sync_version_opt.is_none() - { - panic!("Internal indexer db don't have state keys restored. Please run state sync with state keys enabled."); + if node_config.indexer_db_config.enable_transaction() { + let transaction_start_version = self + .db_indexer + .indexer_db + .get_transaction_version()? + .map_or(0, |v| v + 1); + if start_version != transaction_start_version { + panic!("Cannot start transaction indexer because the progress doesn't match."); + } } - if indexer_version >= db_min_version { - Ok(indexer_version) - } else { - Ok(db_min_version) + if node_config.indexer_db_config.enable_event() { + let event_start_version = self + .db_indexer + .indexer_db + .get_event_version()? + .map_or(0, |v| v + 1); + if start_version != event_start_version { + panic!("Cannot start event indexer because the progress doesn't match."); + } } + + Ok(start_version) } pub async fn run(&mut self, node_config: &NodeConfig) -> Result<()> { @@ -116,10 +137,10 @@ impl InternalIndexerDBService { loop { let start_time: std::time::Instant = std::time::Instant::now(); - let next_version = self.db_indexer.process_a_batch(Some(start_version))?; + let next_version = self.db_indexer.process_a_batch(start_version)?; if next_version == start_version { - tokio::time::sleep(std::time::Duration::from_millis(10)).await; + tokio::time::sleep(std::time::Duration::from_millis(100)).await; continue; } log_grpc_step( diff --git a/ecosystem/indexer-grpc/transaction-filter/src/boolean_transaction_filter.rs b/ecosystem/indexer-grpc/transaction-filter/src/boolean_transaction_filter.rs index 5eb6727faf04b4..4adaad60e5e0a9 100644 --- a/ecosystem/indexer-grpc/transaction-filter/src/boolean_transaction_filter.rs +++ b/ecosystem/indexer-grpc/transaction-filter/src/boolean_transaction_filter.rs @@ -286,13 +286,10 @@ impl Filterable for APIFilter { #[cfg(test)] mod test { use super::*; - use crate::{ - filters::{ - event::EventFilterBuilder, move_module::MoveStructTagFilterBuilder, - /*user_transaction::EntryFunctionFilter,*/ TransactionRootFilterBuilder, - UserTransactionFilterBuilder, /*UserTransactionPayloadFilterBuilder,*/ - }, - /*test_lib::load_graffio_fixture,*/ + use crate::filters::{ + event::EventFilterBuilder, move_module::MoveStructTagFilterBuilder, + /*user_transaction::EntryFunctionFilter,*/ TransactionRootFilterBuilder, + UserTransactionFilterBuilder, /*UserTransactionPayloadFilterBuilder,*/ }; // Disabled for now while we investigate an issue with lz4 in aptos-core: diff --git a/execution/executor/tests/internal_indexer_test.rs b/execution/executor/tests/internal_indexer_test.rs index 61ed3465ecd472..7e85cc90c97527 100644 --- a/execution/executor/tests/internal_indexer_test.rs +++ b/execution/executor/tests/internal_indexer_test.rs @@ -149,17 +149,18 @@ fn test_db_indexer_data() { let db_indexer = DBIndexer::new(internal_indexer_db.clone(), aptos_db.clone()); // assert the data matches the expected data - let mut version = internal_indexer_db.get_persisted_version().unwrap(); - assert_eq!(version, 0); - while version < total_version { - version = db_indexer.process_a_batch(Some(version)).unwrap(); + let version = internal_indexer_db.get_persisted_version().unwrap(); + assert_eq!(version, None); + let mut start_version = version.map_or(0, |v| v + 1); + while start_version < total_version { + start_version = db_indexer.process_a_batch(start_version).unwrap(); } // wait for the commit to finish thread::sleep(Duration::from_millis(100)); // indexer has process all the transactions assert_eq!( internal_indexer_db.get_persisted_version().unwrap(), - total_version + Some(total_version) ); let txn_iter = internal_indexer_db diff --git a/state-sync/state-sync-driver/Cargo.toml b/state-sync/state-sync-driver/Cargo.toml index 09ca3e39b9ea61..8b0d8dc97948ff 100644 --- a/state-sync/state-sync-driver/Cargo.toml +++ b/state-sync/state-sync-driver/Cargo.toml @@ -19,7 +19,6 @@ aptos-consensus-notifications = { workspace = true } aptos-crypto = { workspace = true } aptos-data-client = { workspace = true } aptos-data-streaming-service = { workspace = true } -aptos-db-indexer = { workspace = true } aptos-event-notifications = { workspace = true } aptos-executor-types = { workspace = true } aptos-infallible = { workspace = true } diff --git a/storage/aptosdb/src/state_restore/mod.rs b/storage/aptosdb/src/state_restore/mod.rs index 0442ed7e990628..cd09dec6adb371 100644 --- a/storage/aptosdb/src/state_restore/mod.rs +++ b/storage/aptosdb/src/state_restore/mod.rs @@ -40,7 +40,7 @@ pub trait StateValueWriter: Send + Sync { progress: StateSnapshotProgress, ) -> Result<()>; - fn write_usage(&self, version: Version, usage: StateStorageUsage) -> Result<()>; + fn kv_finish(&self, version: Version, usage: StateStorageUsage) -> Result<()>; fn get_progress(&self, version: Version) -> Result>; } @@ -127,7 +127,7 @@ impl StateValueRestore { pub fn finish(self) -> Result<()> { let progress = self.db.get_progress(self.version)?; - self.db.write_usage( + self.db.kv_finish( self.version, progress.map_or(StateStorageUsage::zero(), |p| p.usage), ) @@ -277,17 +277,6 @@ impl StateSnapshotReceiver } fn finish_box(self: Box) -> Result<()> { - match self.restore_mode { - StateSnapshotRestoreMode::KvOnly => self.kv_restore.lock().take().unwrap().finish()?, - StateSnapshotRestoreMode::TreeOnly => { - self.tree_restore.lock().take().unwrap().finish_impl()? - }, - StateSnapshotRestoreMode::Default => { - // for tree only mode, we also need to write the usage to DB - self.kv_restore.lock().take().unwrap().finish()?; - self.tree_restore.lock().take().unwrap().finish_impl()? - }, - } - Ok(()) + self.finish() } } diff --git a/storage/aptosdb/src/state_restore/restore_test.rs b/storage/aptosdb/src/state_restore/restore_test.rs index bbd9f55664bd39..e2f54589d771e5 100644 --- a/storage/aptosdb/src/state_restore/restore_test.rs +++ b/storage/aptosdb/src/state_restore/restore_test.rs @@ -88,7 +88,7 @@ where Ok(()) } - fn write_usage(&self, version: Version, usage: StateStorageUsage) -> Result<()> { + fn kv_finish(&self, version: Version, usage: StateStorageUsage) -> Result<()> { self.usage_store.write().insert(version, usage); Ok(()) } diff --git a/storage/aptosdb/src/state_store/mod.rs b/storage/aptosdb/src/state_store/mod.rs index c849eb425f5940..2af2839dd939f3 100644 --- a/storage/aptosdb/src/state_store/mod.rs +++ b/storage/aptosdb/src/state_store/mod.rs @@ -36,7 +36,10 @@ use aptos_crypto::{ HashValue, }; use aptos_db_indexer::db_indexer::InternalIndexerDB; -use aptos_db_indexer_schemas::metadata::StateSnapshotProgress; +use aptos_db_indexer_schemas::{ + metadata::{MetadataKey, MetadataValue, StateSnapshotProgress}, + schema::indexer_metadata::InternalIndexerMetadataSchema, +}; use aptos_executor::components::in_memory_state_calculator_v2::InMemoryStateCalculatorV2; use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER; use aptos_infallible::Mutex; @@ -1189,8 +1192,40 @@ impl StateValueWriter for StateStore { .commit(version, batch, sharded_schema_batch) } - fn write_usage(&self, version: Version, usage: StateStorageUsage) -> Result<()> { - self.ledger_db.metadata_db().put_usage(version, usage) + fn kv_finish(&self, version: Version, usage: StateStorageUsage) -> Result<()> { + self.ledger_db.metadata_db().put_usage(version, usage)?; + if let Some(internal_indexer_db) = self.internal_indexer_db.as_ref() { + if version > 0 { + let batch = SchemaBatch::new(); + batch.put::( + &MetadataKey::LatestVersion, + &MetadataValue::Version(version - 1), + )?; + if internal_indexer_db.statekeys_enabled() { + batch.put::( + &MetadataKey::StateVersion, + &MetadataValue::Version(version - 1), + )?; + } + if internal_indexer_db.transaction_enabled() { + batch.put::( + &MetadataKey::TransactionVersion, + &MetadataValue::Version(version - 1), + )?; + } + if internal_indexer_db.event_enabled() { + batch.put::( + &MetadataKey::EventVersion, + &MetadataValue::Version(version - 1), + )?; + } + internal_indexer_db + .get_inner_db_ref() + .write_schemas(batch)?; + } + } + + Ok(()) } fn get_progress(&self, version: Version) -> Result> { diff --git a/storage/backup/backup-cli/src/utils/mod.rs b/storage/backup/backup-cli/src/utils/mod.rs index 75039f2d8ee282..f814890df5e76e 100644 --- a/storage/backup/backup-cli/src/utils/mod.rs +++ b/storage/backup/backup-cli/src/utils/mod.rs @@ -186,7 +186,7 @@ impl StateValueWriter for MockStore { Ok(()) } - fn write_usage(&self, _version: Version, _usage: StateStorageUsage) -> Result<()> { + fn kv_finish(&self, _version: Version, _usage: StateStorageUsage) -> Result<()> { Ok(()) } diff --git a/storage/indexer/src/db_indexer.rs b/storage/indexer/src/db_indexer.rs index 55cf980fdffa28..6cd5355bb9075f 100644 --- a/storage/indexer/src/db_indexer.rs +++ b/storage/indexer/src/db_indexer.rs @@ -98,11 +98,20 @@ impl InternalIndexerDB { Ok(()) } - pub fn get_persisted_version(&self) -> Result { - // read the latest key from the db - self.db - .get::(&MetadataKey::LatestVersion)? - .map_or(Ok(0), |metavalue| Ok(metavalue.expect_version())) + pub fn get_persisted_version(&self) -> Result> { + self.get_version(&MetadataKey::LatestVersion) + } + + pub fn get_event_version(&self) -> Result> { + self.get_version(&MetadataKey::EventVersion) + } + + pub fn get_state_version(&self) -> Result> { + self.get_version(&MetadataKey::StateVersion) + } + + pub fn get_transaction_version(&self) -> Result> { + self.get_version(&MetadataKey::TransactionVersion) } pub fn event_enabled(&self) -> bool { @@ -136,11 +145,13 @@ impl InternalIndexerDB { pub fn ensure_cover_ledger_version(&self, ledger_version: Version) -> Result<()> { let indexer_latest_version = self.get_persisted_version()?; - ensure!( - indexer_latest_version >= ledger_version, - "ledger version too new" - ); - Ok(()) + if let Some(indexer_latest_version) = indexer_latest_version { + if indexer_latest_version >= ledger_version { + return Ok(()); + } + } + + bail!("ledger version too new") } pub fn get_account_transaction_version_iter( @@ -255,6 +266,13 @@ impl InternalIndexerDB { (event_key, txn_version, seq_num, idx) }))) } + + fn get_version(&self, key: &MetadataKey) -> Result> { + Ok(self + .db + .get::(key)? + .map(|v| v.expect_version())) + } } pub struct DBIndexer { @@ -302,15 +320,6 @@ impl DBIndexer { .expect("main db lowest viable version doesn't exist") } - pub fn ensure_cover_ledger_version(&self, ledger_version: Version) -> Result<()> { - let indexer_latest_version = self.indexer_db.get_persisted_version()?; - ensure!( - indexer_latest_version >= ledger_version, - "ledger version too new" - ); - Ok(()) - } - fn get_main_db_iter( &self, start_version: Version, @@ -345,15 +354,14 @@ impl DBIndexer { } // we want to include the last transaction since the iterator interface will is right exclusive. let num_of_transaction = min( - (self.indexer_db.config.batch_size + 1) as u64, + self.indexer_db.config.batch_size as u64, highest_version + 1 - version, ); Ok(num_of_transaction) } - pub fn process_a_batch(&self, start_version: Option) -> Result { - let mut version = start_version.unwrap_or(0); - + pub fn process_a_batch(&self, start_version: Version) -> Result { + let mut version = start_version; let num_transactions = self.get_num_of_transactions(version)?; let mut db_iter = self.get_main_db_iter(version, num_transactions)?; let batch = SchemaBatch::new(); @@ -399,7 +407,25 @@ impl DBIndexer { version += 1; Ok::<(), AptosDbError>(()) })?; - assert_eq!(num_transactions, version - start_version.unwrap_or(0)); + assert_eq!(num_transactions, version - start_version); + if self.indexer_db.transaction_enabled() { + batch.put::( + &MetadataKey::TransactionVersion, + &MetadataValue::Version(version), + )?; + } + if self.indexer_db.event_enabled() { + batch.put::( + &MetadataKey::EventVersion, + &MetadataValue::Version(version), + )?; + } + if self.indexer_db.statekeys_enabled() { + batch.put::( + &MetadataKey::StateVersion, + &MetadataValue::Version(version), + )?; + } batch.put::( &MetadataKey::LatestVersion, &MetadataValue::Version(version - 1), diff --git a/storage/indexer/src/indexer_reader.rs b/storage/indexer/src/indexer_reader.rs index 11b8cbbcc4786f..0368d80c612a52 100644 --- a/storage/indexer/src/indexer_reader.rs +++ b/storage/indexer/src/indexer_reader.rs @@ -47,7 +47,7 @@ impl IndexerReader for IndexerReaders { anyhow::bail!("Table info reader is not available") } - fn get_latest_internal_indexer_ledger_version(&self) -> anyhow::Result { + fn get_latest_internal_indexer_ledger_version(&self) -> anyhow::Result> { if let Some(db_indexer) = &self.db_indexer_reader { return Ok(db_indexer.indexer_db.get_persisted_version()?); } diff --git a/storage/indexer_schemas/src/metadata.rs b/storage/indexer_schemas/src/metadata.rs index ada7f08f9d9381..940f724da79d08 100644 --- a/storage/indexer_schemas/src/metadata.rs +++ b/storage/indexer_schemas/src/metadata.rs @@ -35,6 +35,9 @@ pub enum MetadataKey { EventPrunerProgress, TransactionPrunerProgress, StateSnapshotRestoreProgress(Version), + EventVersion, + StateVersion, + TransactionVersion, } #[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)] diff --git a/testsuite/smoke-test/Cargo.toml b/testsuite/smoke-test/Cargo.toml index ce1b97913e5c8d..716ab990c37c2f 100644 --- a/testsuite/smoke-test/Cargo.toml +++ b/testsuite/smoke-test/Cargo.toml @@ -21,7 +21,6 @@ aptos-config = { workspace = true } aptos-consensus = { workspace = true } aptos-crypto = { workspace = true } aptos-db = { workspace = true } -aptos-db-indexer = { workspace = true, features = ["fuzzing"]} aptos-db-indexer-schemas = { workspace = true } aptos-debugger = { workspace = true } aptos-dkg = { workspace = true } @@ -66,6 +65,7 @@ walkdir = { workspace = true } [dev-dependencies] aptos-backup-cli = { workspace = true } +aptos-db-indexer = { workspace = true, features = ["fuzzing"]} aptos-genesis = { workspace = true } aptos-infallible = { workspace = true } aptos-logger = { workspace = true } diff --git a/types/src/indexer/indexer_db_reader.rs b/types/src/indexer/indexer_db_reader.rs index 3c8f7b79536d34..d5d6e8a8922908 100644 --- a/types/src/indexer/indexer_db_reader.rs +++ b/types/src/indexer/indexer_db_reader.rs @@ -57,5 +57,5 @@ pub trait IndexerReader: Send + Sync { version: Version, ) -> Result> + '_>>; - fn get_latest_internal_indexer_ledger_version(&self) -> Result; + fn get_latest_internal_indexer_ledger_version(&self) -> Result>; }