From 7b8c2b953feaa4f5306c80ab6720ebe16cf8b9dd Mon Sep 17 00:00:00 2001 From: Bo Wu Date: Fri, 3 May 2024 09:09:30 -0700 Subject: [PATCH] [api] migrate event and transaction schemas Add test for internal indexer --- Cargo.lock | 6 +- .../src/fake_context.rs | 1 + api/src/context.rs | 7 +- api/src/runtime.rs | 19 +- api/test-context/src/test_context.rs | 1 + api/types/Cargo.toml | 1 - api/types/src/convert.rs | 2 +- aptos-node/src/lib.rs | 3 + aptos-node/src/services.rs | 28 +- aptos-node/src/storage.rs | 1 - config/src/config/index_db_tailer_config.rs | 33 ++ config/src/config/mod.rs | 1 + config/src/config/node_config.rs | 17 +- crates/indexer/src/runtime.rs | 1 + .../indexer-grpc-fullnode/src/runtime.rs | 4 +- .../indexer-grpc-table-info/src/lib.rs | 1 + .../indexer-grpc-table-info/src/runtime.rs | 25 +- .../src/table_info_service.rs | 2 +- .../src/tailer_service.rs | 69 ++++ .../indexer-grpc-utils/src/counters.rs | 8 + execution/executor/Cargo.toml | 2 + .../executor/tests/internal_indexer_test.rs | 161 ++++++++++ .../aptosdb/src/db/include/aptosdb_reader.rs | 82 +++++ storage/indexer/Cargo.toml | 4 + storage/indexer/src/db_ops.rs | 11 +- storage/indexer/src/db_tailer.rs | 302 ++++++++++++++++++ storage/indexer/src/lib.rs | 2 + .../indexer/src/schema/event_by_key/mod.rs | 76 +++++ .../indexer/src/schema/event_by_key/test.rs | 21 ++ .../src/schema/event_by_version/mod.rs | 69 ++++ .../src/schema/event_by_version/test.rs | 21 ++ .../src/schema/indexer_metadata/mod.rs | 26 ++ .../src/schema/indexer_metadata/test.rs | 7 + storage/indexer/src/schema/mod.rs | 22 +- .../src/schema/transaction_by_account/mod.rs | 67 ++++ .../src/schema/transaction_by_account/test.rs | 20 ++ storage/indexer/src/table_info_reader.rs | 12 +- storage/indexer/src/utils.rs | 126 ++++++++ storage/storage-interface/src/lib.rs | 28 ++ types/src/indexer/db_tailer_reader.rs | 45 +++ types/src/indexer/mod.rs | 5 + types/src/indexer/table_info_reader.rs | 12 + types/src/lib.rs | 1 + 43 files changed, 1319 insertions(+), 33 deletions(-) create mode 100644 config/src/config/index_db_tailer_config.rs create mode 100644 ecosystem/indexer-grpc/indexer-grpc-table-info/src/tailer_service.rs create mode 100644 execution/executor/tests/internal_indexer_test.rs create mode 100644 storage/indexer/src/db_tailer.rs create mode 100644 storage/indexer/src/schema/event_by_key/mod.rs create mode 100644 storage/indexer/src/schema/event_by_key/test.rs create mode 100644 storage/indexer/src/schema/event_by_version/mod.rs create mode 100644 storage/indexer/src/schema/event_by_version/test.rs create mode 100644 storage/indexer/src/schema/transaction_by_account/mod.rs create mode 100644 storage/indexer/src/schema/transaction_by_account/test.rs create mode 100644 storage/indexer/src/utils.rs create mode 100644 types/src/indexer/db_tailer_reader.rs create mode 100644 types/src/indexer/mod.rs create mode 100644 types/src/indexer/table_info_reader.rs diff --git a/Cargo.lock b/Cargo.lock index 4241159d3d242..cb30f7d5723c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -510,7 +510,6 @@ dependencies = [ "anyhow", "aptos-config", "aptos-crypto", - "aptos-db-indexer", "aptos-framework", "aptos-logger", "aptos-openapi", @@ -1159,9 +1158,12 @@ dependencies = [ "aptos-resource-viewer", "aptos-rocksdb-options", "aptos-schemadb", + "aptos-sdk", "aptos-storage-interface", "aptos-types", + "aptos-vm-genesis", "bcs 0.1.4", + "byteorder", "bytes", "dashmap", "move-core-types", @@ -1347,6 +1349,7 @@ dependencies = [ "aptos-consensus-types", "aptos-crypto", "aptos-db", + "aptos-db-indexer", "aptos-drop-helper", "aptos-executor-service", "aptos-executor-test-helpers", @@ -1357,6 +1360,7 @@ dependencies = [ "aptos-logger", "aptos-metrics-core", "aptos-scratchpad", + "aptos-sdk", "aptos-storage-interface", "aptos-temppath", "aptos-types", diff --git a/api/openapi-spec-generator/src/fake_context.rs b/api/openapi-spec-generator/src/fake_context.rs index 98dee1782b641..4529ac829b0e2 100644 --- a/api/openapi-spec-generator/src/fake_context.rs +++ b/api/openapi-spec-generator/src/fake_context.rs @@ -17,5 +17,6 @@ pub fn get_fake_context() -> Context { mempool.ac_client, NodeConfig::default(), None, /* table info reader */ + None, ) } diff --git a/api/src/context.rs b/api/src/context.rs index 7535e8faceb76..508ef6b6a721d 100644 --- a/api/src/context.rs +++ b/api/src/context.rs @@ -18,7 +18,6 @@ use aptos_api_types::{ }; use aptos_config::config::{NodeConfig, RoleType}; use aptos_crypto::HashValue; -use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_gas_schedule::{AptosGasParameters, FromOnChainGasSchedule}; use aptos_logger::{error, info, Schema}; use aptos_mempool::{MempoolClientRequest, MempoolClientSender, SubmissionStatus}; @@ -34,6 +33,9 @@ use aptos_types::{ chain_id::ChainId, contract_event::EventWithVersion, event::EventKey, + indexer::{ + db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader, + }, ledger_info::LedgerInfoWithSignatures, on_chain_config::{GasSchedule, GasScheduleV2, OnChainConfig, OnChainExecutionConfig}, state_store::{ @@ -76,6 +78,7 @@ pub struct Context { simulate_txn_stats: Arc, pub table_info_reader: Option>, pub wait_for_hash_active_connections: Arc, + pub txn_event_reader: Option>, } impl std::fmt::Debug for Context { @@ -91,6 +94,7 @@ impl Context { mp_sender: MempoolClientSender, node_config: NodeConfig, table_info_reader: Option>, + txn_event_reader: Option>, ) -> Self { let (view_function_stats, simulate_txn_stats) = { let log_per_call_stats = node_config.api.periodic_function_stats_sec.is_some(); @@ -129,6 +133,7 @@ impl Context { simulate_txn_stats, table_info_reader, wait_for_hash_active_connections: Arc::new(AtomicUsize::new(0)), + txn_event_reader, } } diff --git a/api/src/runtime.rs b/api/src/runtime.rs index 4673c087235a1..3e74927aaf7f8 100644 --- a/api/src/runtime.rs +++ b/api/src/runtime.rs @@ -10,11 +10,15 @@ use crate::{ }; use anyhow::Context as AnyhowContext; use aptos_config::config::{ApiConfig, NodeConfig}; -use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_logger::info; use aptos_mempool::MempoolClientSender; use aptos_storage_interface::DbReader; -use aptos_types::chain_id::ChainId; +use aptos_types::{ + chain_id::ChainId, + indexer::{ + db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader, + }, +}; use poem::{ handler, http::Method, @@ -36,11 +40,19 @@ pub fn bootstrap( db: Arc, mp_sender: MempoolClientSender, table_info_reader: Option>, + txn_event_reader: Option>, ) -> anyhow::Result { let max_runtime_workers = get_max_runtime_workers(&config.api); let runtime = aptos_runtimes::spawn_named_runtime("api".into(), Some(max_runtime_workers)); - let context = Context::new(chain_id, db, mp_sender, config.clone(), table_info_reader); + let context = Context::new( + chain_id, + db, + mp_sender, + config.clone(), + table_info_reader, + txn_event_reader, + ); attach_poem_to_runtime(runtime.handle(), context.clone(), config, false) .context("Failed to attach poem to runtime")?; @@ -342,6 +354,7 @@ mod tests { context.db.clone(), context.mempool.ac_client.clone(), None, + None, ); assert!(ret.is_ok()); diff --git a/api/test-context/src/test_context.rs b/api/test-context/src/test_context.rs index c8786900d73d4..b13c5fe731b4b 100644 --- a/api/test-context/src/test_context.rs +++ b/api/test-context/src/test_context.rs @@ -146,6 +146,7 @@ pub fn new_test_context( mempool.ac_client.clone(), node_config.clone(), None, /* table info reader */ + None, ); // Configure the testing depending on which API version we're testing. diff --git a/api/types/Cargo.toml b/api/types/Cargo.toml index 1ff09c771eed3..4f395c7457dd9 100644 --- a/api/types/Cargo.toml +++ b/api/types/Cargo.toml @@ -16,7 +16,6 @@ rust-version = { workspace = true } anyhow = { workspace = true } aptos-config = { workspace = true } aptos-crypto = { workspace = true } -aptos-db-indexer = { workspace = true } aptos-framework = { workspace = true } aptos-logger = { workspace = true } aptos-openapi = { workspace = true } diff --git a/api/types/src/convert.rs b/api/types/src/convert.rs index 09798fb76a4a5..fc8f895977a75 100644 --- a/api/types/src/convert.rs +++ b/api/types/src/convert.rs @@ -18,7 +18,6 @@ use crate::{ }; use anyhow::{bail, ensure, format_err, Context as AnyhowContext, Result}; use aptos_crypto::{hash::CryptoHash, HashValue}; -use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_logger::{sample, sample::SampleRate}; use aptos_resource_viewer::AptosValueAnnotator; use aptos_storage_interface::DbReader; @@ -26,6 +25,7 @@ use aptos_types::{ access_path::{AccessPath, Path}, chain_id::ChainId, contract_event::{ContractEvent, EventWithVersion}, + indexer::table_info_reader::TableInfoReader, state_store::{ state_key::{inner::StateKeyInner, StateKey}, table::{TableHandle, TableInfo}, diff --git a/aptos-node/src/lib.rs b/aptos-node/src/lib.rs index f7ce9e938591b..ee7e71f7d1939 100644 --- a/aptos-node/src/lib.rs +++ b/aptos-node/src/lib.rs @@ -211,6 +211,7 @@ pub struct AptosHandle { _peer_monitoring_service_runtime: Runtime, _state_sync_runtimes: StateSyncRuntimes, _telemetry_runtime: Option, + _indexer_db_tailer_runtime: Option, } /// Start an Aptos node @@ -694,6 +695,7 @@ pub fn setup_environment_and_start_node( indexer_table_info_runtime, indexer_runtime, indexer_grpc_runtime, + db_tailer_runtime, ) = services::bootstrap_api_and_indexer(&node_config, db_rw.clone(), chain_id)?; // Create mempool and get the consensus to mempool sender @@ -835,6 +837,7 @@ pub fn setup_environment_and_start_node( _peer_monitoring_service_runtime: peer_monitoring_service_runtime, _state_sync_runtimes: state_sync_runtimes, _telemetry_runtime: telemetry_runtime, + _indexer_db_tailer_runtime: db_tailer_runtime, }) } diff --git a/aptos-node/src/services.rs b/aptos-node/src/services.rs index cea0de1b84963..01ea24968451b 100644 --- a/aptos-node/src/services.rs +++ b/aptos-node/src/services.rs @@ -11,10 +11,11 @@ use aptos_consensus::{ }; use aptos_consensus_notifications::ConsensusNotifier; use aptos_data_client::client::AptosDataClient; -use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_event_notifications::{DbBackedOnChainConfig, ReconfigNotificationListener}; use aptos_indexer_grpc_fullnode::runtime::bootstrap as bootstrap_indexer_grpc; -use aptos_indexer_grpc_table_info::runtime::bootstrap as bootstrap_indexer_table_info; +use aptos_indexer_grpc_table_info::runtime::{ + bootstrap as bootstrap_indexer_table_info, bootstrap_db_tailer, +}; use aptos_logger::{debug, telemetry_log_writer::TelemetryLog, LoggerFilterUpdater}; use aptos_mempool::{network::MempoolSyncMsg, MempoolClientRequest, QuorumStoreRequest}; use aptos_mempool_notifications::MempoolNotificationListener; @@ -30,7 +31,12 @@ use aptos_peer_monitoring_service_server::{ use aptos_peer_monitoring_service_types::PeerMonitoringServiceMessage; use aptos_storage_interface::{DbReader, DbReaderWriter}; use aptos_time_service::TimeService; -use aptos_types::chain_id::ChainId; +use aptos_types::{ + chain_id::ChainId, + indexer::{ + db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader, + }, +}; use aptos_validator_transaction_pool::VTxnPoolState; use futures::channel::{mpsc, mpsc::Sender}; use std::{sync::Arc, time::Instant}; @@ -51,6 +57,7 @@ pub fn bootstrap_api_and_indexer( Option, Option, Option, + Option, )> { // Create the mempool client and sender let (mempool_client_sender, mempool_client_receiver) = @@ -66,11 +73,24 @@ pub fn bootstrap_api_and_indexer( None => (None, None), }; + let (db_tailer_runtime, txn_event_reader) = + match bootstrap_db_tailer(node_config, db_rw.clone()) { + Some((runtime, db_tailer)) => (Some(runtime), Some(db_tailer)), + None => (None, None), + }; + // Create the API runtime let table_info_reader: Option> = indexer_async_v2.map(|arc| { let trait_object: Arc = arc; trait_object }); + + let txn_event_reader: Option> = + txn_event_reader.map(|arc| { + let trait_object: Arc = arc; + trait_object + }); + let api_runtime = if node_config.api.enabled { Some(bootstrap_api( node_config, @@ -78,6 +98,7 @@ pub fn bootstrap_api_and_indexer( db_rw.reader.clone(), mempool_client_sender.clone(), table_info_reader.clone(), + txn_event_reader.clone(), )?) } else { None @@ -106,6 +127,7 @@ pub fn bootstrap_api_and_indexer( indexer_table_info_runtime, indexer_runtime, indexer_grpc, + db_tailer_runtime, )) } diff --git a/aptos-node/src/storage.rs b/aptos-node/src/storage.rs index b5f6780fbab39..ab196fa0162c9 100644 --- a/aptos-node/src/storage.rs +++ b/aptos-node/src/storage.rs @@ -73,7 +73,6 @@ pub(crate) fn bootstrap_db( let db_backup_service = start_backup_service(node_config.storage.backup_service_address, fast_sync_db); - (db_arc as Arc, db_rw, Some(db_backup_service)) }, }; diff --git a/config/src/config/index_db_tailer_config.rs b/config/src/config/index_db_tailer_config.rs new file mode 100644 index 0000000000000..c7fd78c2a4057 --- /dev/null +++ b/config/src/config/index_db_tailer_config.rs @@ -0,0 +1,33 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] +pub struct IndexDBTailerConfig { + pub enable: bool, + pub batch_size: usize, +} + +impl IndexDBTailerConfig { + pub fn new(enable: bool, batch_size: usize) -> Self { + Self { enable, batch_size } + } + + pub fn enable(&self) -> bool { + self.enable + } + + pub fn batch_size(&self) -> usize { + self.batch_size + } +} + +impl Default for IndexDBTailerConfig { + fn default() -> Self { + Self { + enable: false, + batch_size: 10_000, + } + } +} diff --git a/config/src/config/mod.rs b/config/src/config/mod.rs index 2ed9087a7b6d9..27f7bd90f2890 100644 --- a/config/src/config/mod.rs +++ b/config/src/config/mod.rs @@ -15,6 +15,7 @@ mod error; mod execution_config; mod gas_estimation_config; mod identity_config; +pub mod index_db_tailer_config; mod indexer_config; mod indexer_grpc_config; mod indexer_table_info_config; diff --git a/config/src/config/node_config.rs b/config/src/config/node_config.rs index 008343fb53896..162a3dfd8a372 100644 --- a/config/src/config/node_config.rs +++ b/config/src/config/node_config.rs @@ -4,13 +4,14 @@ use super::{DagConsensusConfig, IndexerTableInfoConfig}; use crate::{ config::{ - dkg_config::DKGConfig, jwk_consensus_config::JWKConsensusConfig, - netbench_config::NetbenchConfig, node_config_loader::NodeConfigLoader, - node_startup_config::NodeStartupConfig, observer_config::ObserverConfig, - persistable_config::PersistableConfig, utils::RootPath, AdminServiceConfig, ApiConfig, - BaseConfig, ConsensusConfig, Error, ExecutionConfig, IndexerConfig, IndexerGrpcConfig, - InspectionServiceConfig, LoggerConfig, MempoolConfig, NetworkConfig, - PeerMonitoringServiceConfig, SafetyRulesTestConfig, StateSyncConfig, StorageConfig, + dkg_config::DKGConfig, index_db_tailer_config::IndexDBTailerConfig, + jwk_consensus_config::JWKConsensusConfig, netbench_config::NetbenchConfig, + node_config_loader::NodeConfigLoader, node_startup_config::NodeStartupConfig, + observer_config::ObserverConfig, persistable_config::PersistableConfig, utils::RootPath, + AdminServiceConfig, ApiConfig, BaseConfig, ConsensusConfig, Error, ExecutionConfig, + IndexerConfig, IndexerGrpcConfig, InspectionServiceConfig, LoggerConfig, MempoolConfig, + NetworkConfig, PeerMonitoringServiceConfig, SafetyRulesTestConfig, StateSyncConfig, + StorageConfig, }, network_id::NetworkId, }; @@ -83,6 +84,8 @@ pub struct NodeConfig { pub storage: StorageConfig, #[serde(default)] pub validator_network: Option, + #[serde(default)] + pub index_db_tailer: IndexDBTailerConfig, } impl NodeConfig { diff --git a/crates/indexer/src/runtime.rs b/crates/indexer/src/runtime.rs index c3bf5a0516c0d..e8a825b7c5561 100644 --- a/crates/indexer/src/runtime.rs +++ b/crates/indexer/src/runtime.rs @@ -96,6 +96,7 @@ pub fn bootstrap( mp_sender, node_config, None, /* table info reader */ + None, )); run_forever(indexer_config, context).await; }); diff --git a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs index 39345992ac8fc..1fbfdf3a765f7 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs @@ -7,7 +7,6 @@ use crate::{ }; use aptos_api::context::Context; use aptos_config::config::NodeConfig; -use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_logger::info; use aptos_mempool::MempoolClientSender; use aptos_protos::{ @@ -19,7 +18,7 @@ use aptos_protos::{ util::timestamp::FILE_DESCRIPTOR_SET as UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET, }; use aptos_storage_interface::DbReader; -use aptos_types::chain_id::ChainId; +use aptos_types::{chain_id::ChainId, indexer::table_info_reader::TableInfoReader}; use std::{net::ToSocketAddrs, sync::Arc}; use tokio::runtime::Runtime; use tonic::{codec::CompressionEncoding, transport::Server}; @@ -58,6 +57,7 @@ pub fn bootstrap( mp_sender, node_config, table_info_reader, + None, )); let service_context = ServiceContext { context: context.clone(), diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs index 2d311d0902e48..5e45b34f25561 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs @@ -4,3 +4,4 @@ pub mod backup_restore; pub mod runtime; pub mod table_info_service; +pub mod tailer_service; diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs index cb5aa9531e880..13be3b25c579e 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs @@ -1,10 +1,10 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::table_info_service::TableInfoService; +use crate::{table_info_service::TableInfoService, tailer_service::TailerService}; use aptos_api::context::Context; use aptos_config::config::NodeConfig; -use aptos_db_indexer::{db_ops::open_db, db_v2::IndexerAsyncV2}; +use aptos_db_indexer::{db_ops::open_db, db_tailer::DBTailer, db_v2::IndexerAsyncV2}; use aptos_mempool::MempoolClientSender; use aptos_storage_interface::DbReaderWriter; use aptos_types::chain_id::ChainId; @@ -13,6 +13,26 @@ use tokio::runtime::Runtime; const INDEX_ASYNC_V2_DB_NAME: &str = "index_indexer_async_v2_db"; +pub fn bootstrap_db_tailer( + config: &NodeConfig, + db_rw: DbReaderWriter, +) -> Option<(Runtime, Arc)> { + if !config.indexer_table_info.enabled { + return None; + } + let runtime = aptos_runtimes::spawn_named_runtime("index-db-tailer".to_string(), None); + // Set up db config and open up the db initially to read metadata + let node_config = config.clone(); + let mut tailer_service = TailerService::new(db_rw.reader, &node_config); + let db_tailer = tailer_service.get_db_tailer(); + // Spawn the runtime for db tailer + runtime.spawn(async move { + tailer_service.run().await; + }); + + Some((runtime, db_tailer)) +} + /// Creates a runtime which creates a thread pool which sets up fullnode indexer table info service /// Returns corresponding Tokio runtime pub fn bootstrap( @@ -50,6 +70,7 @@ pub fn bootstrap( mp_sender, node_config.clone(), None, + None, )); let mut parser = TableInfoService::new( diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs index 95dbda43e7471..0e768cf4fa96e 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs @@ -54,7 +54,7 @@ impl TableInfoService { /// 6. retry all the txns in the loop sequentially to clean up the pending on items pub async fn run(&mut self) { loop { - let start_time = std::time::Instant::now(); + let start_time: std::time::Instant = std::time::Instant::now(); let ledger_version = self.get_highest_known_version().await.unwrap_or_default(); let batches = self.get_batches(ledger_version).await; let results = self diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/tailer_service.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/tailer_service.rs new file mode 100644 index 0000000000000..37bfdc3de7474 --- /dev/null +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/tailer_service.rs @@ -0,0 +1,69 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_config::config::NodeConfig; +use aptos_db_indexer::{db_ops::open_db, db_tailer::DBTailer}; +use aptos_indexer_grpc_utils::counters::{log_grpc_step, IndexerGrpcStep}; +use aptos_storage_interface::DbReader; +use std::sync::Arc; + +const SERVICE_TYPE: &str = "db_tailer_service"; +const INDEX_ASYNC_DB_TAILER: &str = "index_async_db_tailer"; + +pub struct TailerService { + pub db_tailer: Arc, +} + +impl TailerService { + pub fn new(db_reader: Arc, node_config: &NodeConfig) -> Self { + let db_path = node_config + .storage + .get_dir_paths() + .default_root_path() + .join(INDEX_ASYNC_DB_TAILER); + let rocksdb_config = node_config.storage.rocksdb_configs.index_db_config; + let db = Arc::new( + open_db(db_path, &rocksdb_config) + .expect("Failed to open up indexer db tailer initially"), + ); + + let indexer_db_tailer = + Arc::new(DBTailer::new(db, db_reader, &node_config.index_db_tailer)); + Self { + db_tailer: indexer_db_tailer, + } + } + + pub fn get_db_tailer(&self) -> Arc { + Arc::clone(&self.db_tailer) + } + + pub async fn run(&mut self) { + let mut start_version = self.db_tailer.get_persisted_version(); + loop { + let start_time: std::time::Instant = std::time::Instant::now(); + let cur_version = self + .db_tailer + .process_a_batch(Some(start_version)) + .expect("Failed to run indexer db tailer"); + + if cur_version == start_version { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + continue; + } + start_version = cur_version; + log_grpc_step( + SERVICE_TYPE, + IndexerGrpcStep::DBTailerProcessed, + None, + None, + None, + None, + Some(start_time.elapsed().as_secs_f64()), + None, + Some(cur_version as i64), + None, + ); + } + } +} diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/counters.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/counters.rs index 434bdb5cf613c..eb9c11f711694 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/counters.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/counters.rs @@ -58,6 +58,8 @@ pub enum IndexerGrpcStep { TableInfoProcessedBatch, // [Indexer Table Info] Processed transactions from fullnode TableInfoProcessed, + // [Indexer DB Tailer] Tailed AptosDB and write to indexer DB + DBTailerProcessed, } impl IndexerGrpcStep { @@ -91,6 +93,8 @@ impl IndexerGrpcStep { // Table info service steps IndexerGrpcStep::TableInfoProcessedBatch => "1", IndexerGrpcStep::TableInfoProcessed => "2", + // DB tailer service steps + IndexerGrpcStep::DBTailerProcessed => "1", } } @@ -136,6 +140,10 @@ impl IndexerGrpcStep { IndexerGrpcStep::TableInfoProcessed => { "[Indexer Table Info] Processed successfully" } + // DB tailer service steps + IndexerGrpcStep::DBTailerProcessed => { + "[Indexer DB Tailer] Tailed AptosDB and write to indexer DB" + } } } } diff --git a/execution/executor/Cargo.toml b/execution/executor/Cargo.toml index aec6c3ca9ae54..4c1239884863b 100644 --- a/execution/executor/Cargo.toml +++ b/execution/executor/Cargo.toml @@ -20,6 +20,7 @@ aptos-drop-helper = { workspace = true } aptos-executor-service = { workspace = true } aptos-executor-types = { workspace = true } aptos-experimental-runtimes = { workspace = true } +aptos-sdk = { workspace = true } aptos-infallible = { workspace = true } aptos-logger = { workspace = true } aptos-metrics-core = { workspace = true } @@ -42,6 +43,7 @@ serde = { workspace = true } aptos-cached-packages = { workspace = true } aptos-config = { workspace = true } aptos-db = { workspace = true } +aptos-db-indexer = { workspace = true, features = ["test"] } aptos-executor-test-helpers = { workspace = true } aptos-genesis = { workspace = true } aptos-storage-interface = { workspace = true } diff --git a/execution/executor/tests/internal_indexer_test.rs b/execution/executor/tests/internal_indexer_test.rs new file mode 100644 index 0000000000000..6675bb0231bbf --- /dev/null +++ b/execution/executor/tests/internal_indexer_test.rs @@ -0,0 +1,161 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_cached_packages::aptos_stdlib; +use aptos_config::config::{index_db_tailer_config::IndexDBTailerConfig, RocksdbConfig}; +use aptos_db::AptosDB; +use aptos_db_indexer::{db_ops::open_tailer_db, db_tailer::DBTailer}; +use aptos_executor_test_helpers::{ + gen_block_id, gen_ledger_info_with_sigs, integration_test_impl::create_db_and_executor, +}; +use aptos_executor_types::BlockExecutorTrait; +use aptos_sdk::{ + transaction_builder::TransactionFactory, + types::{AccountKey, LocalAccount}, +}; +use aptos_storage_interface::DbReader; +use aptos_temppath::TempPath; +use aptos_types::{ + account_config::aptos_test_root_address, + block_metadata::BlockMetadata, + chain_id::ChainId, + test_helpers::transaction_test_helpers::TEST_BLOCK_EXECUTOR_ONCHAIN_CONFIG, + transaction::{ + signature_verified_transaction::into_signature_verified_block, Transaction, + Transaction::UserTransaction, WriteSetPayload, + }, +}; +use rand::SeedableRng; +use std::sync::Arc; + +const B: u64 = 1_000_000_000; + +#[cfg(test)] +pub fn create_test_db() -> (Arc, LocalAccount) { + // create test db + let path = aptos_temppath::TempPath::new(); + let (genesis, validators) = aptos_vm_genesis::test_genesis_change_set_and_validators(Some(1)); + let genesis_txn = Transaction::GenesisTransaction(WriteSetPayload::Direct(genesis)); + let core_resources_account: LocalAccount = LocalAccount::new( + aptos_test_root_address(), + AccountKey::from_private_key(aptos_vm_genesis::GENESIS_KEYPAIR.0.clone()), + 0, + ); + let (aptos_db, _db, executor, _waypoint) = + create_db_and_executor(path.path(), &genesis_txn, true); + let parent_block_id = executor.committed_block_id(); + + // This generates accounts that do not overlap with genesis + let seed = [3u8; 32]; + let mut rng = ::rand::rngs::StdRng::from_seed(seed); + let signer = aptos_types::validator_signer::ValidatorSigner::new( + validators[0].data.owner_address, + validators[0].consensus_key.clone(), + ); + let account1 = LocalAccount::generate(&mut rng); + let account2 = LocalAccount::generate(&mut rng); + let account3 = LocalAccount::generate(&mut rng); + + let txn_factory = TransactionFactory::new(ChainId::test()); + + let block1_id = gen_block_id(1); + let block1_meta = Transaction::BlockMetadata(BlockMetadata::new( + block1_id, + 1, + 0, + signer.author(), + vec![0], + vec![], + 1, + )); + let tx1 = core_resources_account + .sign_with_transaction_builder(txn_factory.create_user_account(account1.public_key())); + let tx2 = core_resources_account + .sign_with_transaction_builder(txn_factory.create_user_account(account2.public_key())); + let tx3 = core_resources_account + .sign_with_transaction_builder(txn_factory.create_user_account(account3.public_key())); + // Create account1 with 2T coins. + let txn1 = core_resources_account + .sign_with_transaction_builder(txn_factory.mint(account1.address(), 2_000 * B)); + // Create account2 with 1.2T coins. + let txn2 = core_resources_account + .sign_with_transaction_builder(txn_factory.mint(account2.address(), 1_200 * B)); + // Create account3 with 1T coins. + let txn3 = core_resources_account + .sign_with_transaction_builder(txn_factory.mint(account3.address(), 1_000 * B)); + + // Transfer 20B coins from account1 to account2. + // balance: <1.98T, 1.22T, 1T + let txn4 = + account1.sign_with_transaction_builder(txn_factory.transfer(account2.address(), 20 * B)); + + // Transfer 10B coins from account2 to account3. + // balance: <1.98T, <1.21T, 1.01T + let txn5 = + account2.sign_with_transaction_builder(txn_factory.transfer(account3.address(), 10 * B)); + + // Transfer 70B coins from account1 to account3. + // balance: <1.91T, <1.21T, 1.08T + let txn6 = + account1.sign_with_transaction_builder(txn_factory.transfer(account3.address(), 70 * B)); + + let reconfig1 = core_resources_account.sign_with_transaction_builder( + txn_factory.payload(aptos_stdlib::aptos_governance_force_end_epoch_test_only()), + ); + + let block1: Vec<_> = into_signature_verified_block(vec![ + block1_meta, + UserTransaction(tx1), + UserTransaction(tx2), + UserTransaction(tx3), + UserTransaction(txn1), + UserTransaction(txn2), + UserTransaction(txn3), + UserTransaction(txn4), + UserTransaction(txn5), + UserTransaction(txn6), + UserTransaction(reconfig1), + ]); + let output1 = executor + .execute_block( + (block1_id, block1.clone()).into(), + parent_block_id, + TEST_BLOCK_EXECUTOR_ONCHAIN_CONFIG, + ) + .unwrap(); + let li1 = gen_ledger_info_with_sigs(1, &output1, block1_id, &[signer.clone()]); + executor.commit_blocks(vec![block1_id], li1).unwrap(); + (aptos_db, core_resources_account) +} + +#[test] +fn test_db_tailer_data() { + // create test db + let (aptos_db, core_account) = create_test_db(); + let total_version = aptos_db.get_latest_version().unwrap(); + // create db tailer + let rocksdb_config = RocksdbConfig::default(); + let temp_path = TempPath::new(); + let db = Arc::new( + open_tailer_db(temp_path.as_ref(), &rocksdb_config) + .expect("Failed to open up indexer db tailer initially"), + ); + let tailer = DBTailer::new(db, aptos_db, &IndexDBTailerConfig::new(true, 2)); + // assert the data matches the expected data + let mut version = tailer.get_persisted_version(); + assert_eq!(version, 0); + while version < total_version { + version = tailer.process_a_batch(Some(version)).unwrap(); + } + let txn_iter = tailer + .get_account_transaction_version_iter(core_account.address(), 0, 1000, 1000) + .unwrap(); + let res: Vec<_> = txn_iter.collect(); + // core account submitted 7 transactions, and the first transaction is version 2 + assert!(res.len() == 7); + assert!(res[0].as_ref().unwrap().1 == 2); + + let x = tailer.get_event_by_key_iter().unwrap(); + let res: Vec<_> = x.collect(); + assert!(!res.is_empty()); +} diff --git a/storage/aptosdb/src/db/include/aptosdb_reader.rs b/storage/aptosdb/src/db/include/aptosdb_reader.rs index ee8d469ddb8c1..42cdaa87d5492 100644 --- a/storage/aptosdb/src/db/include/aptosdb_reader.rs +++ b/storage/aptosdb/src/db/include/aptosdb_reader.rs @@ -284,6 +284,7 @@ impl DbReader for AptosDB { }) } + /// TODO(bowu): Deprecate after internal index migration fn get_events( &self, event_key: &EventKey, @@ -809,6 +810,86 @@ impl DbReader for AptosDB { self.state_store.get_usage(version) }) } + + fn get_db_backup_iter( + &self, + start_version: Version, + num_transactions: usize, + ) -> Result)>> + '_,>> { + gauged_api("get_db_backup_iter", || { + self.error_if_ledger_pruned("Transaction", start_version)?; + + let txn_iter = self + .ledger_db + .transaction_db() + .get_transaction_iter(start_version, num_transactions)?; + let mut event_vec_iter = self + .ledger_db + .event_db() + .get_events_by_version_iter(start_version, num_transactions)?; + let zipped = txn_iter.enumerate().map(move |(idx, txn_res)| { + let version = start_version + idx as u64; // overflow is impossible since it's check upon txn_iter construction. + + let txn = txn_res?; + let event_vec = event_vec_iter.next().ok_or_else(|| { + AptosDbError::NotFound(format!( + "Events not found when Transaction exists., version {}", + version + )) + })??; + Ok((txn, event_vec)) + }); + Ok(Box::new(zipped) as Box)>> + '_,> ) + + }) + } + + /// Returns the transaction with proof for a given version, or error if the transaction is not + /// found. + fn get_transaction_with_proof( + &self, + version: Version, + ledger_version: Version, + fetch_events: bool, + ) -> Result { + self.error_if_ledger_pruned("Transaction", version)?; + + let proof = self + .ledger_db + .transaction_info_db() + .get_transaction_info_with_proof( + version, + ledger_version, + self.ledger_db.transaction_accumulator_db(), + )?; + let transaction = self.ledger_db.transaction_db().get_transaction(version)?; + + // If events were requested, also fetch those. + let events = if fetch_events { + Some(self.ledger_db.event_db().get_events_by_version(version)?) + } else { + None + }; + + Ok(TransactionWithProof { + version, + transaction, + events, + proof, + }) + } + + fn get_event_by_version_and_index( + &self, + version: Version, + index: u64, + ) -> Result { + gauged_api("get_event_by_version_and_index", || { + self.error_if_ledger_pruned("Event", version)?; + self.event_store.get_event_by_version_and_index(version, index) + }) + + } } impl AptosDB { @@ -918,6 +999,7 @@ impl AptosDB { }) } + /// TODO(bowu): Deprecate after internal index migration fn get_events_by_event_key( &self, event_key: &EventKey, diff --git a/storage/indexer/Cargo.toml b/storage/indexer/Cargo.toml index b57c5b7da1435..0c111007b9aa1 100644 --- a/storage/indexer/Cargo.toml +++ b/storage/indexer/Cargo.toml @@ -19,10 +19,13 @@ aptos-logger = { workspace = true } aptos-resource-viewer = { workspace = true } aptos-rocksdb-options = { workspace = true } aptos-schemadb = { workspace = true } +aptos-sdk = { workspace = true } aptos-storage-interface = { workspace = true } +aptos-vm-genesis = { workspace = true } aptos-types = { workspace = true } bcs = { workspace = true } bytes = { workspace = true } +byteorder = { workspace = true } dashmap = { workspace = true } move-core-types = { workspace = true } proptest = { workspace = true, optional = true } @@ -39,4 +42,5 @@ rand = { workspace = true } [features] default = [] +test = [] fuzzing = ["proptest", "proptest-derive", "aptos-types/fuzzing", "aptos-schemadb/fuzzing"] diff --git a/storage/indexer/src/db_ops.rs b/storage/indexer/src/db_ops.rs index 2f109bed876f0..000997ab32f4a 100644 --- a/storage/indexer/src/db_ops.rs +++ b/storage/indexer/src/db_ops.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::schema::column_families; +use crate::schema::{column_families, tailer_column_families}; use anyhow::Result; use aptos_config::config::RocksdbConfig; use aptos_rocksdb_options::gen_rocksdb_options; @@ -17,6 +17,15 @@ pub fn open_db>(db_path: P, rocksdb_config: &RocksdbConfig) -> Re )?) } +pub fn open_tailer_db>(db_path: P, rocksdb_config: &RocksdbConfig) -> Result { + Ok(DB::open( + db_path, + "tailer_db", + tailer_column_families(), + &gen_rocksdb_options(rocksdb_config, false), + )?) +} + pub fn close_db(db: DB) { mem::drop(db) } diff --git a/storage/indexer/src/db_tailer.rs b/storage/indexer/src/db_tailer.rs new file mode 100644 index 0000000000000..014fda1ae3514 --- /dev/null +++ b/storage/indexer/src/db_tailer.rs @@ -0,0 +1,302 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + schema::{ + event_by_key::EventByKeySchema, event_by_version::EventByVersionSchema, + indexer_metadata::TailerMetadataSchema, transaction_by_account::TransactionByAccountSchema, + }, + utils::{ + error_if_too_many_requested, get_first_seq_num_and_limit, AccountTransactionVersionIter, + MAX_REQUEST_LIMIT, + }, +}; +use aptos_config::config::index_db_tailer_config::IndexDBTailerConfig; +use aptos_schemadb::{ReadOptions, SchemaBatch, DB}; +use aptos_storage_interface::{ + db_ensure as ensure, db_other_bail as bail, AptosDbError, DbReader, Result, +}; +use aptos_types::{ + account_address::AccountAddress, + contract_event::{ContractEvent, EventWithVersion}, + event::EventKey, + indexer::db_tailer_reader::{IndexerTransactionEventReader, Order}, + transaction::{AccountTransactionsWithProof, Version}, +}; +use std::sync::Arc; + +pub struct DBTailer { + pub db: Arc, + pub main_db_reader: Arc, + batch_size: usize, +} + +impl DBTailer { + pub fn new(db: Arc, db_reader: Arc, config: &IndexDBTailerConfig) -> Self { + Self { + db, + main_db_reader: db_reader, + batch_size: config.batch_size, + } + } + + pub fn get_persisted_version(&self) -> Version { + // read the latest key from the db + let mut rev_iter_res = self + .db + .rev_iter::(Default::default()) + .expect("Cannot create db tailer metadata iterator"); + rev_iter_res + .next() + .map(|res| res.map_or(0, |(version, _)| version)) + .unwrap_or_default() + } + + pub fn process_a_batch(&self, start_version: Option) -> Result { + let mut version = start_version.unwrap_or(0); + let db_iter: Box< + dyn Iterator< + Item = std::prelude::v1::Result< + (aptos_types::transaction::Transaction, Vec), + AptosDbError, + >, + >, + > = self + .main_db_reader + .get_db_backup_iter(version, self.batch_size) + .expect("Cannot create db tailer iterator"); + let batch = SchemaBatch::new(); + let metadata_batch = SchemaBatch::new(); + db_iter.for_each(|res| { + res.map(|(txn, events)| { + if let Some(txn) = txn.try_as_signed_user_txn() { + batch + .put::( + &(txn.sender(), txn.sequence_number()), + &version, + ) + .expect("Failed to put txn to db tailer batch"); + + events.iter().enumerate().for_each(|(idx, event)| { + if let ContractEvent::V1(v1) = event { + batch + .put::( + &(*v1.key(), v1.sequence_number()), + &(version, idx as u64), + ) + .expect("Failed to event by key to db tailer batch"); + batch + .put::( + &(*v1.key(), version, v1.sequence_number()), + &(idx as u64), + ) + .expect("Failed to event by version to db tailer batch"); + } + }); + } + version += 1; + }) + .expect("Failed to iterate db tailer iterator"); + }); + // write to index db + self.db.write_schemas(batch)?; + // update the metadata + metadata_batch.put::(&version, &())?; + self.db.write_schemas(metadata_batch)?; + Ok(version) + } + + pub fn get_account_transaction_version_iter( + &self, + address: AccountAddress, + min_seq_num: u64, + num_versions: u64, + ledger_version: Version, + ) -> Result { + let mut iter = self + .db + .iter::(ReadOptions::default())?; + iter.seek(&(address, min_seq_num))?; + Ok(AccountTransactionVersionIter::new( + iter, + address, + min_seq_num + .checked_add(num_versions) + .ok_or(AptosDbError::TooManyRequested(min_seq_num, num_versions))?, + ledger_version, + )) + } + + pub fn get_latest_sequence_number( + &self, + ledger_version: Version, + event_key: &EventKey, + ) -> Result> { + let mut iter = self + .db + .iter::(ReadOptions::default())?; + iter.seek_for_prev(&(*event_key, ledger_version, u64::max_value()))?; + + Ok(iter.next().transpose()?.and_then( + |((key, _version, seq), _idx)| if &key == event_key { Some(seq) } else { None }, + )) + } + + /// Given `event_key` and `start_seq_num`, returns events identified by transaction version and + /// index among all events emitted by the same transaction. Result won't contain records with a + /// transaction version > `ledger_version` and is in ascending order. + pub fn lookup_events_by_key( + &self, + event_key: &EventKey, + start_seq_num: u64, + limit: u64, + ledger_version: u64, + ) -> Result< + Vec<( + u64, // sequence number + Version, // transaction version it belongs to + u64, // index among events for the same transaction + )>, + > { + let mut iter = self.db.iter::(ReadOptions::default())?; + iter.seek(&(*event_key, start_seq_num))?; + + let mut result = Vec::new(); + let mut cur_seq = start_seq_num; + for res in iter.take(limit as usize) { + let ((path, seq), (ver, idx)) = res?; + if path != *event_key || ver > ledger_version { + break; + } + if seq != cur_seq { + let msg = if cur_seq == start_seq_num { + "First requested event is probably pruned." + } else { + "DB corruption: Sequence number not continuous." + }; + bail!("{} expected: {}, actual: {}", msg, cur_seq, seq); + } + result.push((seq, ver, idx)); + cur_seq += 1; + } + + Ok(result) + } + + #[cfg(any(test, feature = "test"))] + pub fn get_event_by_key_iter( + &self, + ) -> Result + '_>> { + let mut iter = self.db.iter::(ReadOptions::default())?; + iter.seek_to_first(); + Ok(Box::new(iter.map(|res| { + let ((event_key, seq_num), (txn_version, idx)) = res.unwrap(); + (event_key, txn_version, seq_num, idx) + }))) + } +} + +impl IndexerTransactionEventReader for DBTailer { + fn get_events( + &self, + event_key: &EventKey, + start: u64, + order: Order, + limit: u64, + ledger_version: Version, + ) -> anyhow::Result> { + self.get_events_by_event_key(event_key, start, order, limit, ledger_version) + } + + fn get_events_by_event_key( + &self, + event_key: &EventKey, + start_seq_num: u64, + order: Order, + limit: u64, + ledger_version: Version, + ) -> anyhow::Result> { + error_if_too_many_requested(limit, MAX_REQUEST_LIMIT)?; + let get_latest = order == Order::Descending && start_seq_num == u64::max_value(); + + let cursor = if get_latest { + // Caller wants the latest, figure out the latest seq_num. + // In the case of no events on that path, use 0 and expect empty result below. + self.get_latest_sequence_number(ledger_version, event_key)? + .unwrap_or(0) + } else { + start_seq_num + }; + + // Convert requested range and order to a range in ascending order. + let (first_seq, real_limit) = get_first_seq_num_and_limit(order, cursor, limit)?; + + // Query the index. + let mut event_indices = + self.lookup_events_by_key(event_key, first_seq, real_limit, ledger_version)?; + + // When descending, it's possible that user is asking for something beyond the latest + // sequence number, in which case we will consider it a bad request and return an empty + // list. + // For example, if the latest sequence number is 100, and the caller is asking for 110 to + // 90, we will get 90 to 100 from the index lookup above. Seeing that the last item + // is 100 instead of 110 tells us 110 is out of bound. + if order == Order::Descending { + if let Some((seq_num, _, _)) = event_indices.last() { + if *seq_num < cursor { + event_indices = Vec::new(); + } + } + } + + let mut events_with_version = event_indices + .into_iter() + .map(|(seq, ver, idx)| { + let event = self + .main_db_reader + .get_event_by_version_and_index(ver, idx)?; + let v0 = match &event { + ContractEvent::V1(event) => event, + ContractEvent::V2(_) => bail!("Unexpected module event"), + }; + ensure!( + seq == v0.sequence_number(), + "Index broken, expected seq:{}, actual:{}", + seq, + v0.sequence_number() + ); + Ok(EventWithVersion::new(ver, event)) + }) + .collect::>>()?; + if order == Order::Descending { + events_with_version.reverse(); + } + + Ok(events_with_version) + } + + fn get_account_transactions( + &self, + address: AccountAddress, + start_seq_num: u64, + limit: u64, + include_events: bool, + ledger_version: Version, + ) -> anyhow::Result { + error_if_too_many_requested(limit, MAX_REQUEST_LIMIT)?; + + let txns_with_proofs = self + .get_account_transaction_version_iter(address, start_seq_num, limit, ledger_version)? + .map(|result| { + let (_seq_num, txn_version) = result?; + self.main_db_reader.get_transaction_with_proof( + txn_version, + ledger_version, + include_events, + ) + }) + .collect::>>()?; + + Ok(AccountTransactionsWithProof::new(txns_with_proofs)) + } +} diff --git a/storage/indexer/src/lib.rs b/storage/indexer/src/lib.rs index 3b82dd83ba574..01990ae2475ca 100644 --- a/storage/indexer/src/lib.rs +++ b/storage/indexer/src/lib.rs @@ -4,10 +4,12 @@ /// TODO(jill): deprecate Indexer once Indexer Async V2 is ready mod db; pub mod db_ops; +pub mod db_tailer; pub mod db_v2; mod metadata; mod schema; pub mod table_info_reader; +mod utils; use crate::{ db::INDEX_DB_NAME, diff --git a/storage/indexer/src/schema/event_by_key/mod.rs b/storage/indexer/src/schema/event_by_key/mod.rs new file mode 100644 index 0000000000000..12ecc845d8c68 --- /dev/null +++ b/storage/indexer/src/schema/event_by_key/mod.rs @@ -0,0 +1,76 @@ +// Copyright © Aptos Foundation +// Parts of the project are originally copyright © Meta Platforms, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//! This module defines physical storage schema for an event index via which a ContractEvent ( +//! represented by a tuple so that it can be fetched from `EventSchema`) +//! can be found by tuple. +//! +//! ```text +//! |<---------key------->|<----value---->| +//! | event_key | seq_num | txn_ver | idx | +//! ``` + +use crate::{schema::EVENT_BY_KEY_CF_NAME, utils::ensure_slice_len_eq}; +use anyhow::Result; +use aptos_schemadb::{ + define_schema, + schema::{KeyCodec, ValueCodec}, +}; +use aptos_types::{event::EventKey, transaction::Version}; +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use std::mem::size_of; + +define_schema!(EventByKeySchema, Key, Value, EVENT_BY_KEY_CF_NAME); + +type SeqNum = u64; +type Key = (EventKey, SeqNum); + +type Index = u64; +type Value = (Version, Index); + +impl KeyCodec for Key { + fn encode_key(&self) -> Result> { + let (ref event_key, seq_num) = *self; + + let mut encoded = event_key.to_bytes(); + encoded.write_u64::(seq_num)?; + + Ok(encoded) + } + + fn decode_key(data: &[u8]) -> Result { + ensure_slice_len_eq(data, size_of::())?; + + const EVENT_KEY_LEN: usize = size_of::(); + let event_key = bcs::from_bytes(&data[..EVENT_KEY_LEN])?; + let seq_num = (&data[EVENT_KEY_LEN..]).read_u64::()?; + + Ok((event_key, seq_num)) + } +} + +impl ValueCodec for Value { + fn encode_value(&self) -> Result> { + let (version, index) = *self; + + let mut encoded = Vec::with_capacity(size_of::() + size_of::()); + encoded.write_u64::(version)?; + encoded.write_u64::(index)?; + + Ok(encoded) + } + + fn decode_value(data: &[u8]) -> Result { + ensure_slice_len_eq(data, size_of::())?; + + const VERSION_SIZE: usize = size_of::(); + let version = (&data[..VERSION_SIZE]).read_u64::()?; + let index = (&data[VERSION_SIZE..]).read_u64::()?; + + Ok((version, index)) + } +} + +#[cfg(test)] +mod test; diff --git a/storage/indexer/src/schema/event_by_key/test.rs b/storage/indexer/src/schema/event_by_key/test.rs new file mode 100644 index 0000000000000..9e345fb26fefc --- /dev/null +++ b/storage/indexer/src/schema/event_by_key/test.rs @@ -0,0 +1,21 @@ +// Copyright © Aptos Foundation +// Parts of the project are originally copyright © Meta Platforms, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use super::*; +use aptos_schemadb::{schema::fuzzing::assert_encode_decode, test_no_panic_decoding}; +use proptest::prelude::*; + +proptest! { + #[test] + fn test_encode_decode( + event_key in any::(), + seq_num in any::(), + version in any::(), + index in any::(), + ) { + assert_encode_decode::(&(event_key, seq_num), &(version, index)); + } +} + +test_no_panic_decoding!(EventByKeySchema); diff --git a/storage/indexer/src/schema/event_by_version/mod.rs b/storage/indexer/src/schema/event_by_version/mod.rs new file mode 100644 index 0000000000000..aa3824d9f04c9 --- /dev/null +++ b/storage/indexer/src/schema/event_by_version/mod.rs @@ -0,0 +1,69 @@ +// Copyright © Aptos Foundation +// Parts of the project are originally copyright © Meta Platforms, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//! This module defines physical storage schema for an event index via which a ContractEvent ( +//! represented by a tuple so that it can be fetched from `EventSchema`) +//! can be found by tuple. +//! +//! ```text +//! |<--------------key------------>|<-value->| +//! | event_key | txn_ver | seq_num | idx | +//! ``` + +use crate::{schema::EVENT_BY_VERSION_CF_NAME, utils::ensure_slice_len_eq}; +use anyhow::Result; +use aptos_schemadb::{ + define_schema, + schema::{KeyCodec, ValueCodec}, +}; +use aptos_types::{event::EventKey, transaction::Version}; +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use std::mem::size_of; + +define_schema!(EventByVersionSchema, Key, Value, EVENT_BY_VERSION_CF_NAME); + +type SeqNum = u64; +type Key = (EventKey, Version, SeqNum); + +type Index = u64; +type Value = Index; + +impl KeyCodec for Key { + fn encode_key(&self) -> Result> { + let (ref event_key, version, seq_num) = *self; + + let mut encoded = event_key.to_bytes(); + encoded.write_u64::(version)?; + encoded.write_u64::(seq_num)?; + + Ok(encoded) + } + + fn decode_key(data: &[u8]) -> Result { + ensure_slice_len_eq(data, size_of::())?; + + const EVENT_KEY_LEN: usize = size_of::(); + const EVENT_KEY_AND_VER_LEN: usize = size_of::<(EventKey, Version)>(); + let event_key = bcs::from_bytes(&data[..EVENT_KEY_LEN])?; + let version = (&data[EVENT_KEY_LEN..]).read_u64::()?; + let seq_num = (&data[EVENT_KEY_AND_VER_LEN..]).read_u64::()?; + + Ok((event_key, version, seq_num)) + } +} + +impl ValueCodec for Value { + fn encode_value(&self) -> Result> { + Ok(self.to_be_bytes().to_vec()) + } + + fn decode_value(mut data: &[u8]) -> Result { + ensure_slice_len_eq(data, size_of::())?; + + Ok(data.read_u64::()?) + } +} + +#[cfg(test)] +mod test; diff --git a/storage/indexer/src/schema/event_by_version/test.rs b/storage/indexer/src/schema/event_by_version/test.rs new file mode 100644 index 0000000000000..a6d78bd7baf88 --- /dev/null +++ b/storage/indexer/src/schema/event_by_version/test.rs @@ -0,0 +1,21 @@ +// Copyright © Aptos Foundation +// Parts of the project are originally copyright © Meta Platforms, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use super::*; +use aptos_schemadb::{schema::fuzzing::assert_encode_decode, test_no_panic_decoding}; +use proptest::prelude::*; + +proptest! { + #[test] + fn test_encode_decode( + event_key in any::(), + seq_num in any::(), + version in any::(), + index in any::(), + ) { + assert_encode_decode::(&(event_key, version, seq_num), &index); + } +} + +test_no_panic_decoding!(EventByVersionSchema); diff --git a/storage/indexer/src/schema/indexer_metadata/mod.rs b/storage/indexer/src/schema/indexer_metadata/mod.rs index f8615ba5b8ee9..652fcde44469b 100644 --- a/storage/indexer/src/schema/indexer_metadata/mod.rs +++ b/storage/indexer/src/schema/indexer_metadata/mod.rs @@ -4,15 +4,18 @@ //! This module defines physical storage schema storing metadata for the internal indexer //! +use super::TAILER_METADATA_CF_NAME; use crate::{ metadata::{MetadataKey, MetadataValue}, schema::INDEXER_METADATA_CF_NAME, + utils::ensure_slice_len_eq, }; use anyhow::Result; use aptos_schemadb::{ define_schema, schema::{KeyCodec, ValueCodec}, }; +use aptos_types::transaction::Version; define_schema!( IndexerMetadataSchema, @@ -41,5 +44,28 @@ impl ValueCodec for MetadataValue { } } +define_schema!(TailerMetadataSchema, Version, (), TAILER_METADATA_CF_NAME); + +impl KeyCodec for Version { + fn encode_key(&self) -> Result> { + Ok(bcs::to_bytes(self)?) + } + + fn decode_key(data: &[u8]) -> Result { + Ok(bcs::from_bytes(data)?) + } +} + +impl ValueCodec for () { + fn encode_value(&self) -> Result> { + Ok(Vec::new()) + } + + fn decode_value(data: &[u8]) -> Result { + ensure_slice_len_eq(data, 0)?; + Ok(()) + } +} + #[cfg(test)] mod test; diff --git a/storage/indexer/src/schema/indexer_metadata/test.rs b/storage/indexer/src/schema/indexer_metadata/test.rs index a1117d2b17ea3..0876f4f818051 100644 --- a/storage/indexer/src/schema/indexer_metadata/test.rs +++ b/storage/indexer/src/schema/indexer_metadata/test.rs @@ -13,6 +13,13 @@ proptest! { ) { assert_encode_decode::(&tag, &metadata); } + + #[test] + fn test_encode_decode_tailer_metadata( + version in any::(), + ) { + assert_encode_decode::(&version, &()); + } } test_no_panic_decoding!(IndexerMetadataSchema); diff --git a/storage/indexer/src/schema/mod.rs b/storage/indexer/src/schema/mod.rs index 90ba38374788d..844e8e9487e39 100644 --- a/storage/indexer/src/schema/mod.rs +++ b/storage/indexer/src/schema/mod.rs @@ -6,14 +6,20 @@ //! //! All schemas are `pub(crate)` so not shown in rustdoc, refer to the source code to see details. -pub(crate) mod indexer_metadata; -pub(crate) mod table_info; - +pub mod event_by_key; +pub mod event_by_version; +pub mod indexer_metadata; +pub mod table_info; +pub mod transaction_by_account; use aptos_schemadb::ColumnFamilyName; pub const DEFAULT_COLUMN_FAMILY_NAME: ColumnFamilyName = "default"; pub const INDEXER_METADATA_CF_NAME: ColumnFamilyName = "indexer_metadata"; +pub const TAILER_METADATA_CF_NAME: ColumnFamilyName = "event_metadata"; pub const TABLE_INFO_CF_NAME: ColumnFamilyName = "table_info"; +pub const EVENT_BY_KEY_CF_NAME: ColumnFamilyName = "event_by_key"; +pub const EVENT_BY_VERSION_CF_NAME: ColumnFamilyName = "event_by_version"; +pub const TRANSACTION_BY_ACCOUNT_CF_NAME: ColumnFamilyName = "transaction_by_account"; pub fn column_families() -> Vec { vec![ @@ -22,3 +28,13 @@ pub fn column_families() -> Vec { TABLE_INFO_CF_NAME, ] } + +pub fn tailer_column_families() -> Vec { + vec![ + /* empty cf */ DEFAULT_COLUMN_FAMILY_NAME, + TAILER_METADATA_CF_NAME, + EVENT_BY_KEY_CF_NAME, + EVENT_BY_VERSION_CF_NAME, + TRANSACTION_BY_ACCOUNT_CF_NAME, + ] +} diff --git a/storage/indexer/src/schema/transaction_by_account/mod.rs b/storage/indexer/src/schema/transaction_by_account/mod.rs new file mode 100644 index 0000000000000..c4e1d50e553a8 --- /dev/null +++ b/storage/indexer/src/schema/transaction_by_account/mod.rs @@ -0,0 +1,67 @@ +// Copyright © Aptos Foundation +// Parts of the project are originally copyright © Meta Platforms, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//! This module defines physical storage schema for a transaction index via which the version of a +//! transaction sent by `account_address` with `sequence_number` can be found. With the version one +//! can resort to `TransactionSchema` for the transaction content. +//! +//! ```text +//! |<-------key------->|<-value->| +//! | address | seq_num | txn_ver | +//! ``` + +use crate::{schema::TRANSACTION_BY_ACCOUNT_CF_NAME, utils::ensure_slice_len_eq}; +use anyhow::Result; +use aptos_schemadb::{ + define_schema, + schema::{KeyCodec, ValueCodec}, +}; +use aptos_types::{account_address::AccountAddress, transaction::Version}; +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use std::{convert::TryFrom, mem::size_of}; + +define_schema!( + TransactionByAccountSchema, + Key, + Version, + TRANSACTION_BY_ACCOUNT_CF_NAME +); + +type SeqNum = u64; +type Key = (AccountAddress, SeqNum); + +impl KeyCodec for Key { + fn encode_key(&self) -> Result> { + let (ref account_address, seq_num) = *self; + + let mut encoded = account_address.to_vec(); + encoded.write_u64::(seq_num)?; + + Ok(encoded) + } + + fn decode_key(data: &[u8]) -> Result { + ensure_slice_len_eq(data, size_of::())?; + + let address = AccountAddress::try_from(&data[..AccountAddress::LENGTH])?; + let seq_num = (&data[AccountAddress::LENGTH..]).read_u64::()?; + + Ok((address, seq_num)) + } +} + +impl ValueCodec for Version { + fn encode_value(&self) -> Result> { + Ok(self.to_be_bytes().to_vec()) + } + + fn decode_value(mut data: &[u8]) -> Result { + ensure_slice_len_eq(data, size_of::())?; + + Ok(data.read_u64::()?) + } +} + +#[cfg(test)] +mod test; diff --git a/storage/indexer/src/schema/transaction_by_account/test.rs b/storage/indexer/src/schema/transaction_by_account/test.rs new file mode 100644 index 0000000000000..dc04cff3b6453 --- /dev/null +++ b/storage/indexer/src/schema/transaction_by_account/test.rs @@ -0,0 +1,20 @@ +// Copyright © Aptos Foundation +// Parts of the project are originally copyright © Meta Platforms, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use super::*; +use aptos_schemadb::{schema::fuzzing::assert_encode_decode, test_no_panic_decoding}; +use proptest::prelude::*; + +proptest! { + #[test] + fn test_encode_decode( + address in any::(), + seq_num in any::(), + version in any::(), + ) { + assert_encode_decode::(&(address, seq_num), &version); + } +} + +test_no_panic_decoding!(TransactionByAccountSchema); diff --git a/storage/indexer/src/table_info_reader.rs b/storage/indexer/src/table_info_reader.rs index f0ddcff4aa1ff..5cb8b05897c45 100644 --- a/storage/indexer/src/table_info_reader.rs +++ b/storage/indexer/src/table_info_reader.rs @@ -2,18 +2,18 @@ // SPDX-License-Identifier: Apache-2.0 use crate::db_v2::IndexerAsyncV2; -use aptos_storage_interface::Result; -use aptos_types::state_store::table::{TableHandle, TableInfo}; +use anyhow::Result; +use aptos_types::{ + indexer::table_info_reader::TableInfoReader, + state_store::table::{TableHandle, TableInfo}, +}; /// Table info reader is to create a thin interface for other services to read the db data, /// this standalone db is officially not part of the AptosDB anymore. /// For services that need table info mapping, they need to acquire this reader in the FN bootstrapping stage. -pub trait TableInfoReader: Send + Sync { - fn get_table_info(&self, handle: TableHandle) -> Result>; -} impl TableInfoReader for IndexerAsyncV2 { fn get_table_info(&self, handle: TableHandle) -> Result> { - self.get_table_info_with_retry(handle) + Ok(self.get_table_info_with_retry(handle)?) } } diff --git a/storage/indexer/src/utils.rs b/storage/indexer/src/utils.rs new file mode 100644 index 0000000000000..f0ccadd63d047 --- /dev/null +++ b/storage/indexer/src/utils.rs @@ -0,0 +1,126 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::schema::transaction_by_account::TransactionByAccountSchema; +use aptos_schemadb::iterator::SchemaIterator; +use aptos_storage_interface::{db_ensure as ensure, AptosDbError, Result}; +use aptos_types::{ + account_address::AccountAddress, indexer::db_tailer_reader::Order, transaction::Version, +}; + +pub const MAX_REQUEST_LIMIT: u64 = 10_000; + +pub fn ensure_slice_len_eq(data: &[u8], len: usize) -> Result<()> { + ensure!( + data.len() == len, + "Unexpected data len {}, expected {}.", + data.len(), + len, + ); + Ok(()) +} + +pub fn error_if_too_many_requested(num_requested: u64, max_allowed: u64) -> Result<()> { + if num_requested > max_allowed { + Err(AptosDbError::TooManyRequested(num_requested, max_allowed)) + } else { + Ok(()) + } +} + +// Convert requested range and order to a range in ascending order. +pub fn get_first_seq_num_and_limit(order: Order, cursor: u64, limit: u64) -> Result<(u64, u64)> { + ensure!(limit > 0, "limit should > 0, got {}", limit); + + Ok(if order == Order::Ascending { + (cursor, limit) + } else if limit <= cursor { + (cursor - limit + 1, limit) + } else { + (0, cursor + 1) + }) +} + +// This is a replicate of the AccountTransactionVersionIter from storage/aptosdb crate. +pub struct AccountTransactionVersionIter<'a> { + inner: SchemaIterator<'a, TransactionByAccountSchema>, + address: AccountAddress, + expected_next_seq_num: Option, + end_seq_num: u64, + prev_version: Option, + ledger_version: Version, +} + +impl<'a> AccountTransactionVersionIter<'a> { + pub(crate) fn new( + inner: SchemaIterator<'a, TransactionByAccountSchema>, + address: AccountAddress, + end_seq_num: u64, + ledger_version: Version, + ) -> Self { + Self { + inner, + address, + end_seq_num, + ledger_version, + expected_next_seq_num: None, + prev_version: None, + } + } +} + +impl<'a> AccountTransactionVersionIter<'a> { + fn next_impl(&mut self) -> Result> { + Ok(match self.inner.next().transpose()? { + Some(((address, seq_num), version)) => { + // No more transactions sent by this account. + if address != self.address { + return Ok(None); + } + if seq_num >= self.end_seq_num { + return Ok(None); + } + + // Ensure seq_num_{i+1} == seq_num_{i} + 1 + if let Some(expected_seq_num) = self.expected_next_seq_num { + ensure!( + seq_num == expected_seq_num, + "DB corruption: account transactions sequence numbers are not contiguous: \ + actual: {}, expected: {}", + seq_num, + expected_seq_num, + ); + }; + + // Ensure version_{i+1} > version_{i} + if let Some(prev_version) = self.prev_version { + ensure!( + prev_version < version, + "DB corruption: account transaction versions are not strictly increasing: \ + previous version: {}, current version: {}", + prev_version, + version, + ); + } + + // No more transactions (in this view of the ledger). + if version > self.ledger_version { + return Ok(None); + } + + self.expected_next_seq_num = Some(seq_num + 1); + self.prev_version = Some(version); + Some((seq_num, version)) + }, + None => None, + }) + } +} + +impl<'a> Iterator for AccountTransactionVersionIter<'a> { + type Item = Result<(u64, Version)>; + + fn next(&mut self) -> Option { + self.next_impl().transpose() + } +} diff --git a/storage/storage-interface/src/lib.rs b/storage/storage-interface/src/lib.rs index 8281c0522c3da..5432dd1c3ddf3 100644 --- a/storage/storage-interface/src/lib.rs +++ b/storage/storage-interface/src/lib.rs @@ -103,6 +103,15 @@ pub enum Order { Descending, } +impl From for Order { + fn from(order: aptos_types::indexer::db_tailer_reader::Order) -> Self { + match order { + aptos_types::indexer::db_tailer_reader::Order::Ascending => Self::Ascending, + aptos_types::indexer::db_tailer_reader::Order::Descending => Self::Descending, + } + } +} + macro_rules! delegate_read { ($( $(#[$($attr:meta)*])* @@ -455,6 +464,25 @@ pub trait DbReader: Send + Sync { /// Returns state storage usage at the end of an epoch. fn get_state_storage_usage(&self, version: Option) -> Result; + + fn get_db_backup_iter( + &self, + start_version: Version, + num_transactions: usize, + ) -> Result)>> + '_>>; + + fn get_transaction_with_proof( + &self, + version: Version, + ledger_version: Version, + fetch_events: bool, + ) -> Result; + + fn get_event_by_version_and_index( + &self, + version: Version, + index: u64, + ) -> Result; ); // end delegated /// Returns the latest ledger info. diff --git a/types/src/indexer/db_tailer_reader.rs b/types/src/indexer/db_tailer_reader.rs new file mode 100644 index 0000000000000..9253c39db139b --- /dev/null +++ b/types/src/indexer/db_tailer_reader.rs @@ -0,0 +1,45 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + account_address::AccountAddress, + contract_event::EventWithVersion, + event::EventKey, + transaction::{AccountTransactionsWithProof, Version}, +}; +use anyhow::Result; + +#[derive(Clone, Copy, Eq, PartialEq)] +pub enum Order { + Ascending, + Descending, +} + +pub trait IndexerTransactionEventReader: Send + Sync { + fn get_events( + &self, + event_key: &EventKey, + start: u64, + order: Order, + limit: u64, + ledger_version: Version, + ) -> Result>; + + fn get_events_by_event_key( + &self, + event_key: &EventKey, + start_seq_num: u64, + order: Order, + limit: u64, + ledger_version: Version, + ) -> Result>; + + fn get_account_transactions( + &self, + address: AccountAddress, + start_seq_num: u64, + limit: u64, + include_events: bool, + ledger_version: Version, + ) -> Result; +} diff --git a/types/src/indexer/mod.rs b/types/src/indexer/mod.rs new file mode 100644 index 0000000000000..2793462e3844f --- /dev/null +++ b/types/src/indexer/mod.rs @@ -0,0 +1,5 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +pub mod db_tailer_reader; +pub mod table_info_reader; diff --git a/types/src/indexer/table_info_reader.rs b/types/src/indexer/table_info_reader.rs new file mode 100644 index 0000000000000..5d6910e24b919 --- /dev/null +++ b/types/src/indexer/table_info_reader.rs @@ -0,0 +1,12 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::state_store::table::{TableHandle, TableInfo}; +use anyhow::Result; + +/// Table info reader is to create a thin interface for other services to read the db data, +/// this standalone db is officially not part of the AptosDB anymore. +/// For services that need table info mapping, they need to acquire this reader in the FN bootstrapping stage. +pub trait TableInfoReader: Send + Sync { + fn get_table_info(&self, handle: TableHandle) -> Result>; +} diff --git a/types/src/lib.rs b/types/src/lib.rs index f76c5a467865a..5cae297bb17ba 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -20,6 +20,7 @@ pub mod event; pub mod executable; pub mod fee_statement; pub mod governance; +pub mod indexer; pub mod jwks; pub mod ledger_info; pub mod mempool_status;