diff --git a/Cargo.lock b/Cargo.lock index 0a6b899179e535..cb30f7d5723c0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -510,7 +510,6 @@ dependencies = [ "anyhow", "aptos-config", "aptos-crypto", - "aptos-db-indexer", "aptos-framework", "aptos-logger", "aptos-openapi", @@ -1159,8 +1158,10 @@ dependencies = [ "aptos-resource-viewer", "aptos-rocksdb-options", "aptos-schemadb", + "aptos-sdk", "aptos-storage-interface", "aptos-types", + "aptos-vm-genesis", "bcs 0.1.4", "byteorder", "bytes", @@ -1348,6 +1349,7 @@ dependencies = [ "aptos-consensus-types", "aptos-crypto", "aptos-db", + "aptos-db-indexer", "aptos-drop-helper", "aptos-executor-service", "aptos-executor-test-helpers", @@ -1358,6 +1360,7 @@ dependencies = [ "aptos-logger", "aptos-metrics-core", "aptos-scratchpad", + "aptos-sdk", "aptos-storage-interface", "aptos-temppath", "aptos-types", diff --git a/api/src/context.rs b/api/src/context.rs index 728bece4bd89b6..508ef6b6a721d7 100644 --- a/api/src/context.rs +++ b/api/src/context.rs @@ -18,9 +18,6 @@ use aptos_api_types::{ }; use aptos_config::config::{NodeConfig, RoleType}; use aptos_crypto::HashValue; -use aptos_db_indexer::{ - db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader, -}; use aptos_gas_schedule::{AptosGasParameters, FromOnChainGasSchedule}; use aptos_logger::{error, info, Schema}; use aptos_mempool::{MempoolClientRequest, MempoolClientSender, SubmissionStatus}; @@ -36,6 +33,9 @@ use aptos_types::{ chain_id::ChainId, contract_event::EventWithVersion, event::EventKey, + indexer::{ + db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader, + }, ledger_info::LedgerInfoWithSignatures, on_chain_config::{GasSchedule, GasScheduleV2, OnChainConfig, OnChainExecutionConfig}, state_store::{ diff --git a/api/src/runtime.rs b/api/src/runtime.rs index 7dc18449d56951..3e74927aaf7f84 100644 --- a/api/src/runtime.rs +++ b/api/src/runtime.rs @@ -10,13 +10,15 @@ use crate::{ }; use anyhow::Context as AnyhowContext; use aptos_config::config::{ApiConfig, NodeConfig}; -use aptos_db_indexer::{ - db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader, -}; use aptos_logger::info; use aptos_mempool::MempoolClientSender; use aptos_storage_interface::DbReader; -use aptos_types::chain_id::ChainId; +use aptos_types::{ + chain_id::ChainId, + indexer::{ + db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader, + }, +}; use poem::{ handler, http::Method, diff --git a/api/types/Cargo.toml b/api/types/Cargo.toml index 1ff09c771eed3e..4f395c7457dd97 100644 --- a/api/types/Cargo.toml +++ b/api/types/Cargo.toml @@ -16,7 +16,6 @@ rust-version = { workspace = true } anyhow = { workspace = true } aptos-config = { workspace = true } aptos-crypto = { workspace = true } -aptos-db-indexer = { workspace = true } aptos-framework = { workspace = true } aptos-logger = { workspace = true } aptos-openapi = { workspace = true } diff --git a/api/types/src/convert.rs b/api/types/src/convert.rs index 09798fb76a4a58..fc8f895977a757 100644 --- a/api/types/src/convert.rs +++ b/api/types/src/convert.rs @@ -18,7 +18,6 @@ use crate::{ }; use anyhow::{bail, ensure, format_err, Context as AnyhowContext, Result}; use aptos_crypto::{hash::CryptoHash, HashValue}; -use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_logger::{sample, sample::SampleRate}; use aptos_resource_viewer::AptosValueAnnotator; use aptos_storage_interface::DbReader; @@ -26,6 +25,7 @@ use aptos_types::{ access_path::{AccessPath, Path}, chain_id::ChainId, contract_event::{ContractEvent, EventWithVersion}, + indexer::table_info_reader::TableInfoReader, state_store::{ state_key::{inner::StateKeyInner, StateKey}, table::{TableHandle, TableInfo}, diff --git a/aptos-node/src/services.rs b/aptos-node/src/services.rs index e757aa4cd6bfbc..01ea24968451bd 100644 --- a/aptos-node/src/services.rs +++ b/aptos-node/src/services.rs @@ -11,9 +11,6 @@ use aptos_consensus::{ }; use aptos_consensus_notifications::ConsensusNotifier; use aptos_data_client::client::AptosDataClient; -use aptos_db_indexer::{ - db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader, -}; use aptos_event_notifications::{DbBackedOnChainConfig, ReconfigNotificationListener}; use aptos_indexer_grpc_fullnode::runtime::bootstrap as bootstrap_indexer_grpc; use aptos_indexer_grpc_table_info::runtime::{ @@ -34,7 +31,12 @@ use aptos_peer_monitoring_service_server::{ use aptos_peer_monitoring_service_types::PeerMonitoringServiceMessage; use aptos_storage_interface::{DbReader, DbReaderWriter}; use aptos_time_service::TimeService; -use aptos_types::chain_id::ChainId; +use aptos_types::{ + chain_id::ChainId, + indexer::{ + db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader, + }, +}; use aptos_validator_transaction_pool::VTxnPoolState; use futures::channel::{mpsc, mpsc::Sender}; use std::{sync::Arc, time::Instant}; diff --git a/config/src/config/index_db_tailer_config.rs b/config/src/config/index_db_tailer_config.rs index 5258c36ad02096..c7fd78c2a40578 100644 --- a/config/src/config/index_db_tailer_config.rs +++ b/config/src/config/index_db_tailer_config.rs @@ -9,6 +9,20 @@ pub struct IndexDBTailerConfig { pub batch_size: usize, } +impl IndexDBTailerConfig { + pub fn new(enable: bool, batch_size: usize) -> Self { + Self { enable, batch_size } + } + + pub fn enable(&self) -> bool { + self.enable + } + + pub fn batch_size(&self) -> usize { + self.batch_size + } +} + impl Default for IndexDBTailerConfig { fn default() -> Self { Self { diff --git a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs index e099377917eb0f..1fbfdf3a765f71 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs @@ -7,7 +7,6 @@ use crate::{ }; use aptos_api::context::Context; use aptos_config::config::NodeConfig; -use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_logger::info; use aptos_mempool::MempoolClientSender; use aptos_protos::{ @@ -19,7 +18,7 @@ use aptos_protos::{ util::timestamp::FILE_DESCRIPTOR_SET as UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET, }; use aptos_storage_interface::DbReader; -use aptos_types::chain_id::ChainId; +use aptos_types::{chain_id::ChainId, indexer::table_info_reader::TableInfoReader}; use std::{net::ToSocketAddrs, sync::Arc}; use tokio::runtime::Runtime; use tonic::{codec::CompressionEncoding, transport::Server}; diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/tailer_service.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/tailer_service.rs index af048722e9f12b..37bfdc3de74742 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/tailer_service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/tailer_service.rs @@ -39,13 +39,18 @@ impl TailerService { } pub async fn run(&mut self) { - let mut start_version = self.db_tailer.get_last_version(); + let mut start_version = self.db_tailer.get_persisted_version(); loop { let start_time: std::time::Instant = std::time::Instant::now(); let cur_version = self .db_tailer .process_a_batch(Some(start_version)) .expect("Failed to run indexer db tailer"); + + if cur_version == start_version { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + continue; + } start_version = cur_version; log_grpc_step( SERVICE_TYPE, diff --git a/execution/executor/Cargo.toml b/execution/executor/Cargo.toml index aec6c3ca9ae542..4c1239884863b0 100644 --- a/execution/executor/Cargo.toml +++ b/execution/executor/Cargo.toml @@ -20,6 +20,7 @@ aptos-drop-helper = { workspace = true } aptos-executor-service = { workspace = true } aptos-executor-types = { workspace = true } aptos-experimental-runtimes = { workspace = true } +aptos-sdk = { workspace = true } aptos-infallible = { workspace = true } aptos-logger = { workspace = true } aptos-metrics-core = { workspace = true } @@ -42,6 +43,7 @@ serde = { workspace = true } aptos-cached-packages = { workspace = true } aptos-config = { workspace = true } aptos-db = { workspace = true } +aptos-db-indexer = { workspace = true, features = ["test"] } aptos-executor-test-helpers = { workspace = true } aptos-genesis = { workspace = true } aptos-storage-interface = { workspace = true } diff --git a/execution/executor/tests/internal_indexer_test.rs b/execution/executor/tests/internal_indexer_test.rs new file mode 100644 index 00000000000000..6675bb0231bbfd --- /dev/null +++ b/execution/executor/tests/internal_indexer_test.rs @@ -0,0 +1,161 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_cached_packages::aptos_stdlib; +use aptos_config::config::{index_db_tailer_config::IndexDBTailerConfig, RocksdbConfig}; +use aptos_db::AptosDB; +use aptos_db_indexer::{db_ops::open_tailer_db, db_tailer::DBTailer}; +use aptos_executor_test_helpers::{ + gen_block_id, gen_ledger_info_with_sigs, integration_test_impl::create_db_and_executor, +}; +use aptos_executor_types::BlockExecutorTrait; +use aptos_sdk::{ + transaction_builder::TransactionFactory, + types::{AccountKey, LocalAccount}, +}; +use aptos_storage_interface::DbReader; +use aptos_temppath::TempPath; +use aptos_types::{ + account_config::aptos_test_root_address, + block_metadata::BlockMetadata, + chain_id::ChainId, + test_helpers::transaction_test_helpers::TEST_BLOCK_EXECUTOR_ONCHAIN_CONFIG, + transaction::{ + signature_verified_transaction::into_signature_verified_block, Transaction, + Transaction::UserTransaction, WriteSetPayload, + }, +}; +use rand::SeedableRng; +use std::sync::Arc; + +const B: u64 = 1_000_000_000; + +#[cfg(test)] +pub fn create_test_db() -> (Arc, LocalAccount) { + // create test db + let path = aptos_temppath::TempPath::new(); + let (genesis, validators) = aptos_vm_genesis::test_genesis_change_set_and_validators(Some(1)); + let genesis_txn = Transaction::GenesisTransaction(WriteSetPayload::Direct(genesis)); + let core_resources_account: LocalAccount = LocalAccount::new( + aptos_test_root_address(), + AccountKey::from_private_key(aptos_vm_genesis::GENESIS_KEYPAIR.0.clone()), + 0, + ); + let (aptos_db, _db, executor, _waypoint) = + create_db_and_executor(path.path(), &genesis_txn, true); + let parent_block_id = executor.committed_block_id(); + + // This generates accounts that do not overlap with genesis + let seed = [3u8; 32]; + let mut rng = ::rand::rngs::StdRng::from_seed(seed); + let signer = aptos_types::validator_signer::ValidatorSigner::new( + validators[0].data.owner_address, + validators[0].consensus_key.clone(), + ); + let account1 = LocalAccount::generate(&mut rng); + let account2 = LocalAccount::generate(&mut rng); + let account3 = LocalAccount::generate(&mut rng); + + let txn_factory = TransactionFactory::new(ChainId::test()); + + let block1_id = gen_block_id(1); + let block1_meta = Transaction::BlockMetadata(BlockMetadata::new( + block1_id, + 1, + 0, + signer.author(), + vec![0], + vec![], + 1, + )); + let tx1 = core_resources_account + .sign_with_transaction_builder(txn_factory.create_user_account(account1.public_key())); + let tx2 = core_resources_account + .sign_with_transaction_builder(txn_factory.create_user_account(account2.public_key())); + let tx3 = core_resources_account + .sign_with_transaction_builder(txn_factory.create_user_account(account3.public_key())); + // Create account1 with 2T coins. + let txn1 = core_resources_account + .sign_with_transaction_builder(txn_factory.mint(account1.address(), 2_000 * B)); + // Create account2 with 1.2T coins. + let txn2 = core_resources_account + .sign_with_transaction_builder(txn_factory.mint(account2.address(), 1_200 * B)); + // Create account3 with 1T coins. + let txn3 = core_resources_account + .sign_with_transaction_builder(txn_factory.mint(account3.address(), 1_000 * B)); + + // Transfer 20B coins from account1 to account2. + // balance: <1.98T, 1.22T, 1T + let txn4 = + account1.sign_with_transaction_builder(txn_factory.transfer(account2.address(), 20 * B)); + + // Transfer 10B coins from account2 to account3. + // balance: <1.98T, <1.21T, 1.01T + let txn5 = + account2.sign_with_transaction_builder(txn_factory.transfer(account3.address(), 10 * B)); + + // Transfer 70B coins from account1 to account3. + // balance: <1.91T, <1.21T, 1.08T + let txn6 = + account1.sign_with_transaction_builder(txn_factory.transfer(account3.address(), 70 * B)); + + let reconfig1 = core_resources_account.sign_with_transaction_builder( + txn_factory.payload(aptos_stdlib::aptos_governance_force_end_epoch_test_only()), + ); + + let block1: Vec<_> = into_signature_verified_block(vec![ + block1_meta, + UserTransaction(tx1), + UserTransaction(tx2), + UserTransaction(tx3), + UserTransaction(txn1), + UserTransaction(txn2), + UserTransaction(txn3), + UserTransaction(txn4), + UserTransaction(txn5), + UserTransaction(txn6), + UserTransaction(reconfig1), + ]); + let output1 = executor + .execute_block( + (block1_id, block1.clone()).into(), + parent_block_id, + TEST_BLOCK_EXECUTOR_ONCHAIN_CONFIG, + ) + .unwrap(); + let li1 = gen_ledger_info_with_sigs(1, &output1, block1_id, &[signer.clone()]); + executor.commit_blocks(vec![block1_id], li1).unwrap(); + (aptos_db, core_resources_account) +} + +#[test] +fn test_db_tailer_data() { + // create test db + let (aptos_db, core_account) = create_test_db(); + let total_version = aptos_db.get_latest_version().unwrap(); + // create db tailer + let rocksdb_config = RocksdbConfig::default(); + let temp_path = TempPath::new(); + let db = Arc::new( + open_tailer_db(temp_path.as_ref(), &rocksdb_config) + .expect("Failed to open up indexer db tailer initially"), + ); + let tailer = DBTailer::new(db, aptos_db, &IndexDBTailerConfig::new(true, 2)); + // assert the data matches the expected data + let mut version = tailer.get_persisted_version(); + assert_eq!(version, 0); + while version < total_version { + version = tailer.process_a_batch(Some(version)).unwrap(); + } + let txn_iter = tailer + .get_account_transaction_version_iter(core_account.address(), 0, 1000, 1000) + .unwrap(); + let res: Vec<_> = txn_iter.collect(); + // core account submitted 7 transactions, and the first transaction is version 2 + assert!(res.len() == 7); + assert!(res[0].as_ref().unwrap().1 == 2); + + let x = tailer.get_event_by_key_iter().unwrap(); + let res: Vec<_> = x.collect(); + assert!(!res.is_empty()); +} diff --git a/storage/indexer/Cargo.toml b/storage/indexer/Cargo.toml index 36e704b064e309..0c111007b9aa14 100644 --- a/storage/indexer/Cargo.toml +++ b/storage/indexer/Cargo.toml @@ -19,7 +19,9 @@ aptos-logger = { workspace = true } aptos-resource-viewer = { workspace = true } aptos-rocksdb-options = { workspace = true } aptos-schemadb = { workspace = true } +aptos-sdk = { workspace = true } aptos-storage-interface = { workspace = true } +aptos-vm-genesis = { workspace = true } aptos-types = { workspace = true } bcs = { workspace = true } bytes = { workspace = true } @@ -40,4 +42,5 @@ rand = { workspace = true } [features] default = [] +test = [] fuzzing = ["proptest", "proptest-derive", "aptos-types/fuzzing", "aptos-schemadb/fuzzing"] diff --git a/storage/indexer/src/db_ops.rs b/storage/indexer/src/db_ops.rs index 2f109bed876f04..000997ab32f4a8 100644 --- a/storage/indexer/src/db_ops.rs +++ b/storage/indexer/src/db_ops.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::schema::column_families; +use crate::schema::{column_families, tailer_column_families}; use anyhow::Result; use aptos_config::config::RocksdbConfig; use aptos_rocksdb_options::gen_rocksdb_options; @@ -17,6 +17,15 @@ pub fn open_db>(db_path: P, rocksdb_config: &RocksdbConfig) -> Re )?) } +pub fn open_tailer_db>(db_path: P, rocksdb_config: &RocksdbConfig) -> Result { + Ok(DB::open( + db_path, + "tailer_db", + tailer_column_families(), + &gen_rocksdb_options(rocksdb_config, false), + )?) +} + pub fn close_db(db: DB) { mem::drop(db) } diff --git a/storage/indexer/src/db_tailer.rs b/storage/indexer/src/db_tailer.rs index 2d642bd8aa753f..014fda1ae3514a 100644 --- a/storage/indexer/src/db_tailer.rs +++ b/storage/indexer/src/db_tailer.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - db_tailer_reader::IndexerTransactionEventReader, schema::{ event_by_key::EventByKeySchema, event_by_version::EventByVersionSchema, indexer_metadata::TailerMetadataSchema, transaction_by_account::TransactionByAccountSchema, @@ -15,18 +14,18 @@ use crate::{ use aptos_config::config::index_db_tailer_config::IndexDBTailerConfig; use aptos_schemadb::{ReadOptions, SchemaBatch, DB}; use aptos_storage_interface::{ - db_ensure as ensure, db_other_bail as bail, AptosDbError, DbReader, Order, Result, + db_ensure as ensure, db_other_bail as bail, AptosDbError, DbReader, Result, }; use aptos_types::{ account_address::AccountAddress, contract_event::{ContractEvent, EventWithVersion}, event::EventKey, + indexer::db_tailer_reader::{IndexerTransactionEventReader, Order}, transaction::{AccountTransactionsWithProof, Version}, }; use std::sync::Arc; pub struct DBTailer { - pub last_version: Version, pub db: Arc, pub main_db_reader: Arc, batch_size: usize, @@ -34,18 +33,17 @@ pub struct DBTailer { impl DBTailer { pub fn new(db: Arc, db_reader: Arc, config: &IndexDBTailerConfig) -> Self { - let last_version = Self::initialize(db.clone()); Self { - last_version, db, main_db_reader: db_reader, batch_size: config.batch_size, } } - fn initialize(db: Arc) -> Version { + pub fn get_persisted_version(&self) -> Version { // read the latest key from the db - let mut rev_iter_res = db + let mut rev_iter_res = self + .db .rev_iter::(Default::default()) .expect("Cannot create db tailer metadata iterator"); rev_iter_res @@ -55,13 +53,20 @@ impl DBTailer { } pub fn process_a_batch(&self, start_version: Option) -> Result { - let db_iter = self + let mut version = start_version.unwrap_or(0); + let db_iter: Box< + dyn Iterator< + Item = std::prelude::v1::Result< + (aptos_types::transaction::Transaction, Vec), + AptosDbError, + >, + >, + > = self .main_db_reader - .get_db_backup_iter(start_version.unwrap_or(self.last_version), self.batch_size) + .get_db_backup_iter(version, self.batch_size) .expect("Cannot create db tailer iterator"); let batch = SchemaBatch::new(); let metadata_batch = SchemaBatch::new(); - let mut version = self.last_version; db_iter.for_each(|res| { res.map(|(txn, events)| { if let Some(txn) = txn.try_as_signed_user_txn() { @@ -101,10 +106,6 @@ impl DBTailer { Ok(version) } - pub fn get_last_version(&self) -> Version { - self.last_version - } - pub fn get_account_transaction_version_iter( &self, address: AccountAddress, @@ -181,6 +182,18 @@ impl DBTailer { Ok(result) } + + #[cfg(any(test, feature = "test"))] + pub fn get_event_by_key_iter( + &self, + ) -> Result + '_>> { + let mut iter = self.db.iter::(ReadOptions::default())?; + iter.seek_to_first(); + Ok(Box::new(iter.map(|res| { + let ((event_key, seq_num), (txn_version, idx)) = res.unwrap(); + (event_key, txn_version, seq_num, idx) + }))) + } } impl IndexerTransactionEventReader for DBTailer { @@ -191,7 +204,7 @@ impl IndexerTransactionEventReader for DBTailer { order: Order, limit: u64, ledger_version: Version, - ) -> Result> { + ) -> anyhow::Result> { self.get_events_by_event_key(event_key, start, order, limit, ledger_version) } @@ -202,7 +215,7 @@ impl IndexerTransactionEventReader for DBTailer { order: Order, limit: u64, ledger_version: Version, - ) -> Result> { + ) -> anyhow::Result> { error_if_too_many_requested(limit, MAX_REQUEST_LIMIT)?; let get_latest = order == Order::Descending && start_seq_num == u64::max_value(); @@ -269,7 +282,7 @@ impl IndexerTransactionEventReader for DBTailer { limit: u64, include_events: bool, ledger_version: Version, - ) -> Result { + ) -> anyhow::Result { error_if_too_many_requested(limit, MAX_REQUEST_LIMIT)?; let txns_with_proofs = self diff --git a/storage/indexer/src/lib.rs b/storage/indexer/src/lib.rs index 7ee476accc1474..01990ae2475cab 100644 --- a/storage/indexer/src/lib.rs +++ b/storage/indexer/src/lib.rs @@ -5,7 +5,6 @@ mod db; pub mod db_ops; pub mod db_tailer; -pub mod db_tailer_reader; pub mod db_v2; mod metadata; mod schema; diff --git a/storage/indexer/src/schema/indexer_metadata/test.rs b/storage/indexer/src/schema/indexer_metadata/test.rs index a1117d2b17ea31..0876f4f8180515 100644 --- a/storage/indexer/src/schema/indexer_metadata/test.rs +++ b/storage/indexer/src/schema/indexer_metadata/test.rs @@ -13,6 +13,13 @@ proptest! { ) { assert_encode_decode::(&tag, &metadata); } + + #[test] + fn test_encode_decode_tailer_metadata( + version in any::(), + ) { + assert_encode_decode::(&version, &()); + } } test_no_panic_decoding!(IndexerMetadataSchema); diff --git a/storage/indexer/src/schema/mod.rs b/storage/indexer/src/schema/mod.rs index 4f7dea73c74c66..844e8e9487e392 100644 --- a/storage/indexer/src/schema/mod.rs +++ b/storage/indexer/src/schema/mod.rs @@ -6,11 +6,11 @@ //! //! All schemas are `pub(crate)` so not shown in rustdoc, refer to the source code to see details. -pub(crate) mod event_by_key; -pub(crate) mod event_by_version; -pub(crate) mod indexer_metadata; -pub(crate) mod table_info; -pub(crate) mod transaction_by_account; +pub mod event_by_key; +pub mod event_by_version; +pub mod indexer_metadata; +pub mod table_info; +pub mod transaction_by_account; use aptos_schemadb::ColumnFamilyName; pub const DEFAULT_COLUMN_FAMILY_NAME: ColumnFamilyName = "default"; @@ -26,6 +26,13 @@ pub fn column_families() -> Vec { /* empty cf */ DEFAULT_COLUMN_FAMILY_NAME, INDEXER_METADATA_CF_NAME, TABLE_INFO_CF_NAME, + ] +} + +pub fn tailer_column_families() -> Vec { + vec![ + /* empty cf */ DEFAULT_COLUMN_FAMILY_NAME, + TAILER_METADATA_CF_NAME, EVENT_BY_KEY_CF_NAME, EVENT_BY_VERSION_CF_NAME, TRANSACTION_BY_ACCOUNT_CF_NAME, diff --git a/storage/indexer/src/table_info_reader.rs b/storage/indexer/src/table_info_reader.rs index f0ddcff4aa1ff8..5cb8b05897c451 100644 --- a/storage/indexer/src/table_info_reader.rs +++ b/storage/indexer/src/table_info_reader.rs @@ -2,18 +2,18 @@ // SPDX-License-Identifier: Apache-2.0 use crate::db_v2::IndexerAsyncV2; -use aptos_storage_interface::Result; -use aptos_types::state_store::table::{TableHandle, TableInfo}; +use anyhow::Result; +use aptos_types::{ + indexer::table_info_reader::TableInfoReader, + state_store::table::{TableHandle, TableInfo}, +}; /// Table info reader is to create a thin interface for other services to read the db data, /// this standalone db is officially not part of the AptosDB anymore. /// For services that need table info mapping, they need to acquire this reader in the FN bootstrapping stage. -pub trait TableInfoReader: Send + Sync { - fn get_table_info(&self, handle: TableHandle) -> Result>; -} impl TableInfoReader for IndexerAsyncV2 { fn get_table_info(&self, handle: TableHandle) -> Result> { - self.get_table_info_with_retry(handle) + Ok(self.get_table_info_with_retry(handle)?) } } diff --git a/storage/indexer/src/utils.rs b/storage/indexer/src/utils.rs index d16c615f7e0a2d..f0ccadd63d0476 100644 --- a/storage/indexer/src/utils.rs +++ b/storage/indexer/src/utils.rs @@ -3,8 +3,10 @@ use crate::schema::transaction_by_account::TransactionByAccountSchema; use aptos_schemadb::iterator::SchemaIterator; -use aptos_storage_interface::{db_ensure as ensure, AptosDbError, Order, Result}; -use aptos_types::{account_address::AccountAddress, transaction::Version}; +use aptos_storage_interface::{db_ensure as ensure, AptosDbError, Result}; +use aptos_types::{ + account_address::AccountAddress, indexer::db_tailer_reader::Order, transaction::Version, +}; pub const MAX_REQUEST_LIMIT: u64 = 10_000; diff --git a/storage/storage-interface/src/lib.rs b/storage/storage-interface/src/lib.rs index 29c94d45639f2e..5432dd1c3ddf3f 100644 --- a/storage/storage-interface/src/lib.rs +++ b/storage/storage-interface/src/lib.rs @@ -103,6 +103,15 @@ pub enum Order { Descending, } +impl From for Order { + fn from(order: aptos_types::indexer::db_tailer_reader::Order) -> Self { + match order { + aptos_types::indexer::db_tailer_reader::Order::Ascending => Self::Ascending, + aptos_types::indexer::db_tailer_reader::Order::Descending => Self::Descending, + } + } +} + macro_rules! delegate_read { ($( $(#[$($attr:meta)*])* diff --git a/storage/indexer/src/db_tailer_reader.rs b/types/src/indexer/db_tailer_reader.rs similarity index 88% rename from storage/indexer/src/db_tailer_reader.rs rename to types/src/indexer/db_tailer_reader.rs index 048dd71325ece9..9253c39db139b3 100644 --- a/storage/indexer/src/db_tailer_reader.rs +++ b/types/src/indexer/db_tailer_reader.rs @@ -1,13 +1,19 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use aptos_storage_interface::{Order, Result}; -use aptos_types::{ +use crate::{ account_address::AccountAddress, contract_event::EventWithVersion, event::EventKey, transaction::{AccountTransactionsWithProof, Version}, }; +use anyhow::Result; + +#[derive(Clone, Copy, Eq, PartialEq)] +pub enum Order { + Ascending, + Descending, +} pub trait IndexerTransactionEventReader: Send + Sync { fn get_events( diff --git a/types/src/indexer/mod.rs b/types/src/indexer/mod.rs new file mode 100644 index 00000000000000..2793462e3844f8 --- /dev/null +++ b/types/src/indexer/mod.rs @@ -0,0 +1,5 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +pub mod db_tailer_reader; +pub mod table_info_reader; diff --git a/types/src/indexer/table_info_reader.rs b/types/src/indexer/table_info_reader.rs new file mode 100644 index 00000000000000..5d6910e24b9190 --- /dev/null +++ b/types/src/indexer/table_info_reader.rs @@ -0,0 +1,12 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::state_store::table::{TableHandle, TableInfo}; +use anyhow::Result; + +/// Table info reader is to create a thin interface for other services to read the db data, +/// this standalone db is officially not part of the AptosDB anymore. +/// For services that need table info mapping, they need to acquire this reader in the FN bootstrapping stage. +pub trait TableInfoReader: Send + Sync { + fn get_table_info(&self, handle: TableHandle) -> Result>; +} diff --git a/types/src/lib.rs b/types/src/lib.rs index f76c5a467865a5..5cae297bb17ba3 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -20,6 +20,7 @@ pub mod event; pub mod executable; pub mod fee_statement; pub mod governance; +pub mod indexer; pub mod jwks; pub mod ledger_info; pub mod mempool_status; diff --git a/types/src/state_store/state_key/prefix.rs b/types/src/state_store/state_key/prefix.rs index 731dee462bc62f..a29bfd146d3636 100644 --- a/types/src/state_store/state_key/prefix.rs +++ b/types/src/state_store/state_key/prefix.rs @@ -42,12 +42,6 @@ impl From for StateKeyPrefix { } } -impl From> for StateKeyPrefix { - fn from(bytes: Vec) -> Self { - Self::new(StateKeyTag::AccessPath, bytes) - } -} - #[cfg(test)] mod tests { use crate::{