From 13555fae05752ac35b95072abb0f79c24e58f10a Mon Sep 17 00:00:00 2001 From: Bo Wu Date: Fri, 3 May 2024 09:09:30 -0700 Subject: [PATCH] [api] add kv indexer db --- Cargo.lock | 1 + .../src/fake_context.rs | 1 + api/src/context.rs | 7 +- api/src/runtime.rs | 15 +- api/test-context/src/test_context.rs | 1 + aptos-node/src/lib.rs | 3 + aptos-node/src/services.rs | 24 +- aptos-node/src/storage.rs | 1 - config/src/config/index_db_tailer_config.rs | 19 ++ config/src/config/mod.rs | 1 + config/src/config/node_config.rs | 17 +- crates/indexer/src/runtime.rs | 1 + .../indexer-grpc-fullnode/src/runtime.rs | 1 + .../indexer-grpc-table-info/src/lib.rs | 1 + .../indexer-grpc-table-info/src/runtime.rs | 25 +- .../src/table_info_service.rs | 2 +- .../src/tailer_service.rs | 64 ++++ .../indexer-grpc-utils/src/counters.rs | 8 + .../aptosdb/src/db/include/aptosdb_reader.rs | 117 ++++--- storage/indexer/Cargo.toml | 1 + storage/indexer/src/db_tailer.rs | 289 ++++++++++++++++++ storage/indexer/src/db_tailer_reader.rs | 39 +++ storage/indexer/src/lib.rs | 3 + .../indexer/src/schema/event_by_key/mod.rs | 76 +++++ .../indexer/src/schema/event_by_key/test.rs | 21 ++ .../src/schema/event_by_version/mod.rs | 69 +++++ .../src/schema/event_by_version/test.rs | 21 ++ .../src/schema/indexer_metadata/mod.rs | 26 ++ storage/indexer/src/schema/mod.rs | 11 +- .../src/schema/transaction_by_account/mod.rs | 67 ++++ .../src/schema/transaction_by_account/test.rs | 20 ++ storage/indexer/src/utils.rs | 124 ++++++++ storage/storage-interface/src/lib.rs | 19 ++ types/src/state_store/state_key/prefix.rs | 6 + 34 files changed, 1049 insertions(+), 52 deletions(-) create mode 100644 config/src/config/index_db_tailer_config.rs create mode 100644 ecosystem/indexer-grpc/indexer-grpc-table-info/src/tailer_service.rs create mode 100644 storage/indexer/src/db_tailer.rs create mode 100644 storage/indexer/src/db_tailer_reader.rs create mode 100644 storage/indexer/src/schema/event_by_key/mod.rs create mode 100644 
storage/indexer/src/schema/event_by_key/test.rs create mode 100644 storage/indexer/src/schema/event_by_version/mod.rs create mode 100644 storage/indexer/src/schema/event_by_version/test.rs create mode 100644 storage/indexer/src/schema/transaction_by_account/mod.rs create mode 100644 storage/indexer/src/schema/transaction_by_account/test.rs create mode 100644 storage/indexer/src/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 8c2049c485b99e..c3491d78fc137d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1160,6 +1160,7 @@ dependencies = [ "aptos-types", "aptos-vm", "bcs 0.1.4", + "byteorder", "bytes", "dashmap", "move-core-types", diff --git a/api/openapi-spec-generator/src/fake_context.rs b/api/openapi-spec-generator/src/fake_context.rs index 98dee1782b6412..4529ac829b0e28 100644 --- a/api/openapi-spec-generator/src/fake_context.rs +++ b/api/openapi-spec-generator/src/fake_context.rs @@ -17,5 +17,6 @@ pub fn get_fake_context() -> Context { mempool.ac_client, NodeConfig::default(), None, /* table info reader */ + None, ) } diff --git a/api/src/context.rs b/api/src/context.rs index 1bdd1c1b1c6462..5b28342eb1d324 100644 --- a/api/src/context.rs +++ b/api/src/context.rs @@ -18,7 +18,9 @@ use aptos_api_types::{ }; use aptos_config::config::{NodeConfig, RoleType}; use aptos_crypto::HashValue; -use aptos_db_indexer::table_info_reader::TableInfoReader; +use aptos_db_indexer::{ + db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader, +}; use aptos_gas_schedule::{AptosGasParameters, FromOnChainGasSchedule}; use aptos_logger::{error, info, Schema}; use aptos_mempool::{MempoolClientRequest, MempoolClientSender, SubmissionStatus}; @@ -78,6 +80,7 @@ pub struct Context { simulate_txn_stats: Arc, pub table_info_reader: Option>, pub wait_for_hash_active_connections: Arc, + pub txn_event_reader: Option>, } impl std::fmt::Debug for Context { @@ -93,6 +96,7 @@ impl Context { mp_sender: MempoolClientSender, node_config: NodeConfig, table_info_reader: 
Option>, + txn_event_reader: Option>, ) -> Self { let (view_function_stats, simulate_txn_stats) = { let log_per_call_stats = node_config.api.periodic_function_stats_sec.is_some(); @@ -131,6 +135,7 @@ impl Context { simulate_txn_stats, table_info_reader, wait_for_hash_active_connections: Arc::new(AtomicUsize::new(0)), + txn_event_reader, } } diff --git a/api/src/runtime.rs b/api/src/runtime.rs index 4673c087235a1c..7dc18449d56951 100644 --- a/api/src/runtime.rs +++ b/api/src/runtime.rs @@ -10,7 +10,9 @@ use crate::{ }; use anyhow::Context as AnyhowContext; use aptos_config::config::{ApiConfig, NodeConfig}; -use aptos_db_indexer::table_info_reader::TableInfoReader; +use aptos_db_indexer::{ + db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader, +}; use aptos_logger::info; use aptos_mempool::MempoolClientSender; use aptos_storage_interface::DbReader; @@ -36,11 +38,19 @@ pub fn bootstrap( db: Arc, mp_sender: MempoolClientSender, table_info_reader: Option>, + txn_event_reader: Option>, ) -> anyhow::Result { let max_runtime_workers = get_max_runtime_workers(&config.api); let runtime = aptos_runtimes::spawn_named_runtime("api".into(), Some(max_runtime_workers)); - let context = Context::new(chain_id, db, mp_sender, config.clone(), table_info_reader); + let context = Context::new( + chain_id, + db, + mp_sender, + config.clone(), + table_info_reader, + txn_event_reader, + ); attach_poem_to_runtime(runtime.handle(), context.clone(), config, false) .context("Failed to attach poem to runtime")?; @@ -342,6 +352,7 @@ mod tests { context.db.clone(), context.mempool.ac_client.clone(), None, + None, ); assert!(ret.is_ok()); diff --git a/api/test-context/src/test_context.rs b/api/test-context/src/test_context.rs index f161f72c09cf97..06ba8ac316a5e5 100644 --- a/api/test-context/src/test_context.rs +++ b/api/test-context/src/test_context.rs @@ -146,6 +146,7 @@ pub fn new_test_context( mempool.ac_client.clone(), node_config.clone(), None, /* table info 
reader */ + None, ); // Configure the testing depending on which API version we're testing. diff --git a/aptos-node/src/lib.rs b/aptos-node/src/lib.rs index f7ce9e938591be..ee7e71f7d19392 100644 --- a/aptos-node/src/lib.rs +++ b/aptos-node/src/lib.rs @@ -211,6 +211,7 @@ pub struct AptosHandle { _peer_monitoring_service_runtime: Runtime, _state_sync_runtimes: StateSyncRuntimes, _telemetry_runtime: Option, + _indexer_db_tailer_runtime: Option, } /// Start an Aptos node @@ -694,6 +695,7 @@ pub fn setup_environment_and_start_node( indexer_table_info_runtime, indexer_runtime, indexer_grpc_runtime, + db_tailer_runtime, ) = services::bootstrap_api_and_indexer(&node_config, db_rw.clone(), chain_id)?; // Create mempool and get the consensus to mempool sender @@ -835,6 +837,7 @@ pub fn setup_environment_and_start_node( _peer_monitoring_service_runtime: peer_monitoring_service_runtime, _state_sync_runtimes: state_sync_runtimes, _telemetry_runtime: telemetry_runtime, + _indexer_db_tailer_runtime: db_tailer_runtime, }) } diff --git a/aptos-node/src/services.rs b/aptos-node/src/services.rs index cea0de1b849634..e757aa4cd6bfbc 100644 --- a/aptos-node/src/services.rs +++ b/aptos-node/src/services.rs @@ -11,10 +11,14 @@ use aptos_consensus::{ }; use aptos_consensus_notifications::ConsensusNotifier; use aptos_data_client::client::AptosDataClient; -use aptos_db_indexer::table_info_reader::TableInfoReader; +use aptos_db_indexer::{ + db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader, +}; use aptos_event_notifications::{DbBackedOnChainConfig, ReconfigNotificationListener}; use aptos_indexer_grpc_fullnode::runtime::bootstrap as bootstrap_indexer_grpc; -use aptos_indexer_grpc_table_info::runtime::bootstrap as bootstrap_indexer_table_info; +use aptos_indexer_grpc_table_info::runtime::{ + bootstrap as bootstrap_indexer_table_info, bootstrap_db_tailer, +}; use aptos_logger::{debug, telemetry_log_writer::TelemetryLog, LoggerFilterUpdater}; use 
aptos_mempool::{network::MempoolSyncMsg, MempoolClientRequest, QuorumStoreRequest}; use aptos_mempool_notifications::MempoolNotificationListener; @@ -51,6 +55,7 @@ pub fn bootstrap_api_and_indexer( Option, Option, Option, + Option, )> { // Create the mempool client and sender let (mempool_client_sender, mempool_client_receiver) = @@ -66,11 +71,24 @@ pub fn bootstrap_api_and_indexer( None => (None, None), }; + let (db_tailer_runtime, txn_event_reader) = + match bootstrap_db_tailer(node_config, db_rw.clone()) { + Some((runtime, db_tailer)) => (Some(runtime), Some(db_tailer)), + None => (None, None), + }; + // Create the API runtime let table_info_reader: Option> = indexer_async_v2.map(|arc| { let trait_object: Arc = arc; trait_object }); + + let txn_event_reader: Option> = + txn_event_reader.map(|arc| { + let trait_object: Arc = arc; + trait_object + }); + let api_runtime = if node_config.api.enabled { Some(bootstrap_api( node_config, @@ -78,6 +96,7 @@ pub fn bootstrap_api_and_indexer( db_rw.reader.clone(), mempool_client_sender.clone(), table_info_reader.clone(), + txn_event_reader.clone(), )?) 
} else { None @@ -106,6 +125,7 @@ pub fn bootstrap_api_and_indexer( indexer_table_info_runtime, indexer_runtime, indexer_grpc, + db_tailer_runtime, )) } diff --git a/aptos-node/src/storage.rs b/aptos-node/src/storage.rs index b5f6780fbab392..ab196fa0162c9f 100644 --- a/aptos-node/src/storage.rs +++ b/aptos-node/src/storage.rs @@ -73,7 +73,6 @@ pub(crate) fn bootstrap_db( let db_backup_service = start_backup_service(node_config.storage.backup_service_address, fast_sync_db); - (db_arc as Arc, db_rw, Some(db_backup_service)) }, }; diff --git a/config/src/config/index_db_tailer_config.rs b/config/src/config/index_db_tailer_config.rs new file mode 100644 index 00000000000000..5258c36ad02096 --- /dev/null +++ b/config/src/config/index_db_tailer_config.rs @@ -0,0 +1,19 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] +pub struct IndexDBTailerConfig { + pub enable: bool, + pub batch_size: usize, +} + +impl Default for IndexDBTailerConfig { + fn default() -> Self { + Self { + enable: false, + batch_size: 10_000, + } + } +} diff --git a/config/src/config/mod.rs b/config/src/config/mod.rs index 2ed9087a7b6d96..27f7bd90f28900 100644 --- a/config/src/config/mod.rs +++ b/config/src/config/mod.rs @@ -15,6 +15,7 @@ mod error; mod execution_config; mod gas_estimation_config; mod identity_config; +pub mod index_db_tailer_config; mod indexer_config; mod indexer_grpc_config; mod indexer_table_info_config; diff --git a/config/src/config/node_config.rs b/config/src/config/node_config.rs index 008343fb538963..162a3dfd8a3725 100644 --- a/config/src/config/node_config.rs +++ b/config/src/config/node_config.rs @@ -4,13 +4,14 @@ use super::{DagConsensusConfig, IndexerTableInfoConfig}; use crate::{ config::{ - dkg_config::DKGConfig, jwk_consensus_config::JWKConsensusConfig, - netbench_config::NetbenchConfig, node_config_loader::NodeConfigLoader, - 
node_startup_config::NodeStartupConfig, observer_config::ObserverConfig, - persistable_config::PersistableConfig, utils::RootPath, AdminServiceConfig, ApiConfig, - BaseConfig, ConsensusConfig, Error, ExecutionConfig, IndexerConfig, IndexerGrpcConfig, - InspectionServiceConfig, LoggerConfig, MempoolConfig, NetworkConfig, - PeerMonitoringServiceConfig, SafetyRulesTestConfig, StateSyncConfig, StorageConfig, + dkg_config::DKGConfig, index_db_tailer_config::IndexDBTailerConfig, + jwk_consensus_config::JWKConsensusConfig, netbench_config::NetbenchConfig, + node_config_loader::NodeConfigLoader, node_startup_config::NodeStartupConfig, + observer_config::ObserverConfig, persistable_config::PersistableConfig, utils::RootPath, + AdminServiceConfig, ApiConfig, BaseConfig, ConsensusConfig, Error, ExecutionConfig, + IndexerConfig, IndexerGrpcConfig, InspectionServiceConfig, LoggerConfig, MempoolConfig, + NetworkConfig, PeerMonitoringServiceConfig, SafetyRulesTestConfig, StateSyncConfig, + StorageConfig, }, network_id::NetworkId, }; @@ -83,6 +84,8 @@ pub struct NodeConfig { pub storage: StorageConfig, #[serde(default)] pub validator_network: Option, + #[serde(default)] + pub index_db_tailer: IndexDBTailerConfig, } impl NodeConfig { diff --git a/crates/indexer/src/runtime.rs b/crates/indexer/src/runtime.rs index c3bf5a0516c0d4..e8a825b7c5561c 100644 --- a/crates/indexer/src/runtime.rs +++ b/crates/indexer/src/runtime.rs @@ -96,6 +96,7 @@ pub fn bootstrap( mp_sender, node_config, None, /* table info reader */ + None, )); run_forever(indexer_config, context).await; }); diff --git a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs index 39345992ac8fca..e099377917eb0f 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs @@ -58,6 +58,7 @@ pub fn bootstrap( mp_sender, node_config, table_info_reader, + None, )); let service_context 
= ServiceContext { context: context.clone(), diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs index 2d311d0902e484..5e45b34f255618 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs @@ -4,3 +4,4 @@ pub mod backup_restore; pub mod runtime; pub mod table_info_service; +pub mod tailer_service; diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs index cb5aa9531e8803..13be3b25c579ea 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs @@ -1,10 +1,10 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::table_info_service::TableInfoService; +use crate::{table_info_service::TableInfoService, tailer_service::TailerService}; use aptos_api::context::Context; use aptos_config::config::NodeConfig; -use aptos_db_indexer::{db_ops::open_db, db_v2::IndexerAsyncV2}; +use aptos_db_indexer::{db_ops::open_db, db_tailer::DBTailer, db_v2::IndexerAsyncV2}; use aptos_mempool::MempoolClientSender; use aptos_storage_interface::DbReaderWriter; use aptos_types::chain_id::ChainId; @@ -13,6 +13,26 @@ use tokio::runtime::Runtime; const INDEX_ASYNC_V2_DB_NAME: &str = "index_indexer_async_v2_db"; +pub fn bootstrap_db_tailer( + config: &NodeConfig, + db_rw: DbReaderWriter, +) -> Option<(Runtime, Arc)> { + if !config.indexer_table_info.enabled { + return None; + } + let runtime = aptos_runtimes::spawn_named_runtime("index-db-tailer".to_string(), None); + // Set up db config and open up the db initially to read metadata + let node_config = config.clone(); + let mut tailer_service = TailerService::new(db_rw.reader, &node_config); + let db_tailer = tailer_service.get_db_tailer(); + // Spawn the runtime for db tailer + 
runtime.spawn(async move { + tailer_service.run().await; + }); + + Some((runtime, db_tailer)) +} + /// Creates a runtime which creates a thread pool which sets up fullnode indexer table info service /// Returns corresponding Tokio runtime pub fn bootstrap( @@ -50,6 +70,7 @@ pub fn bootstrap( mp_sender, node_config.clone(), None, + None, )); let mut parser = TableInfoService::new( diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs index 95dbda43e74716..0e768cf4fa96ee 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs @@ -54,7 +54,7 @@ impl TableInfoService { /// 6. retry all the txns in the loop sequentially to clean up the pending on items pub async fn run(&mut self) { loop { - let start_time = std::time::Instant::now(); + let start_time: std::time::Instant = std::time::Instant::now(); let ledger_version = self.get_highest_known_version().await.unwrap_or_default(); let batches = self.get_batches(ledger_version).await; let results = self diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/tailer_service.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/tailer_service.rs new file mode 100644 index 00000000000000..af048722e9f12b --- /dev/null +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/tailer_service.rs @@ -0,0 +1,64 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_config::config::NodeConfig; +use aptos_db_indexer::{db_ops::open_db, db_tailer::DBTailer}; +use aptos_indexer_grpc_utils::counters::{log_grpc_step, IndexerGrpcStep}; +use aptos_storage_interface::DbReader; +use std::sync::Arc; + +const SERVICE_TYPE: &str = "db_tailer_service"; +const INDEX_ASYNC_DB_TAILER: &str = "index_async_db_tailer"; + +pub struct TailerService { + pub db_tailer: Arc, +} + +impl TailerService { + 
pub fn new(db_reader: Arc, node_config: &NodeConfig) -> Self { + let db_path = node_config + .storage + .get_dir_paths() + .default_root_path() + .join(INDEX_ASYNC_DB_TAILER); + let rocksdb_config = node_config.storage.rocksdb_configs.index_db_config; + let db = Arc::new( + open_db(db_path, &rocksdb_config) + .expect("Failed to open up indexer db tailer initially"), + ); + + let indexer_db_tailer = + Arc::new(DBTailer::new(db, db_reader, &node_config.index_db_tailer)); + Self { + db_tailer: indexer_db_tailer, + } + } + + pub fn get_db_tailer(&self) -> Arc { + Arc::clone(&self.db_tailer) + } + + pub async fn run(&mut self) { + let mut start_version = self.db_tailer.get_last_version(); + loop { + let start_time: std::time::Instant = std::time::Instant::now(); + let cur_version = self + .db_tailer + .process_a_batch(Some(start_version)) + .expect("Failed to run indexer db tailer"); + start_version = cur_version; + log_grpc_step( + SERVICE_TYPE, + IndexerGrpcStep::DBTailerProcessed, + None, + None, + None, + None, + Some(start_time.elapsed().as_secs_f64()), + None, + Some(cur_version as i64), + None, + ); + } + } +} diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/counters.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/counters.rs index 434bdb5cf613c2..eb9c11f7116948 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/counters.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/counters.rs @@ -58,6 +58,8 @@ pub enum IndexerGrpcStep { TableInfoProcessedBatch, // [Indexer Table Info] Processed transactions from fullnode TableInfoProcessed, + // [Indexer DB Tailer] Tailed AptosDB and write to indexer DB + DBTailerProcessed, } impl IndexerGrpcStep { @@ -91,6 +93,8 @@ impl IndexerGrpcStep { // Table info service steps IndexerGrpcStep::TableInfoProcessedBatch => "1", IndexerGrpcStep::TableInfoProcessed => "2", + // DB tailer service steps + IndexerGrpcStep::DBTailerProcessed => "1", } } @@ -136,6 +140,10 @@ impl IndexerGrpcStep { 
IndexerGrpcStep::TableInfoProcessed => { "[Indexer Table Info] Processed successfully" } + // DB tailer service steps + IndexerGrpcStep::DBTailerProcessed => { + "[Indexer DB Tailer] Tailed AptosDB and write to indexer DB" + } } } } diff --git a/storage/aptosdb/src/db/include/aptosdb_reader.rs b/storage/aptosdb/src/db/include/aptosdb_reader.rs index 3886bf05f256cf..ab3b7f6083d015 100644 --- a/storage/aptosdb/src/db/include/aptosdb_reader.rs +++ b/storage/aptosdb/src/db/include/aptosdb_reader.rs @@ -284,6 +284,7 @@ impl DbReader for AptosDB { }) } + /// TODO(bowu): Deprecate after internal index migration fn get_events( &self, event_key: &EventKey, @@ -807,6 +808,86 @@ impl DbReader for AptosDB { self.state_store.get_usage(version) }) } + + fn get_db_backup_iter( + &self, + start_version: Version, + num_transactions: usize, + ) -> Result)>> + '_,>> { + gauged_api("get_db_backup_iter", || { + self.error_if_ledger_pruned("Transaction", start_version)?; + + let txn_iter = self + .ledger_db + .transaction_db() + .get_transaction_iter(start_version, num_transactions)?; + let mut event_vec_iter = self + .ledger_db + .event_db() + .get_events_by_version_iter(start_version, num_transactions)?; + let zipped = txn_iter.enumerate().map(move |(idx, txn_res)| { + let version = start_version + idx as u64; // overflow is impossible since it's check upon txn_iter construction. + + let txn = txn_res?; + let event_vec = event_vec_iter.next().ok_or_else(|| { + AptosDbError::NotFound(format!( + "Events not found when Transaction exists., version {}", + version + )) + })??; + Ok((txn, event_vec)) + }); + Ok(Box::new(zipped) as Box)>> + '_,> ) + + }) + } + + /// Returns the transaction with proof for a given version, or error if the transaction is not + /// found. 
+ fn get_transaction_with_proof( + &self, + version: Version, + ledger_version: Version, + fetch_events: bool, + ) -> Result { + self.error_if_ledger_pruned("Transaction", version)?; + + let proof = self + .ledger_db + .transaction_info_db() + .get_transaction_info_with_proof( + version, + ledger_version, + self.ledger_db.transaction_accumulator_db(), + )?; + let transaction = self.ledger_db.transaction_db().get_transaction(version)?; + + // If events were requested, also fetch those. + let events = if fetch_events { + Some(self.ledger_db.event_db().get_events_by_version(version)?) + } else { + None + }; + + Ok(TransactionWithProof { + version, + transaction, + events, + proof, + }) + } + + fn get_event_by_version_and_index( + &self, + version: Version, + index: u64, + ) -> Result { + gauged_api("get_event_by_version_and_index", || { + self.error_if_ledger_pruned("Event", version)?; + self.event_store.get_event_by_version_and_index(version, index) + }) + + } } impl AptosDB { @@ -876,41 +957,7 @@ impl AptosDB { Ok((lis, more)) } - /// Returns the transaction with proof for a given version, or error if the transaction is not - /// found. - fn get_transaction_with_proof( - &self, - version: Version, - ledger_version: Version, - fetch_events: bool, - ) -> Result { - self.error_if_ledger_pruned("Transaction", version)?; - - let proof = self - .ledger_db - .transaction_info_db() - .get_transaction_info_with_proof( - version, - ledger_version, - self.ledger_db.transaction_accumulator_db(), - )?; - let transaction = self.ledger_db.transaction_db().get_transaction(version)?; - - // If events were requested, also fetch those. - let events = if fetch_events { - Some(self.ledger_db.event_db().get_events_by_version(version)?) 
- } else { - None - }; - - Ok(TransactionWithProof { - version, - transaction, - events, - proof, - }) - } - + /// TODO(bowu): Deprecate after internal index migration fn get_events_by_event_key( &self, event_key: &EventKey, diff --git a/storage/indexer/Cargo.toml b/storage/indexer/Cargo.toml index f5908422e70b5d..3e86a7ac2f5204 100644 --- a/storage/indexer/Cargo.toml +++ b/storage/indexer/Cargo.toml @@ -23,6 +23,7 @@ aptos-types = { workspace = true } aptos-vm = { workspace = true } bcs = { workspace = true } bytes = { workspace = true } +byteorder = { workspace = true } dashmap = { workspace = true } move-core-types = { workspace = true } move-resource-viewer = { workspace = true } diff --git a/storage/indexer/src/db_tailer.rs b/storage/indexer/src/db_tailer.rs new file mode 100644 index 00000000000000..2d642bd8aa753f --- /dev/null +++ b/storage/indexer/src/db_tailer.rs @@ -0,0 +1,289 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + db_tailer_reader::IndexerTransactionEventReader, + schema::{ + event_by_key::EventByKeySchema, event_by_version::EventByVersionSchema, + indexer_metadata::TailerMetadataSchema, transaction_by_account::TransactionByAccountSchema, + }, + utils::{ + error_if_too_many_requested, get_first_seq_num_and_limit, AccountTransactionVersionIter, + MAX_REQUEST_LIMIT, + }, +}; +use aptos_config::config::index_db_tailer_config::IndexDBTailerConfig; +use aptos_schemadb::{ReadOptions, SchemaBatch, DB}; +use aptos_storage_interface::{ + db_ensure as ensure, db_other_bail as bail, AptosDbError, DbReader, Order, Result, +}; +use aptos_types::{ + account_address::AccountAddress, + contract_event::{ContractEvent, EventWithVersion}, + event::EventKey, + transaction::{AccountTransactionsWithProof, Version}, +}; +use std::sync::Arc; + +pub struct DBTailer { + pub last_version: Version, + pub db: Arc, + pub main_db_reader: Arc, + batch_size: usize, +} + +impl DBTailer { + pub fn new(db: Arc, db_reader: Arc, 
config: &IndexDBTailerConfig) -> Self { + let last_version = Self::initialize(db.clone()); + Self { + last_version, + db, + main_db_reader: db_reader, + batch_size: config.batch_size, + } + } + + fn initialize(db: Arc) -> Version { + // read the latest key from the db + let mut rev_iter_res = db + .rev_iter::(Default::default()) + .expect("Cannot create db tailer metadata iterator"); + rev_iter_res + .next() + .map(|res| res.map_or(0, |(version, _)| version)) + .unwrap_or_default() + } + + pub fn process_a_batch(&self, start_version: Option) -> Result { + let db_iter = self + .main_db_reader + .get_db_backup_iter(start_version.unwrap_or(self.last_version), self.batch_size) + .expect("Cannot create db tailer iterator"); + let batch = SchemaBatch::new(); + let metadata_batch = SchemaBatch::new(); + let mut version = self.last_version; + db_iter.for_each(|res| { + res.map(|(txn, events)| { + if let Some(txn) = txn.try_as_signed_user_txn() { + batch + .put::( + &(txn.sender(), txn.sequence_number()), + &version, + ) + .expect("Failed to put txn to db tailer batch"); + + events.iter().enumerate().for_each(|(idx, event)| { + if let ContractEvent::V1(v1) = event { + batch + .put::( + &(*v1.key(), v1.sequence_number()), + &(version, idx as u64), + ) + .expect("Failed to event by key to db tailer batch"); + batch + .put::( + &(*v1.key(), version, v1.sequence_number()), + &(idx as u64), + ) + .expect("Failed to event by version to db tailer batch"); + } + }); + } + version += 1; + }) + .expect("Failed to iterate db tailer iterator"); + }); + // write to index db + self.db.write_schemas(batch)?; + // update the metadata + metadata_batch.put::(&version, &())?; + self.db.write_schemas(metadata_batch)?; + Ok(version) + } + + pub fn get_last_version(&self) -> Version { + self.last_version + } + + pub fn get_account_transaction_version_iter( + &self, + address: AccountAddress, + min_seq_num: u64, + num_versions: u64, + ledger_version: Version, + ) -> Result { + let mut iter = self 
+ .db + .iter::(ReadOptions::default())?; + iter.seek(&(address, min_seq_num))?; + Ok(AccountTransactionVersionIter::new( + iter, + address, + min_seq_num + .checked_add(num_versions) + .ok_or(AptosDbError::TooManyRequested(min_seq_num, num_versions))?, + ledger_version, + )) + } + + pub fn get_latest_sequence_number( + &self, + ledger_version: Version, + event_key: &EventKey, + ) -> Result> { + let mut iter = self + .db + .iter::(ReadOptions::default())?; + iter.seek_for_prev(&(*event_key, ledger_version, u64::max_value()))?; + + Ok(iter.next().transpose()?.and_then( + |((key, _version, seq), _idx)| if &key == event_key { Some(seq) } else { None }, + )) + } + + /// Given `event_key` and `start_seq_num`, returns events identified by transaction version and + /// index among all events emitted by the same transaction. Result won't contain records with a + /// transaction version > `ledger_version` and is in ascending order. + pub fn lookup_events_by_key( + &self, + event_key: &EventKey, + start_seq_num: u64, + limit: u64, + ledger_version: u64, + ) -> Result< + Vec<( + u64, // sequence number + Version, // transaction version it belongs to + u64, // index among events for the same transaction + )>, + > { + let mut iter = self.db.iter::(ReadOptions::default())?; + iter.seek(&(*event_key, start_seq_num))?; + + let mut result = Vec::new(); + let mut cur_seq = start_seq_num; + for res in iter.take(limit as usize) { + let ((path, seq), (ver, idx)) = res?; + if path != *event_key || ver > ledger_version { + break; + } + if seq != cur_seq { + let msg = if cur_seq == start_seq_num { + "First requested event is probably pruned." + } else { + "DB corruption: Sequence number not continuous." 
+ }; + bail!("{} expected: {}, actual: {}", msg, cur_seq, seq); + } + result.push((seq, ver, idx)); + cur_seq += 1; + } + + Ok(result) + } +} + +impl IndexerTransactionEventReader for DBTailer { + fn get_events( + &self, + event_key: &EventKey, + start: u64, + order: Order, + limit: u64, + ledger_version: Version, + ) -> Result> { + self.get_events_by_event_key(event_key, start, order, limit, ledger_version) + } + + fn get_events_by_event_key( + &self, + event_key: &EventKey, + start_seq_num: u64, + order: Order, + limit: u64, + ledger_version: Version, + ) -> Result> { + error_if_too_many_requested(limit, MAX_REQUEST_LIMIT)?; + let get_latest = order == Order::Descending && start_seq_num == u64::max_value(); + + let cursor = if get_latest { + // Caller wants the latest, figure out the latest seq_num. + // In the case of no events on that path, use 0 and expect empty result below. + self.get_latest_sequence_number(ledger_version, event_key)? + .unwrap_or(0) + } else { + start_seq_num + }; + + // Convert requested range and order to a range in ascending order. + let (first_seq, real_limit) = get_first_seq_num_and_limit(order, cursor, limit)?; + + // Query the index. + let mut event_indices = + self.lookup_events_by_key(event_key, first_seq, real_limit, ledger_version)?; + + // When descending, it's possible that user is asking for something beyond the latest + // sequence number, in which case we will consider it a bad request and return an empty + // list. + // For example, if the latest sequence number is 100, and the caller is asking for 110 to + // 90, we will get 90 to 100 from the index lookup above. Seeing that the last item + // is 100 instead of 110 tells us 110 is out of bound. 
+ if order == Order::Descending { + if let Some((seq_num, _, _)) = event_indices.last() { + if *seq_num < cursor { + event_indices = Vec::new(); + } + } + } + + let mut events_with_version = event_indices + .into_iter() + .map(|(seq, ver, idx)| { + let event = self + .main_db_reader + .get_event_by_version_and_index(ver, idx)?; + let v0 = match &event { + ContractEvent::V1(event) => event, + ContractEvent::V2(_) => bail!("Unexpected module event"), + }; + ensure!( + seq == v0.sequence_number(), + "Index broken, expected seq:{}, actual:{}", + seq, + v0.sequence_number() + ); + Ok(EventWithVersion::new(ver, event)) + }) + .collect::>>()?; + if order == Order::Descending { + events_with_version.reverse(); + } + + Ok(events_with_version) + } + + fn get_account_transactions( + &self, + address: AccountAddress, + start_seq_num: u64, + limit: u64, + include_events: bool, + ledger_version: Version, + ) -> Result { + error_if_too_many_requested(limit, MAX_REQUEST_LIMIT)?; + + let txns_with_proofs = self + .get_account_transaction_version_iter(address, start_seq_num, limit, ledger_version)? 
+ .map(|result| { + let (_seq_num, txn_version) = result?; + self.main_db_reader.get_transaction_with_proof( + txn_version, + ledger_version, + include_events, + ) + }) + .collect::>>()?; + + Ok(AccountTransactionsWithProof::new(txns_with_proofs)) + } +} diff --git a/storage/indexer/src/db_tailer_reader.rs b/storage/indexer/src/db_tailer_reader.rs new file mode 100644 index 00000000000000..048dd71325ece9 --- /dev/null +++ b/storage/indexer/src/db_tailer_reader.rs @@ -0,0 +1,39 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_storage_interface::{Order, Result}; +use aptos_types::{ + account_address::AccountAddress, + contract_event::EventWithVersion, + event::EventKey, + transaction::{AccountTransactionsWithProof, Version}, +}; + +pub trait IndexerTransactionEventReader: Send + Sync { + fn get_events( + &self, + event_key: &EventKey, + start: u64, + order: Order, + limit: u64, + ledger_version: Version, + ) -> Result>; + + fn get_events_by_event_key( + &self, + event_key: &EventKey, + start_seq_num: u64, + order: Order, + limit: u64, + ledger_version: Version, + ) -> Result>; + + fn get_account_transactions( + &self, + address: AccountAddress, + start_seq_num: u64, + limit: u64, + include_events: bool, + ledger_version: Version, + ) -> Result; +} diff --git a/storage/indexer/src/lib.rs b/storage/indexer/src/lib.rs index 4d90f4f24f973f..85d20ec524e8f7 100644 --- a/storage/indexer/src/lib.rs +++ b/storage/indexer/src/lib.rs @@ -4,10 +4,13 @@ /// TODO(jill): deprecate Indexer once Indexer Async V2 is ready mod db; pub mod db_ops; +pub mod db_tailer; +pub mod db_tailer_reader; pub mod db_v2; mod metadata; mod schema; pub mod table_info_reader; +mod utils; use crate::{ db::INDEX_DB_NAME, diff --git a/storage/indexer/src/schema/event_by_key/mod.rs b/storage/indexer/src/schema/event_by_key/mod.rs new file mode 100644 index 00000000000000..12ecc845d8c68e --- /dev/null +++ b/storage/indexer/src/schema/event_by_key/mod.rs @@ -0,0 +1,76 
@@
+// Copyright © Aptos Foundation
+// Parts of the project are originally copyright © Meta Platforms, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+//! This module defines physical storage schema for an event index via which a ContractEvent
+//! (represented by a `(txn_ver, idx)` tuple so that it can be fetched from `EventSchema`)
+//! can be found by its `(event_key, seq_num)` tuple.
+//!
+//! ```text
+//! |<---------key------->|<----value---->|
+//! | event_key | seq_num | txn_ver | idx |
+//! ```
+
+use crate::{schema::EVENT_BY_KEY_CF_NAME, utils::ensure_slice_len_eq};
+use anyhow::Result;
+use aptos_schemadb::{
+    define_schema,
+    schema::{KeyCodec, ValueCodec},
+};
+use aptos_types::{event::EventKey, transaction::Version};
+use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
+use std::mem::size_of;
+
+define_schema!(EventByKeySchema, Key, Value, EVENT_BY_KEY_CF_NAME);
+
+type SeqNum = u64;
+type Key = (EventKey, SeqNum);
+
+type Index = u64;
+type Value = (Version, Index);
+
+/// Key layout: the bytes of the `EventKey` followed by `seq_num` as a big-endian `u64`.
+impl KeyCodec<EventByKeySchema> for Key {
+    fn encode_key(&self) -> Result<Vec<u8>> {
+        let (ref event_key, seq_num) = *self;
+
+        let mut encoded = event_key.to_bytes();
+        encoded.write_u64::<BigEndian>(seq_num)?;
+
+        Ok(encoded)
+    }
+
+    /// Fails unless `data` is exactly `size_of::<Self>()` bytes long.
+    fn decode_key(data: &[u8]) -> Result<Self> {
+        ensure_slice_len_eq(data, size_of::<Self>())?;
+
+        // NOTE(review): the in-memory size of `EventKey` is used as its encoded length; this
+        // assumes `EventKey::to_bytes` emits exactly that many bytes. Confirm.
+        const EVENT_KEY_LEN: usize = size_of::<EventKey>();
+        let event_key = bcs::from_bytes(&data[..EVENT_KEY_LEN])?;
+        let seq_num = (&data[EVENT_KEY_LEN..]).read_u64::<BigEndian>()?;
+
+        Ok((event_key, seq_num))
+    }
+}
+
+/// Value layout: `version` then `index`, each written as a big-endian `u64`.
+impl ValueCodec<EventByKeySchema> for Value {
+    fn encode_value(&self) -> Result<Vec<u8>> {
+        let (version, index) = *self;
+
+        let mut encoded = Vec::with_capacity(size_of::<Version>() + size_of::<Index>());
+        encoded.write_u64::<BigEndian>(version)?;
+        encoded.write_u64::<BigEndian>(index)?;
+
+        Ok(encoded)
+    }
+
+    /// Fails unless `data` is exactly `size_of::<Self>()` bytes long.
+    fn decode_value(data: &[u8]) -> Result<Self> {
+        ensure_slice_len_eq(data, size_of::<Self>())?;
+
+        const VERSION_SIZE: usize = size_of::<Version>();
+        let version = (&data[..VERSION_SIZE]).read_u64::<BigEndian>()?;
+        let index = (&data[VERSION_SIZE..]).read_u64::<BigEndian>()?;
+
+        Ok((version, index))
+    }
+}
+
+#[cfg(test)]
+mod test;
diff --git a/storage/indexer/src/schema/event_by_key/test.rs b/storage/indexer/src/schema/event_by_key/test.rs
new file mode 100644
index 00000000000000..9e345fb26fefcc
--- /dev/null
+++ b/storage/indexer/src/schema/event_by_key/test.rs
@@ -0,0 +1,21 @@
+// Copyright © Aptos Foundation
+// Parts of the project are originally copyright © Meta Platforms, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use super::*;
+use aptos_schemadb::{schema::fuzzing::assert_encode_decode, test_no_panic_decoding};
+use proptest::prelude::*;
+
+proptest! {
+    // Round-trips arbitrary keys and values through the schema's codecs.
+    #[test]
+    fn test_encode_decode(
+        event_key in any::<EventKey>(),
+        seq_num in any::<u64>(),
+        version in any::<Version>(),
+        index in any::<u64>(),
+    ) {
+        assert_encode_decode::<EventByKeySchema>(&(event_key, seq_num), &(version, index));
+    }
+}
+
+test_no_panic_decoding!(EventByKeySchema);
diff --git a/storage/indexer/src/schema/event_by_version/mod.rs b/storage/indexer/src/schema/event_by_version/mod.rs
new file mode 100644
index 00000000000000..aa3824d9f04c93
--- /dev/null
+++ b/storage/indexer/src/schema/event_by_version/mod.rs
@@ -0,0 +1,69 @@
+// Copyright © Aptos Foundation
+// Parts of the project are originally copyright © Meta Platforms, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+//! This module defines physical storage schema for an event index via which a ContractEvent
+//! (represented by a `(txn_ver, idx)` tuple so that it can be fetched from `EventSchema`)
+//! can be found by its `(event_key, txn_ver, seq_num)` tuple.
+//!
+//! ```text
+//! |<--------------key------------>|<-value->|
+//! | event_key | txn_ver | seq_num |   idx   |
+//! ```
+
+use crate::{schema::EVENT_BY_VERSION_CF_NAME, utils::ensure_slice_len_eq};
+use anyhow::Result;
+use aptos_schemadb::{
+    define_schema,
+    schema::{KeyCodec, ValueCodec},
+};
+use aptos_types::{event::EventKey, transaction::Version};
+use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
+use std::mem::size_of;
+
+define_schema!(EventByVersionSchema, Key, Value, EVENT_BY_VERSION_CF_NAME);
+
+type SeqNum = u64;
+type Key = (EventKey, Version, SeqNum);
+
+type Index = u64;
+type Value = Index;
+
+/// Key layout: the bytes of the `EventKey`, then `version`, then `seq_num`, the latter two as
+/// big-endian `u64`s.
+impl KeyCodec<EventByVersionSchema> for Key {
+    fn encode_key(&self) -> Result<Vec<u8>> {
+        let (ref event_key, version, seq_num) = *self;
+
+        let mut encoded = event_key.to_bytes();
+        encoded.write_u64::<BigEndian>(version)?;
+        encoded.write_u64::<BigEndian>(seq_num)?;
+
+        Ok(encoded)
+    }
+
+    /// Fails unless `data` is exactly `size_of::<Self>()` bytes long.
+    fn decode_key(data: &[u8]) -> Result<Self> {
+        ensure_slice_len_eq(data, size_of::<Self>())?;
+
+        // NOTE(review): in-memory sizes are used as encoded offsets; this assumes
+        // `EventKey::to_bytes` emits exactly `size_of::<EventKey>()` bytes. Confirm.
+        const EVENT_KEY_LEN: usize = size_of::<EventKey>();
+        const EVENT_KEY_AND_VER_LEN: usize = size_of::<(EventKey, Version)>();
+        let event_key = bcs::from_bytes(&data[..EVENT_KEY_LEN])?;
+        let version = (&data[EVENT_KEY_LEN..]).read_u64::<BigEndian>()?;
+        let seq_num = (&data[EVENT_KEY_AND_VER_LEN..]).read_u64::<BigEndian>()?;
+
+        Ok((event_key, version, seq_num))
+    }
+}
+
+/// Value layout: the event index as a big-endian `u64`.
+impl ValueCodec<EventByVersionSchema> for Value {
+    fn encode_value(&self) -> Result<Vec<u8>> {
+        Ok(self.to_be_bytes().to_vec())
+    }
+
+    /// Fails unless `data` is exactly `size_of::<Self>()` bytes long.
+    fn decode_value(mut data: &[u8]) -> Result<Self> {
+        ensure_slice_len_eq(data, size_of::<Self>())?;
+
+        Ok(data.read_u64::<BigEndian>()?)
+    }
+}
+
+#[cfg(test)]
+mod test;
diff --git a/storage/indexer/src/schema/event_by_version/test.rs b/storage/indexer/src/schema/event_by_version/test.rs
new file mode 100644
index 00000000000000..a6d78bd7baf88c
--- /dev/null
+++ b/storage/indexer/src/schema/event_by_version/test.rs
@@ -0,0 +1,21 @@
+// Copyright © Aptos Foundation
+// Parts of the project are originally copyright © Meta Platforms, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use super::*;
+use aptos_schemadb::{schema::fuzzing::assert_encode_decode, test_no_panic_decoding};
+use proptest::prelude::*;
+
+proptest! {
+    // Round-trips arbitrary keys and values through the schema's codecs.
+    #[test]
+    fn test_encode_decode(
+        event_key in any::<EventKey>(),
+        seq_num in any::<u64>(),
+        version in any::<Version>(),
+        index in any::<u64>(),
+    ) {
+        assert_encode_decode::<EventByVersionSchema>(&(event_key, version, seq_num), &index);
+    }
+}
+
+test_no_panic_decoding!(EventByVersionSchema);
diff --git a/storage/indexer/src/schema/indexer_metadata/mod.rs b/storage/indexer/src/schema/indexer_metadata/mod.rs
index f8615ba5b8ee92..652fcde44469b7 100644
--- a/storage/indexer/src/schema/indexer_metadata/mod.rs
+++ b/storage/indexer/src/schema/indexer_metadata/mod.rs
@@ -4,15 +4,18 @@
 //! This module defines physical storage schema storing metadata for the internal indexer
 //!
 
+use super::TAILER_METADATA_CF_NAME;
 use crate::{
     metadata::{MetadataKey, MetadataValue},
     schema::INDEXER_METADATA_CF_NAME,
+    utils::ensure_slice_len_eq,
 };
 use anyhow::Result;
 use aptos_schemadb::{
     define_schema,
     schema::{KeyCodec, ValueCodec},
 };
+use aptos_types::transaction::Version;
 
 define_schema!(
     IndexerMetadataSchema,
@@ -41,5 +44,28 @@ impl ValueCodec<IndexerMetadataSchema> for MetadataValue {
     }
 }
 
+// Schema whose key is a `Version` and whose value is empty.
+define_schema!(TailerMetadataSchema, Version, (), TAILER_METADATA_CF_NAME);
+
+// NOTE(review): BCS serializes `u64` little-endian, unlike the big-endian keys of the other
+// schemas in this patch, so byte-wise key order in this column family does not follow numeric
+// version order. Confirm that no reader relies on iteration order (e.g. seek-to-last).
+impl KeyCodec<TailerMetadataSchema> for Version {
+    fn encode_key(&self) -> Result<Vec<u8>> {
+        Ok(bcs::to_bytes(self)?)
+    }
+
+    fn decode_key(data: &[u8]) -> Result<Self> {
+        Ok(bcs::from_bytes(data)?)
+    }
+}
+
+impl ValueCodec<TailerMetadataSchema> for () {
+    fn encode_value(&self) -> Result<Vec<u8>> {
+        Ok(Vec::new())
+    }
+
+    /// Accepts only an empty slice.
+    fn decode_value(data: &[u8]) -> Result<Self> {
+        ensure_slice_len_eq(data, 0)?;
+        Ok(())
+    }
+}
+
 #[cfg(test)]
 mod test;
diff --git a/storage/indexer/src/schema/mod.rs b/storage/indexer/src/schema/mod.rs
index 90ba38374788d9..4f7dea73c74c66 100644
--- a/storage/indexer/src/schema/mod.rs
+++ b/storage/indexer/src/schema/mod.rs
@@ -6,19 +6,28 @@
 //!
 //! All schemas are `pub(crate)` so not shown in rustdoc, refer to the source code to see details.
 
+pub(crate) mod event_by_key;
+pub(crate) mod event_by_version;
 pub(crate) mod indexer_metadata;
 pub(crate) mod table_info;
-
+pub(crate) mod transaction_by_account;
 use aptos_schemadb::ColumnFamilyName;
 
 pub const DEFAULT_COLUMN_FAMILY_NAME: ColumnFamilyName = "default";
 pub const INDEXER_METADATA_CF_NAME: ColumnFamilyName = "indexer_metadata";
+pub const TAILER_METADATA_CF_NAME: ColumnFamilyName = "event_metadata";
 pub const TABLE_INFO_CF_NAME: ColumnFamilyName = "table_info";
+pub const EVENT_BY_KEY_CF_NAME: ColumnFamilyName = "event_by_key";
+pub const EVENT_BY_VERSION_CF_NAME: ColumnFamilyName = "event_by_version";
+pub const TRANSACTION_BY_ACCOUNT_CF_NAME: ColumnFamilyName = "transaction_by_account";
 
 pub fn column_families() -> Vec<ColumnFamilyName> {
     vec![
         /* empty cf */ DEFAULT_COLUMN_FAMILY_NAME,
         INDEXER_METADATA_CF_NAME,
+        // Backs `TailerMetadataSchema`; a schema can only be read or written once its column
+        // family is registered with the DB.
+        TAILER_METADATA_CF_NAME,
         TABLE_INFO_CF_NAME,
+        EVENT_BY_KEY_CF_NAME,
+        EVENT_BY_VERSION_CF_NAME,
+        TRANSACTION_BY_ACCOUNT_CF_NAME,
     ]
 }
diff --git a/storage/indexer/src/schema/transaction_by_account/mod.rs b/storage/indexer/src/schema/transaction_by_account/mod.rs
new file mode 100644
index 00000000000000..c4e1d50e553a8f
--- /dev/null
+++ b/storage/indexer/src/schema/transaction_by_account/mod.rs
@@ -0,0 +1,67 @@
+// Copyright © Aptos Foundation
+// Parts of the project are originally copyright © Meta Platforms, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+//! This module defines physical storage schema for a transaction index via which the version of a
+//! transaction sent by `account_address` with `sequence_number` can be found. With the version one
+//! can resort to `TransactionSchema` for the transaction content.
+//!
+//! ```text
+//! |<-------key------->|<-value->|
+//! | address | seq_num | txn_ver |
+//! ```
+
+use crate::{schema::TRANSACTION_BY_ACCOUNT_CF_NAME, utils::ensure_slice_len_eq};
+use anyhow::Result;
+use aptos_schemadb::{
+    define_schema,
+    schema::{KeyCodec, ValueCodec},
+};
+use aptos_types::{account_address::AccountAddress, transaction::Version};
+use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
+use std::{convert::TryFrom, mem::size_of};
+
+define_schema!(
+    TransactionByAccountSchema,
+    Key,
+    Version,
+    TRANSACTION_BY_ACCOUNT_CF_NAME
+);
+
+type SeqNum = u64;
+type Key = (AccountAddress, SeqNum);
+
+/// Key layout: the address bytes followed by `seq_num` as a big-endian `u64`.
+impl KeyCodec<TransactionByAccountSchema> for Key {
+    fn encode_key(&self) -> Result<Vec<u8>> {
+        let (ref account_address, seq_num) = *self;
+
+        let mut encoded = account_address.to_vec();
+        encoded.write_u64::<BigEndian>(seq_num)?;
+
+        Ok(encoded)
+    }
+
+    /// Fails unless `data` is exactly `size_of::<Self>()` bytes long.
+    fn decode_key(data: &[u8]) -> Result<Self> {
+        ensure_slice_len_eq(data, size_of::<Self>())?;
+
+        let address = AccountAddress::try_from(&data[..AccountAddress::LENGTH])?;
+        let seq_num = (&data[AccountAddress::LENGTH..]).read_u64::<BigEndian>()?;
+
+        Ok((address, seq_num))
+    }
+}
+
+/// Value layout: the transaction version as a big-endian `u64`.
+impl ValueCodec<TransactionByAccountSchema> for Version {
+    fn encode_value(&self) -> Result<Vec<u8>> {
+        Ok(self.to_be_bytes().to_vec())
+    }
+
+    /// Fails unless `data` is exactly `size_of::<Self>()` bytes long.
+    fn decode_value(mut data: &[u8]) -> Result<Self> {
+        ensure_slice_len_eq(data, size_of::<Self>())?;
+
+        Ok(data.read_u64::<BigEndian>()?)
+    }
+}
+
+#[cfg(test)]
+mod test;
diff --git a/storage/indexer/src/schema/transaction_by_account/test.rs b/storage/indexer/src/schema/transaction_by_account/test.rs
new file mode 100644
index 00000000000000..dc04cff3b64533
--- /dev/null
+++ b/storage/indexer/src/schema/transaction_by_account/test.rs
@@ -0,0 +1,20 @@
+// Copyright © Aptos Foundation
+// Parts of the project are originally copyright © Meta Platforms, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+use super::*;
+use aptos_schemadb::{schema::fuzzing::assert_encode_decode, test_no_panic_decoding};
+use proptest::prelude::*;
+
+proptest! {
+    // Round-trips arbitrary keys and values through the schema's codecs.
+    #[test]
+    fn test_encode_decode(
+        address in any::<AccountAddress>(),
+        seq_num in any::<u64>(),
+        version in any::<Version>(),
+    ) {
+        assert_encode_decode::<TransactionByAccountSchema>(&(address, seq_num), &version);
+    }
+}
+
+test_no_panic_decoding!(TransactionByAccountSchema);
diff --git a/storage/indexer/src/utils.rs b/storage/indexer/src/utils.rs
new file mode 100644
index 00000000000000..d16c615f7e0a2d
--- /dev/null
+++ b/storage/indexer/src/utils.rs
@@ -0,0 +1,124 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::schema::transaction_by_account::TransactionByAccountSchema;
+use aptos_schemadb::iterator::SchemaIterator;
+use aptos_storage_interface::{db_ensure as ensure, AptosDbError, Order, Result};
+use aptos_types::{account_address::AccountAddress, transaction::Version};
+
+pub const MAX_REQUEST_LIMIT: u64 = 10_000;
+
+/// Returns an error unless `data` is exactly `len` bytes long.
+pub fn ensure_slice_len_eq(data: &[u8], len: usize) -> Result<()> {
+    ensure!(
+        data.len() == len,
+        "Unexpected data len {}, expected {}.",
+        data.len(),
+        len,
+    );
+    Ok(())
+}
+
+/// Returns `AptosDbError::TooManyRequested` when `num_requested` exceeds `max_allowed`.
+pub fn error_if_too_many_requested(num_requested: u64, max_allowed: u64) -> Result<()> {
+    if num_requested > max_allowed {
+        Err(AptosDbError::TooManyRequested(num_requested, max_allowed))
+    } else {
+        Ok(())
+    }
+}
+
+/// Converts a `(cursor, limit)` request in the given `order` into an ascending
+/// `(first_seq_num, limit)` window.
+///
+/// * `Order::Ascending`: the window starts at `cursor` and keeps `limit`.
+/// * otherwise: the window ends at `cursor`; when `limit` exceeds `cursor` it is clamped to
+///   start at 0 and to hold `cursor + 1` entries.
+///
+/// Fails when `limit` is 0.
+pub fn get_first_seq_num_and_limit(order: Order, cursor: u64, limit: u64) -> Result<(u64, u64)> {
+    ensure!(limit > 0, "limit should > 0, got {}", limit);
+
+    Ok(if order == Order::Ascending {
+        (cursor, limit)
+    } else if limit <= cursor {
+        (cursor - limit + 1, limit)
+    } else {
+        (0, cursor + 1)
+    })
+}
+
+// This is a replica of the `AccountTransactionVersionIter` from the storage/aptosdb crate.
+pub struct AccountTransactionVersionIter<'a> {
+    // Underlying iterator over `((address, seq_num), version)` entries.
+    inner: SchemaIterator<'a, TransactionByAccountSchema>,
+    // Only entries of this address are yielded.
+    address: AccountAddress,
+    // Sequence number the next entry must carry; `None` until the first entry is yielded.
+    expected_next_seq_num: Option<u64>,
+    // Exclusive upper bound on yielded sequence numbers.
+    end_seq_num: u64,
+    // Version of the previously yielded entry, if any.
+    prev_version: Option<Version>,
+    // Entries with a version above this are not yielded.
+    ledger_version: Version,
+}
+
+impl<'a> AccountTransactionVersionIter<'a> {
+    /// Wraps `inner`, which the caller is expected to have positioned already, so that it
+    /// yields `(seq_num, version)` pairs of `address` below `end_seq_num` and not above
+    /// `ledger_version`.
+    pub(crate) fn new(
+        inner: SchemaIterator<'a, TransactionByAccountSchema>,
+        address: AccountAddress,
+        end_seq_num: u64,
+        ledger_version: Version,
+    ) -> Self {
+        Self {
+            inner,
+            address,
+            end_seq_num,
+            ledger_version,
+            expected_next_seq_num: None,
+            prev_version: None,
+        }
+    }
+}
+
+impl<'a> AccountTransactionVersionIter<'a> {
+    /// Advances the inner iterator by one entry.
+    ///
+    /// Returns `Ok(None)` once the inner iterator is exhausted, the entry belongs to another
+    /// address, its sequence number reaches `end_seq_num`, or its version exceeds
+    /// `ledger_version`. Returns an error when sequence numbers are not contiguous or versions
+    /// are not strictly increasing.
+    fn next_impl(&mut self) -> Result<Option<(u64, Version)>> {
+        Ok(match self.inner.next().transpose()? {
+            Some(((address, seq_num), version)) => {
+                // No more transactions sent by this account.
+                if address != self.address {
+                    return Ok(None);
+                }
+                if seq_num >= self.end_seq_num {
+                    return Ok(None);
+                }
+
+                // Ensure seq_num_{i+1} == seq_num_{i} + 1
+                if let Some(expected_seq_num) = self.expected_next_seq_num {
+                    ensure!(
+                        seq_num == expected_seq_num,
+                        "DB corruption: account transactions sequence numbers are not contiguous: \
+                     actual: {}, expected: {}",
+                        seq_num,
+                        expected_seq_num,
+                    );
+                };
+
+                // Ensure version_{i+1} > version_{i}
+                if let Some(prev_version) = self.prev_version {
+                    ensure!(
+                        prev_version < version,
+                        "DB corruption: account transaction versions are not strictly increasing: \
+                         previous version: {}, current version: {}",
+                        prev_version,
+                        version,
+                    );
+                }
+
+                // No more transactions (in this view of the ledger).
+                if version > self.ledger_version {
+                    return Ok(None);
+                }
+
+                self.expected_next_seq_num = Some(seq_num + 1);
+                self.prev_version = Some(version);
+                Some((seq_num, version))
+            },
+            None => None,
+        })
+    }
+}
+
+impl<'a> Iterator for AccountTransactionVersionIter<'a> {
+    type Item = Result<(u64, Version)>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.next_impl().transpose()
+    }
+}
diff --git a/storage/storage-interface/src/lib.rs b/storage/storage-interface/src/lib.rs
index ae8e43256106d4..c78d48ada9d6de 100644
--- a/storage/storage-interface/src/lib.rs
+++ b/storage/storage-interface/src/lib.rs
@@ -453,6 +453,25 @@ pub trait DbReader: Send + Sync {
 
         /// Returns state storage usage at the end of an epoch.
         fn get_state_storage_usage(&self, version: Option) -> Result;
+
+        fn get_db_backup_iter(
+            &self,
+            start_version: Version,
+            num_transactions: usize,
+        ) -> Result)>> + '_>>;
+
+        fn get_transaction_with_proof(
+            &self,
+            version: Version,
+            ledger_version: Version,
+            fetch_events: bool,
+        ) -> Result;
+
+        fn get_event_by_version_and_index(
+            &self,
+            version: Version,
+            index: u64,
+        ) -> Result;
     ); // end delegated
 
     /// Returns the latest ledger info.
diff --git a/types/src/state_store/state_key/prefix.rs b/types/src/state_store/state_key/prefix.rs
index a29bfd146d3636..731dee462bc62f 100644
--- a/types/src/state_store/state_key/prefix.rs
+++ b/types/src/state_store/state_key/prefix.rs
@@ -42,6 +42,12 @@ impl From for StateKeyPrefix {
     }
 }
 
+impl From> for StateKeyPrefix {
+    fn from(bytes: Vec) -> Self {
+        Self::new(StateKeyTag::AccessPath, bytes)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::{