diff --git a/Cargo.lock b/Cargo.lock index af72c26c0f2c0..37e7dd7cb9ab3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -511,7 +511,6 @@ dependencies = [ "anyhow", "aptos-config", "aptos-crypto", - "aptos-db-indexer", "aptos-framework", "aptos-logger", "aptos-openapi", @@ -1113,6 +1112,7 @@ dependencies = [ "aptos-config", "aptos-crypto", "aptos-db-indexer", + "aptos-db-indexer-schemas", "aptos-executor", "aptos-executor-types", "aptos-experimental-runtimes", @@ -1160,14 +1160,42 @@ version = "0.1.0" dependencies = [ "anyhow", "aptos-config", + "aptos-db-indexer-schemas", "aptos-logger", "aptos-proptest-helpers", "aptos-resource-viewer", "aptos-rocksdb-options", "aptos-schemadb", + "aptos-sdk", "aptos-storage-interface", "aptos-types", + "aptos-vm-genesis", "bcs 0.1.4", + "byteorder", + "bytes", + "dashmap", + "move-core-types", + "proptest", + "proptest-derive", + "rand 0.7.3", + "serde", +] + +[[package]] +name = "aptos-db-indexer-schemas" +version = "0.1.0" +dependencies = [ + "anyhow", + "aptos-config", + "aptos-logger", + "aptos-proptest-helpers", + "aptos-resource-viewer", + "aptos-rocksdb-options", + "aptos-schemadb", + "aptos-storage-interface", + "aptos-types", + "bcs 0.1.4", + "byteorder", "bytes", "dashmap", "move-core-types", @@ -1353,6 +1381,8 @@ dependencies = [ "aptos-consensus-types", "aptos-crypto", "aptos-db", + "aptos-db-indexer", + "aptos-db-indexer-schemas", "aptos-drop-helper", "aptos-executor-service", "aptos-executor-test-helpers", @@ -1363,6 +1393,7 @@ dependencies = [ "aptos-logger", "aptos-metrics-core", "aptos-scratchpad", + "aptos-sdk", "aptos-storage-interface", "aptos-temppath", "aptos-types", diff --git a/Cargo.toml b/Cargo.toml index cbbc345cf45d3..3c3d93af73724 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -172,6 +172,7 @@ members = [ "storage/db-tool", "storage/executable-store", "storage/indexer", + "storage/indexer_schemas", "storage/jellyfish-merkle", "storage/rocksdb-options", "storage/schemadb", @@ -314,6 +315,7 @@ aptos-data-client = { path = "state-sync/aptos-data-client" } aptos-data-streaming-service = { path = "state-sync/data-streaming-service" } aptos-db = { path = "storage/aptosdb" } aptos-db-indexer = { path = "storage/indexer" } +aptos-db-indexer-schemas = { path = "storage/indexer_schemas" } aptos-db-tool = { path = "storage/db-tool" } aptos-debugger = { path = "crates/aptos-debugger" } aptos-dkg = { path = "crates/aptos-dkg" } diff --git a/api/src/accounts.rs b/api/src/accounts.rs index 0193a7626d93e..1f597ce8552e1 100644 --- a/api/src/accounts.rs +++ b/api/src/accounts.rs @@ -342,10 +342,8 @@ impl Account { let state_view = self .context .latest_state_view_poem(&self.latest_ledger_info)?; - let converter = state_view.as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ); + let converter = state_view + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()); let converted_resources = converter .try_into_resources(resources.iter().map(|(k, v)| (k.clone(), v.as_slice()))) .context("Failed to build move resource response from data in DB") @@ -522,10 +520,7 @@ impl Account { self.context.state_view(Some(self.ledger_version))?; let bytes = state_view - .as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .find_resource(&state_view, self.address, resource_type) .context(format!( "Failed to query DB to check for {} at {}", @@ -543,10 +538,7 @@ impl Account { })?; state_view - 
.as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .move_struct_fields(resource_type, &bytes) .context("Failed to convert move structs from storage") .map_err(|err| { diff --git a/api/src/context.rs b/api/src/context.rs index dad62cfc124f3..a077845cb0b63 100644 --- a/api/src/context.rs +++ b/api/src/context.rs @@ -18,7 +18,6 @@ use aptos_api_types::{ }; use aptos_config::config::{NodeConfig, RoleType}; use aptos_crypto::HashValue; -use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_gas_schedule::{AptosGasParameters, FromOnChainGasSchedule}; use aptos_logger::{error, info, Schema}; use aptos_mempool::{MempoolClientRequest, MempoolClientSender, SubmissionStatus}; @@ -34,6 +33,7 @@ use aptos_types::{ chain_id::ChainId, contract_event::EventWithVersion, event::EventKey, + indexer::indexer_db_reader::IndexerReader, ledger_info::LedgerInfoWithSignatures, on_chain_config::{GasSchedule, GasScheduleV2, OnChainConfig, OnChainExecutionConfig}, state_store::{ @@ -76,7 +76,7 @@ pub struct Context { gas_limit_cache: Arc>, view_function_stats: Arc, simulate_txn_stats: Arc, - pub table_info_reader: Option>, + pub indexer_reader: Option>, pub wait_for_hash_active_connections: Arc, } @@ -92,7 +92,7 @@ impl Context { db: Arc, mp_sender: MempoolClientSender, node_config: NodeConfig, - table_info_reader: Option>, + indexer_reader: Option>, ) -> Self { let (view_function_stats, simulate_txn_stats) = { let log_per_call_stats = node_config.api.periodic_function_stats_sec.is_some(); @@ -129,7 +129,7 @@ impl Context { })), view_function_stats, simulate_txn_stats, - table_info_reader, + indexer_reader, wait_for_hash_active_connections: Arc::new(AtomicUsize::new(0)), } } @@ -418,7 +418,7 @@ impl Context { // We should be able to do an unwrap here, otherwise the above db read would fail. let state_view = self.state_view_at_version(version)?; - let converter = state_view.as_converter(self.db.clone(), self.table_info_reader.clone()); + let converter = state_view.as_converter(self.db.clone(), self.indexer_reader.clone()); // Extract resources from resource groups and flatten into all resources let kvs = kvs @@ -618,7 +618,7 @@ impl Context { } let state_view = self.latest_state_view_poem(ledger_info)?; - let converter = state_view.as_converter(self.db.clone(), self.table_info_reader.clone()); + let converter = state_view.as_converter(self.db.clone(), self.indexer_reader.clone()); let txns: Vec = data .into_iter() .map(|t| { @@ -650,7 +650,7 @@ impl Context { } let state_view = self.latest_state_view_poem(ledger_info)?; - let converter = state_view.as_converter(self.db.clone(), self.table_info_reader.clone()); + let converter = state_view.as_converter(self.db.clone(), self.indexer_reader.clone()); let txns: Vec = data .into_iter() .map(|t| { diff --git a/api/src/events.rs b/api/src/events.rs index 5dc4f2310f789..49c4fad21ce9f 100644 --- a/api/src/events.rs +++ b/api/src/events.rs @@ -184,10 +184,7 @@ impl EventsApi { let events = self .context .latest_state_view_poem(&latest_ledger_info)? 
- .as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .try_into_versioned_events(&events) .context("Failed to convert events from storage into response") .map_err(|err| { diff --git a/api/src/runtime.rs b/api/src/runtime.rs index 4673c087235a1..cf666d1f959df 100644 --- a/api/src/runtime.rs +++ b/api/src/runtime.rs @@ -10,11 +10,10 @@ use crate::{ }; use anyhow::Context as AnyhowContext; use aptos_config::config::{ApiConfig, NodeConfig}; -use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_logger::info; use aptos_mempool::MempoolClientSender; use aptos_storage_interface::DbReader; -use aptos_types::chain_id::ChainId; +use aptos_types::{chain_id::ChainId, indexer::indexer_db_reader::IndexerReader}; use poem::{ handler, http::Method, @@ -35,12 +34,12 @@ pub fn bootstrap( chain_id: ChainId, db: Arc, mp_sender: MempoolClientSender, - table_info_reader: Option>, + indexer_reader: Option>, ) -> anyhow::Result { let max_runtime_workers = get_max_runtime_workers(&config.api); let runtime = aptos_runtimes::spawn_named_runtime("api".into(), Some(max_runtime_workers)); - let context = Context::new(chain_id, db, mp_sender, config.clone(), table_info_reader); + let context = Context::new(chain_id, db, mp_sender, config.clone(), indexer_reader); attach_poem_to_runtime(runtime.handle(), context.clone(), config, false) .context("Failed to attach poem to runtime")?; diff --git a/api/src/state.rs b/api/src/state.rs index 4b083a6d47b16..8c11810b98ccb 100644 --- a/api/src/state.rs +++ b/api/src/state.rs @@ -287,10 +287,7 @@ impl StateApi { let (ledger_info, ledger_version, state_view) = self.context.state_view(ledger_version)?; let bytes = state_view - .as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .find_resource(&state_view, address, &tag) .context(format!( "Failed to query DB to check for {} at {}", @@ -308,10 +305,7 @@ impl StateApi { match accept_type { AcceptType::Json => { let resource = state_view - .as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .try_into_resource(&tag, &bytes) .context("Failed to deserialize resource data retrieved from DB") .map_err(|err| { @@ -412,10 +406,8 @@ impl StateApi { .context .state_view(ledger_version.map(|inner| inner.0))?; - let converter = state_view.as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ); + let converter = + state_view.as_converter(self.context.db.clone(), self.context.indexer_reader.clone()); // Convert key to lookup version for DB let vm_key = converter diff --git a/api/src/transactions.rs b/api/src/transactions.rs index b0bd7a1b49b15..a0433209a6bf8 100644 --- a/api/src/transactions.rs +++ b/api/src/transactions.rs @@ -898,7 +898,7 @@ impl TransactionsApi { state_view .as_converter( self.context.db.clone(), - self.context.table_info_reader.clone(), + self.context.indexer_reader.clone(), ) .try_into_onchain_transaction(timestamp, txn) .context("Failed to convert on chain transaction to Transaction") @@ -911,10 +911,7 @@ impl TransactionsApi { })? 
}, TransactionData::Pending(txn) => state_view - .as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .try_into_pending_transaction(*txn) .context("Failed to convert on pending transaction to Transaction") .map_err(|err| { @@ -1092,10 +1089,7 @@ impl TransactionsApi { SubmitTransactionPost::Json(data) => self .context .latest_state_view_poem(ledger_info)? - .as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .try_into_signed_transaction_poem(data.0, self.context.chain_id()) .context("Failed to create SignedTransaction from SubmitTransactionRequest") .map_err(|err| { @@ -1173,7 +1167,7 @@ impl TransactionsApi { .enumerate() .map(|(index, txn)| { self.context.latest_state_view_poem(ledger_info)? - .as_converter(self.context.db.clone(), self.context.table_info_reader.clone()) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .try_into_signed_transaction_poem(txn, self.context.chain_id()) .context(format!("Failed to create SignedTransaction from SubmitTransactionRequest at position {}", index)) .map_err(|err| { @@ -1264,7 +1258,7 @@ impl TransactionsApi { // We provide the pending transaction so that users have the hash associated let pending_txn = state_view - .as_converter(self.context.db.clone(), self.context.table_info_reader.clone()) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .try_into_pending_transaction_poem(txn) .context("Failed to build PendingTransaction from mempool response, even though it said the request was accepted") .map_err(|err| SubmitTransactionError::internal_with_code( @@ -1495,10 +1489,7 @@ impl TransactionsApi { let ledger_info = self.context.get_latest_ledger_info()?; let state_view = self.context.latest_state_view_poem(&ledger_info)?; let raw_txn: RawTransaction = state_view - .as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .try_into_raw_transaction_poem(request.transaction, self.context.chain_id()) .context("The given transaction is invalid") .map_err(|err| { diff --git a/api/src/view_function.rs b/api/src/view_function.rs index 08335c5e7005a..fa6b31e4f732b 100644 --- a/api/src/view_function.rs +++ b/api/src/view_function.rs @@ -95,7 +95,7 @@ fn view_request( let view_function: ViewFunction = match request { ViewFunctionRequest::Json(data) => state_view - .as_converter(context.db.clone(), context.table_info_reader.clone()) + .as_converter(context.db.clone(), context.indexer_reader.clone()) .convert_view_function(data.0) .map_err(|err| { BasicErrorWith404::bad_request_with_code( @@ -167,7 +167,7 @@ fn view_request( }, AcceptType::Json => { let return_types = state_view - .as_converter(context.db.clone(), context.table_info_reader.clone()) + .as_converter(context.db.clone(), context.indexer_reader.clone()) .function_return_types(&view_function) .and_then(|tys| { tys.into_iter() @@ -187,7 +187,7 @@ fn view_request( .zip(return_types.into_iter()) .map(|(v, ty)| { state_view - .as_converter(context.db.clone(), context.table_info_reader.clone()) + .as_converter(context.db.clone(), context.indexer_reader.clone()) .try_into_move_value(&ty, &v) }) .collect::>>() diff --git a/api/types/Cargo.toml b/api/types/Cargo.toml index 1ff09c771eed3..4f395c7457dd9 100644 --- 
a/api/types/Cargo.toml +++ b/api/types/Cargo.toml @@ -16,7 +16,6 @@ rust-version = { workspace = true } anyhow = { workspace = true } aptos-config = { workspace = true } aptos-crypto = { workspace = true } -aptos-db-indexer = { workspace = true } aptos-framework = { workspace = true } aptos-logger = { workspace = true } aptos-openapi = { workspace = true } diff --git a/api/types/src/convert.rs b/api/types/src/convert.rs index 0b464c9369019..a5d54c2b4d42f 100644 --- a/api/types/src/convert.rs +++ b/api/types/src/convert.rs @@ -18,7 +18,6 @@ use crate::{ }; use anyhow::{bail, ensure, format_err, Context as AnyhowContext, Result}; use aptos_crypto::{hash::CryptoHash, HashValue}; -use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_logger::{sample, sample::SampleRate}; use aptos_resource_viewer::AptosValueAnnotator; use aptos_storage_interface::DbReader; @@ -26,6 +25,7 @@ use aptos_types::{ access_path::{AccessPath, Path}, chain_id::ChainId, contract_event::{ContractEvent, EventWithVersion}, + indexer::indexer_db_reader::IndexerReader, state_store::{ state_key::{inner::StateKeyInner, StateKey}, table::{TableHandle, TableInfo}, @@ -66,19 +66,19 @@ const OBJECT_STRUCT: &IdentStr = ident_str!("Object"); pub struct MoveConverter<'a, S> { inner: AptosValueAnnotator<'a, S>, db: Arc, - table_info_reader: Option>, + indexer_reader: Option>, } impl<'a, S: StateView> MoveConverter<'a, S> { pub fn new( inner: &'a S, db: Arc, - table_info_reader: Option>, + indexer_reader: Option>, ) -> Self { Self { inner: AptosValueAnnotator::new(inner), db, - table_info_reader, + indexer_reader, } } @@ -982,9 +982,9 @@ impl<'a, S: StateView> MoveConverter<'a, S> { } fn get_table_info(&self, handle: TableHandle) -> Result> { - if let Some(table_info_reader) = self.table_info_reader.as_ref() { - // Attempt to get table_info from the table_info_reader if it exists - Ok(table_info_reader.get_table_info(handle)?) + if let Some(indexer_reader) = self.indexer_reader.as_ref() { + // Attempt to get table_info from the indexer_reader if it exists + Ok(indexer_reader.get_table_info(handle)?) 
} else if self.db.indexer_enabled() { // Attempt to get table_info from the db if indexer is enabled Ok(Some(self.db.get_table_info(handle)?)) @@ -1088,7 +1088,7 @@ pub trait AsConverter { fn as_converter( &self, db: Arc, - table_info_reader: Option>, + indexer_reader: Option>, ) -> MoveConverter; } @@ -1096,9 +1096,9 @@ impl AsConverter for R { fn as_converter( &self, db: Arc, - table_info_reader: Option>, + indexer_reader: Option>, ) -> MoveConverter { - MoveConverter::new(self, db, table_info_reader) + MoveConverter::new(self, db, indexer_reader) } } diff --git a/aptos-node/src/lib.rs b/aptos-node/src/lib.rs index 76503ed9fd20d..3de1a14cc9612 100644 --- a/aptos-node/src/lib.rs +++ b/aptos-node/src/lib.rs @@ -207,6 +207,7 @@ pub struct AptosHandle { _peer_monitoring_service_runtime: Runtime, _state_sync_runtimes: StateSyncRuntimes, _telemetry_runtime: Option, + _indexer_db_runtime: Option, } /// Start an Aptos node @@ -688,6 +689,7 @@ pub fn setup_environment_and_start_node( indexer_table_info_runtime, indexer_runtime, indexer_grpc_runtime, + internal_indexer_db_runtime, ) = services::bootstrap_api_and_indexer(&node_config, db_rw.clone(), chain_id)?; // Create mempool and get the consensus to mempool sender @@ -774,6 +776,7 @@ pub fn setup_environment_and_start_node( _peer_monitoring_service_runtime: peer_monitoring_service_runtime, _state_sync_runtimes: state_sync_runtimes, _telemetry_runtime: telemetry_runtime, + _indexer_db_runtime: internal_indexer_db_runtime, }) } diff --git a/aptos-node/src/services.rs b/aptos-node/src/services.rs index 9f850a62046b8..60cad89ed598e 100644 --- a/aptos-node/src/services.rs +++ b/aptos-node/src/services.rs @@ -11,10 +11,12 @@ use aptos_consensus::{ }; use aptos_consensus_notifications::ConsensusNotifier; use aptos_data_client::client::AptosDataClient; -use aptos_db_indexer::table_info_reader::TableInfoReader; +use aptos_db_indexer::indexer_reader::IndexerReaders; use aptos_event_notifications::{DbBackedOnChainConfig, ReconfigNotificationListener}; use aptos_indexer_grpc_fullnode::runtime::bootstrap as bootstrap_indexer_grpc; -use aptos_indexer_grpc_table_info::runtime::bootstrap as bootstrap_indexer_table_info; +use aptos_indexer_grpc_table_info::runtime::{ + bootstrap as bootstrap_indexer_table_info, bootstrap_internal_indexer_db, +}; use aptos_logger::{debug, telemetry_log_writer::TelemetryLog, LoggerFilterUpdater}; use aptos_mempool::{network::MempoolSyncMsg, MempoolClientRequest, QuorumStoreRequest}; use aptos_mempool_notifications::MempoolNotificationListener; @@ -27,7 +29,7 @@ use aptos_peer_monitoring_service_server::{ use aptos_peer_monitoring_service_types::PeerMonitoringServiceMessage; use aptos_storage_interface::{DbReader, DbReaderWriter}; use aptos_time_service::TimeService; -use aptos_types::chain_id::ChainId; +use aptos_types::{chain_id::ChainId, indexer::indexer_db_reader::IndexerReader}; use aptos_validator_transaction_pool::VTxnPoolState; use futures::channel::{mpsc, mpsc::Sender}; use std::{sync::Arc, time::Instant}; @@ -48,6 +50,7 @@ pub fn bootstrap_api_and_indexer( Option, Option, Option, + Option, )> { // Create the mempool client and sender let (mempool_client_sender, mempool_client_receiver) = @@ -63,18 +66,27 @@ pub fn bootstrap_api_and_indexer( None => (None, None), }; + let (db_indexer_runtime, txn_event_reader) = + match bootstrap_internal_indexer_db(node_config, db_rw.clone()) { + Some((runtime, db_indexer)) => (Some(runtime), Some(db_indexer)), + None => (None, None), + }; + + let indexer_readers = 
IndexerReaders::new(indexer_async_v2, txn_event_reader); + // Create the API runtime - let table_info_reader: Option> = indexer_async_v2.map(|arc| { - let trait_object: Arc = arc; + let indexer_reader: Option> = indexer_readers.map(|readers| { + let trait_object: Arc = Arc::new(readers); trait_object }); + let api_runtime = if node_config.api.enabled { Some(bootstrap_api( node_config, chain_id, db_rw.reader.clone(), mempool_client_sender.clone(), - table_info_reader.clone(), + indexer_reader.clone(), )?) } else { None @@ -86,7 +98,7 @@ pub fn bootstrap_api_and_indexer( chain_id, db_rw.reader.clone(), mempool_client_sender.clone(), - table_info_reader, + indexer_reader, ); // Create the indexer runtime @@ -103,6 +115,7 @@ pub fn bootstrap_api_and_indexer( indexer_table_info_runtime, indexer_runtime, indexer_grpc, + db_indexer_runtime, )) } diff --git a/aptos-node/src/storage.rs b/aptos-node/src/storage.rs index b5f6780fbab39..ab196fa0162c9 100644 --- a/aptos-node/src/storage.rs +++ b/aptos-node/src/storage.rs @@ -73,7 +73,6 @@ pub(crate) fn bootstrap_db( let db_backup_service = start_backup_service(node_config.storage.backup_service_address, fast_sync_db); - (db_arc as Arc, db_rw, Some(db_backup_service)) }, }; diff --git a/config/src/config/internal_indexer_db_config.rs b/config/src/config/internal_indexer_db_config.rs new file mode 100644 index 0000000000000..7f238cc9c3306 --- /dev/null +++ b/config/src/config/internal_indexer_db_config.rs @@ -0,0 +1,43 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)] +pub struct InternalIndexerDBConfig { + pub enable_transaction: bool, + pub enable_event: bool, + pub batch_size: usize, +} + +impl InternalIndexerDBConfig { + pub fn new(enable_transaction: bool, enable_event: bool, batch_size: usize) -> Self { + Self { + enable_transaction, + enable_event, + batch_size, + } + } + + pub fn enable_transaction(&self) -> bool { + self.enable_transaction + } + + pub fn enable_event(&self) -> bool { + self.enable_event + } + + pub fn batch_size(&self) -> usize { + self.batch_size + } +} + +impl Default for InternalIndexerDBConfig { + fn default() -> Self { + Self { + enable_transaction: false, + enable_event: false, + batch_size: 10_000, + } + } +} diff --git a/config/src/config/mod.rs b/config/src/config/mod.rs index 5d50c8dd84264..6d133ade1c3fe 100644 --- a/config/src/config/mod.rs +++ b/config/src/config/mod.rs @@ -20,6 +20,7 @@ mod indexer_config; mod indexer_grpc_config; mod indexer_table_info_config; mod inspection_service_config; +pub mod internal_indexer_db_config; mod jwk_consensus_config; mod logger_config; mod mempool_config; diff --git a/config/src/config/node_config.rs b/config/src/config/node_config.rs index ecdcfde372271..f124dee3b2a33 100644 --- a/config/src/config/node_config.rs +++ b/config/src/config/node_config.rs @@ -5,6 +5,7 @@ use super::{DagConsensusConfig, IndexerTableInfoConfig}; use crate::{ config::{ consensus_observer_config::ConsensusObserverConfig, dkg_config::DKGConfig, + internal_indexer_db_config::InternalIndexerDBConfig, jwk_consensus_config::JWKConsensusConfig, netbench_config::NetbenchConfig, node_config_loader::NodeConfigLoader, node_startup_config::NodeStartupConfig, persistable_config::PersistableConfig, utils::RootPath, AdminServiceConfig, ApiConfig, @@ -83,6 +84,8 @@ pub struct NodeConfig { pub storage: StorageConfig, #[serde(default)] pub validator_network: Option, + 
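(Aside: the `indexer_db_config` field added just below is gated behind `#[serde(default)]`, so omitting it from an existing node config leaves the internal indexer DB disabled. A minimal sketch of opting in programmatically; the `enable_internal_indexer` helper is hypothetical, while `InternalIndexerDBConfig::new` and the `indexer_db_config` field come from this diff:)

```rust
use aptos_config::config::{internal_indexer_db_config::InternalIndexerDBConfig, NodeConfig};

// Hypothetical helper: turn on both transaction and event indexing,
// committing to the internal indexer DB in batches of 10k versions.
fn enable_internal_indexer(mut node_config: NodeConfig) -> NodeConfig {
    node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, 10_000);
    node_config
}
```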
#[serde(default)] + pub indexer_db_config: InternalIndexerDBConfig, } impl NodeConfig { diff --git a/crates/indexer/src/indexer/fetcher.rs b/crates/indexer/src/indexer/fetcher.rs index ba3615d46e1cd..e4d5cee0bd65a 100644 --- a/crates/indexer/src/indexer/fetcher.rs +++ b/crates/indexer/src/indexer/fetcher.rs @@ -242,7 +242,7 @@ async fn fetch_nexts( let mut block_height_bcs = aptos_api_types::U64::from(block_height); let state_view = context.latest_state_view().unwrap(); - let converter = state_view.as_converter(context.db.clone(), context.table_info_reader.clone()); + let converter = state_view.as_converter(context.db.clone(), context.indexer_reader.clone()); let mut transactions = vec![]; for (ind, raw_txn) in raw_txns.into_iter().enumerate() { diff --git a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs index 8d704256110d8..ec74174693fc5 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs @@ -7,7 +7,6 @@ use crate::{ }; use aptos_api::context::Context; use aptos_config::config::NodeConfig; -use aptos_db_indexer::table_info_reader::TableInfoReader; use aptos_logger::info; use aptos_mempool::MempoolClientSender; use aptos_protos::{ @@ -19,7 +18,7 @@ use aptos_protos::{ util::timestamp::FILE_DESCRIPTOR_SET as UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET, }; use aptos_storage_interface::DbReader; -use aptos_types::chain_id::ChainId; +use aptos_types::{chain_id::ChainId, indexer::indexer_db_reader::IndexerReader}; use std::{net::ToSocketAddrs, sync::Arc}; use tokio::runtime::Runtime; use tonic::{codec::CompressionEncoding, transport::Server}; @@ -35,7 +34,7 @@ pub fn bootstrap( chain_id: ChainId, db: Arc, mp_sender: MempoolClientSender, - table_info_reader: Option>, + indexer_reader: Option>, ) -> Option { if !config.indexer_grpc.enabled { return None; @@ -57,7 +56,7 @@ pub fn bootstrap( db, mp_sender, node_config, - table_info_reader, + indexer_reader, )); let service_context = ServiceContext { context: context.clone(), diff --git a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs index 3964f65ac2401..8c856fc68a405 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs @@ -339,8 +339,7 @@ impl IndexerStreamCoordinator { let first_version = raw_txns.first().map(|txn| txn.version).unwrap(); let state_view = context.latest_state_view().unwrap(); - let converter = - state_view.as_converter(context.db.clone(), context.table_info_reader.clone()); + let converter = state_view.as_converter(context.db.clone(), context.indexer_reader.clone()); // Enrich data with block metadata let (_, _, block_event) = context diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs new file mode 100644 index 0000000000000..13d76c1355065 --- /dev/null +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs @@ -0,0 +1,72 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_config::config::NodeConfig; +use aptos_db_indexer::{db_indexer::DBIndexer, db_ops::open_internal_indexer_db}; +use aptos_indexer_grpc_utils::counters::{log_grpc_step, IndexerGrpcStep}; +use 
aptos_storage_interface::DbReader;
+use std::sync::Arc;
+
+const SERVICE_TYPE: &str = "internal_indexer_db_service";
+const INTERNAL_INDEXER_DB: &str = "internal_indexer_db";
+
+pub struct InternalIndexerDBService {
+    pub db_indexer: Arc<DBIndexer>,
+}
+
+impl InternalIndexerDBService {
+    pub fn new(db_reader: Arc<dyn DbReader>, node_config: &NodeConfig) -> Self {
+        let db_path = node_config
+            .storage
+            .get_dir_paths()
+            .default_root_path()
+            .join(INTERNAL_INDEXER_DB);
+        let rocksdb_config = node_config.storage.rocksdb_configs.index_db_config;
+        let db = Arc::new(
+            open_internal_indexer_db(db_path, &rocksdb_config)
+                .expect("Failed to open up indexer db initially"),
+        );
+
+        let internal_db_indexer = Arc::new(DBIndexer::new(
+            db,
+            db_reader,
+            &node_config.indexer_db_config,
+        ));
+        Self {
+            db_indexer: internal_db_indexer,
+        }
+    }
+
+    pub fn get_db_indexer(&self) -> Arc<DBIndexer> {
+        Arc::clone(&self.db_indexer)
+    }
+
+    pub async fn run(&mut self) {
+        let mut start_version = self.db_indexer.get_persisted_version().unwrap_or(0);
+        loop {
+            let start_time: std::time::Instant = std::time::Instant::now();
+            let next_version = self
+                .db_indexer
+                .process_a_batch(Some(start_version))
+                .expect("Failed to run internal db indexer");
+
+            if next_version == start_version {
+                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
+                continue;
+            }
+            log_grpc_step(
+                SERVICE_TYPE,
+                IndexerGrpcStep::InternalIndexerDBProcessed,
+                Some(start_version as i64),
+                Some(next_version as i64),
+                None,
+                None,
+                Some(start_time.elapsed().as_secs_f64()),
+                None,
+                Some((next_version - start_version) as i64),
+                None,
+            );
+            start_version = next_version;
+        }
+    }
+}
diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs
index 2d311d0902e48..2fa7aa4af3f65 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs
@@ -2,5 +2,6 @@
 // SPDX-License-Identifier: Apache-2.0
 
 pub mod backup_restore;
+pub mod internal_indexer_db_service;
 pub mod runtime;
 pub mod table_info_service;
diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs
index cb5aa9531e880..68f97c20cfaf5 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs
@@ -1,10 +1,12 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0
 
-use crate::table_info_service::TableInfoService;
+use crate::{
+    internal_indexer_db_service::InternalIndexerDBService, table_info_service::TableInfoService,
+};
 use aptos_api::context::Context;
 use aptos_config::config::NodeConfig;
-use aptos_db_indexer::{db_ops::open_db, db_v2::IndexerAsyncV2};
+use aptos_db_indexer::{db_indexer::DBIndexer, db_ops::open_db, db_v2::IndexerAsyncV2};
 use aptos_mempool::MempoolClientSender;
 use aptos_storage_interface::DbReaderWriter;
 use aptos_types::chain_id::ChainId;
@@ -13,6 +15,25 @@ use tokio::runtime::Runtime;
 
 const INDEX_ASYNC_V2_DB_NAME: &str = "index_indexer_async_v2_db";
 
+pub fn bootstrap_internal_indexer_db(
+    config: &NodeConfig,
+    db_rw: DbReaderWriter,
+) -> Option<(Runtime, Arc<DBIndexer>)> {
+    if !(config.indexer_db_config.enable_event() || config.indexer_db_config.enable_transaction()) {
+        return None;
+    }
+    let runtime = aptos_runtimes::spawn_named_runtime("index-db".to_string(), None);
+    // Set up db config and open up the db initially to read metadata
+    let mut
indexer_service = InternalIndexerDBService::new(db_rw.reader, config); + let db_indexer = indexer_service.get_db_indexer(); + // Spawn task for db indexer + runtime.spawn(async move { + indexer_service.run().await; + }); + + Some((runtime, db_indexer)) +} + /// Creates a runtime which creates a thread pool which sets up fullnode indexer table info service /// Returns corresponding Tokio runtime pub fn bootstrap( diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/counters.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/counters.rs index 8f1228a2ebdbc..b3746eee88f20 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/counters.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/counters.rs @@ -58,6 +58,8 @@ pub enum IndexerGrpcStep { TableInfoProcessedBatch, // [Indexer Table Info] Processed transactions from fullnode TableInfoProcessed, + // [Indexer Indices] Processed transactions from AptosDB + InternalIndexerDBProcessed, } impl IndexerGrpcStep { @@ -91,6 +93,7 @@ impl IndexerGrpcStep { // Table info service steps IndexerGrpcStep::TableInfoProcessedBatch => "1", IndexerGrpcStep::TableInfoProcessed => "2", + IndexerGrpcStep::InternalIndexerDBProcessed => "1", } } @@ -136,6 +139,9 @@ impl IndexerGrpcStep { IndexerGrpcStep::TableInfoProcessed => { "[Indexer Table Info] Processed successfully" } + IndexerGrpcStep::InternalIndexerDBProcessed => { + "[Indexer DB indices] Processed successfully" + } } } } diff --git a/execution/executor/Cargo.toml b/execution/executor/Cargo.toml index aec6c3ca9ae54..da2d69e3cc59d 100644 --- a/execution/executor/Cargo.toml +++ b/execution/executor/Cargo.toml @@ -24,6 +24,7 @@ aptos-infallible = { workspace = true } aptos-logger = { workspace = true } aptos-metrics-core = { workspace = true } aptos-scratchpad = { workspace = true } +aptos-sdk = { workspace = true } aptos-storage-interface = { workspace = true } aptos-types = { workspace = true } aptos-vm = { workspace = true } @@ -42,6 +43,8 @@ serde = { workspace = true } aptos-cached-packages = { workspace = true } aptos-config = { workspace = true } aptos-db = { workspace = true } +aptos-db-indexer = { workspace = true, features = ["fuzzing"] } +aptos-db-indexer-schemas = { workspace = true, features = ["fuzzing"] } aptos-executor-test-helpers = { workspace = true } aptos-genesis = { workspace = true } aptos-storage-interface = { workspace = true } diff --git a/execution/executor/tests/internal_indexer_test.rs b/execution/executor/tests/internal_indexer_test.rs new file mode 100644 index 0000000000000..3a9ed1ea0df89 --- /dev/null +++ b/execution/executor/tests/internal_indexer_test.rs @@ -0,0 +1,172 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_cached_packages::aptos_stdlib; +use aptos_config::config::{internal_indexer_db_config::InternalIndexerDBConfig, RocksdbConfig}; +use aptos_db::AptosDB; +use aptos_db_indexer::{db_indexer::DBIndexer, db_ops::open_internal_indexer_db}; +use aptos_executor_test_helpers::{ + gen_block_id, gen_ledger_info_with_sigs, integration_test_impl::create_db_and_executor, +}; +use aptos_executor_types::BlockExecutorTrait; +use aptos_sdk::{ + transaction_builder::TransactionFactory, + types::{AccountKey, LocalAccount}, +}; +use aptos_storage_interface::DbReader; +use aptos_temppath::TempPath; +use aptos_types::{ + account_config::aptos_test_root_address, + block_metadata::BlockMetadata, + chain_id::ChainId, + test_helpers::transaction_test_helpers::TEST_BLOCK_EXECUTOR_ONCHAIN_CONFIG, + transaction::{ + 
signature_verified_transaction::into_signature_verified_block, Transaction, + Transaction::UserTransaction, WriteSetPayload, + }, +}; +use rand::SeedableRng; +use std::sync::Arc; + +const B: u64 = 1_000_000_000; + +#[cfg(test)] +pub fn create_test_db() -> (Arc, LocalAccount) { + // create test db + let path = aptos_temppath::TempPath::new(); + let (genesis, validators) = aptos_vm_genesis::test_genesis_change_set_and_validators(Some(1)); + let genesis_txn = Transaction::GenesisTransaction(WriteSetPayload::Direct(genesis)); + let core_resources_account: LocalAccount = LocalAccount::new( + aptos_test_root_address(), + AccountKey::from_private_key(aptos_vm_genesis::GENESIS_KEYPAIR.0.clone()), + 0, + ); + let (aptos_db, _db, executor, _waypoint) = + create_db_and_executor(path.path(), &genesis_txn, true); + let parent_block_id = executor.committed_block_id(); + + // This generates accounts that do not overlap with genesis + let seed = [3u8; 32]; + let mut rng = ::rand::rngs::StdRng::from_seed(seed); + let signer = aptos_types::validator_signer::ValidatorSigner::new( + validators[0].data.owner_address, + validators[0].consensus_key.clone(), + ); + let account1 = LocalAccount::generate(&mut rng); + let account2 = LocalAccount::generate(&mut rng); + let account3 = LocalAccount::generate(&mut rng); + + let txn_factory = TransactionFactory::new(ChainId::test()); + + let block1_id = gen_block_id(1); + let block1_meta = Transaction::BlockMetadata(BlockMetadata::new( + block1_id, + 1, + 0, + signer.author(), + vec![0], + vec![], + 1, + )); + let tx1 = core_resources_account + .sign_with_transaction_builder(txn_factory.create_user_account(account1.public_key())); + let tx2 = core_resources_account + .sign_with_transaction_builder(txn_factory.create_user_account(account2.public_key())); + let tx3 = core_resources_account + .sign_with_transaction_builder(txn_factory.create_user_account(account3.public_key())); + // Create account1 with 2T coins. + let txn1 = core_resources_account + .sign_with_transaction_builder(txn_factory.mint(account1.address(), 2_000 * B)); + // Create account2 with 1.2T coins. + let txn2 = core_resources_account + .sign_with_transaction_builder(txn_factory.mint(account2.address(), 1_200 * B)); + // Create account3 with 1T coins. + let txn3 = core_resources_account + .sign_with_transaction_builder(txn_factory.mint(account3.address(), 1_000 * B)); + + // Transfer 20B coins from account1 to account2. + // balance: <1.98T, 1.22T, 1T + let txn4 = + account1.sign_with_transaction_builder(txn_factory.transfer(account2.address(), 20 * B)); + + // Transfer 10B coins from account2 to account3. + // balance: <1.98T, <1.21T, 1.01T + let txn5 = + account2.sign_with_transaction_builder(txn_factory.transfer(account3.address(), 10 * B)); + + // Transfer 70B coins from account1 to account3. 
+    // balance: <1.91T, <1.21T, 1.08T
+    let txn6 =
+        account1.sign_with_transaction_builder(txn_factory.transfer(account3.address(), 70 * B));
+
+    let reconfig1 = core_resources_account.sign_with_transaction_builder(
+        txn_factory.payload(aptos_stdlib::aptos_governance_force_end_epoch_test_only()),
+    );
+
+    let block1: Vec<_> = into_signature_verified_block(vec![
+        block1_meta,
+        UserTransaction(tx1),
+        UserTransaction(tx2),
+        UserTransaction(tx3),
+        UserTransaction(txn1),
+        UserTransaction(txn2),
+        UserTransaction(txn3),
+        UserTransaction(txn4),
+        UserTransaction(txn5),
+        UserTransaction(txn6),
+        UserTransaction(reconfig1),
+    ]);
+    let output1 = executor
+        .execute_block(
+            (block1_id, block1.clone()).into(),
+            parent_block_id,
+            TEST_BLOCK_EXECUTOR_ONCHAIN_CONFIG,
+        )
+        .unwrap();
+    let li1 = gen_ledger_info_with_sigs(1, &output1, block1_id, &[signer.clone()]);
+    executor.commit_blocks(vec![block1_id], li1).unwrap();
+    (aptos_db, core_resources_account)
+}
+
+#[test]
+fn test_db_indexer_data() {
+    use std::{thread, time::Duration};
+    // create test db
+    let (aptos_db, core_account) = create_test_db();
+    let total_version = aptos_db.get_synced_version().unwrap();
+    assert_eq!(total_version, 11);
+    let rocksdb_config = RocksdbConfig::default();
+    let temp_path = TempPath::new();
+    let db = Arc::new(
+        open_internal_indexer_db(temp_path.as_ref(), &rocksdb_config)
+            .expect("Failed to open up indexer db initially"),
+    );
+    let db_indexer = DBIndexer::new(
+        db.clone(),
+        aptos_db,
+        &InternalIndexerDBConfig::new(true, true, 2),
+    );
+    // assert the data matches the expected data
+    let mut version = db_indexer.get_persisted_version().unwrap();
+    assert_eq!(version, 0);
+    while version < total_version {
+        version = db_indexer.process_a_batch(Some(version)).unwrap();
+    }
+    // wait for the commit to finish
+    thread::sleep(Duration::from_millis(100));
+    // indexer has processed all the transactions
+    assert_eq!(db_indexer.get_persisted_version().unwrap(), total_version);
+
+    let txn_iter = db_indexer
+        .get_account_transaction_version_iter(core_account.address(), 0, 1000, 1000)
+        .unwrap();
+    let res: Vec<_> = txn_iter.collect();
+
+    // the core account submitted 7 transactions, including the final reconfig txn,
+    // and its first transaction is at version 2
+    assert!(res.len() == 7);
+    assert!(res[0].as_ref().unwrap().1 == 2);
+
+    let x = db_indexer.get_event_by_key_iter().unwrap();
+    let res: Vec<_> = x.collect();
+    assert!(res.len() == 14);
+}
diff --git a/storage/aptosdb/Cargo.toml b/storage/aptosdb/Cargo.toml
index 909fcbc7bf2fa..a3a5c3d439002 100644
--- a/storage/aptosdb/Cargo.toml
+++ b/storage/aptosdb/Cargo.toml
@@ -18,6 +18,7 @@ aptos-accumulator = { workspace = true }
 aptos-config = { workspace = true }
 aptos-crypto = { workspace = true }
 aptos-db-indexer = { workspace = true }
+aptos-db-indexer-schemas = { workspace = true }
 aptos-executor = { workspace = true }
 aptos-executor-types = { workspace = true }
 aptos-experimental-runtimes = { workspace = true }
diff --git a/storage/aptosdb/src/db/include/aptosdb_reader.rs b/storage/aptosdb/src/db/include/aptosdb_reader.rs
index 12895e32c76cc..ed21b05cea629 100644
--- a/storage/aptosdb/src/db/include/aptosdb_reader.rs
+++ b/storage/aptosdb/src/db/include/aptosdb_reader.rs
@@ -284,6 +284,7 @@ impl DbReader for AptosDB {
         })
     }
 
+    /// TODO(bowu): Deprecate after internal index migration
     fn get_events(
         &self,
         event_key: &EventKey,
@@ -724,6 +725,19 @@ impl DbReader for AptosDB {
             self.state_store.get_usage(version)
         })
     }
+
+    fn get_event_by_version_and_index(
+        &self,
version: Version, + index: u64, + ) -> Result { + gauged_api("get_event_by_version_and_index", || { + self.error_if_ledger_pruned("Event", version)?; + self.event_store.get_event_by_version_and_index(version, index) + }) + + } } impl AptosDB { @@ -829,6 +843,7 @@ impl AptosDB { }) } + /// TODO(bowu): Deprecate after internal index migration fn get_events_by_event_key( &self, event_key: &EventKey, diff --git a/storage/aptosdb/src/event_store/mod.rs b/storage/aptosdb/src/event_store/mod.rs index 5c0881a686c9a..1ff4d31c330d4 100644 --- a/storage/aptosdb/src/event_store/mod.rs +++ b/storage/aptosdb/src/event_store/mod.rs @@ -8,10 +8,7 @@ use super::AptosDB; use crate::{ - schema::{ - event::EventSchema, event_accumulator::EventAccumulatorSchema, - event_by_key::EventByKeySchema, event_by_version::EventByVersionSchema, - }, + schema::{event::EventSchema, event_accumulator::EventAccumulatorSchema}, utils::iterators::EventsByVersionIter, }; use anyhow::anyhow; @@ -20,6 +17,9 @@ use aptos_crypto::{ hash::{CryptoHash, EventAccumulatorHasher}, HashValue, }; +use aptos_db_indexer_schemas::schema::{ + event_by_key::EventByKeySchema, event_by_version::EventByVersionSchema, +}; use aptos_schemadb::{iterator::SchemaIterator, schema::ValueCodec, ReadOptions, SchemaBatch, DB}; use aptos_storage_interface::{db_ensure as ensure, db_other_bail, AptosDbError, Result}; use aptos_types::{ diff --git a/storage/aptosdb/src/ledger_db/event_db.rs b/storage/aptosdb/src/ledger_db/event_db.rs index d0de057c53d40..d475062f9d8ac 100644 --- a/storage/aptosdb/src/ledger_db/event_db.rs +++ b/storage/aptosdb/src/ledger_db/event_db.rs @@ -7,8 +7,6 @@ use crate::{ db_metadata::{DbMetadataKey, DbMetadataSchema, DbMetadataValue}, event::EventSchema, event_accumulator::EventAccumulatorSchema, - event_by_key::EventByKeySchema, - event_by_version::EventByVersionSchema, }, utils::iterators::EventsByVersionIter, }; @@ -17,6 +15,9 @@ use aptos_crypto::{ hash::{CryptoHash, EventAccumulatorHasher}, HashValue, }; +use aptos_db_indexer_schemas::schema::{ + event_by_key::EventByKeySchema, event_by_version::EventByVersionSchema, +}; use aptos_schemadb::{SchemaBatch, DB}; use aptos_storage_interface::{AptosDbError, Result}; use aptos_types::{ diff --git a/storage/aptosdb/src/ledger_db/transaction_db.rs b/storage/aptosdb/src/ledger_db/transaction_db.rs index afb02edb07f27..f2f3b48b26641 100644 --- a/storage/aptosdb/src/ledger_db/transaction_db.rs +++ b/storage/aptosdb/src/ledger_db/transaction_db.rs @@ -6,12 +6,12 @@ use crate::{ schema::{ db_metadata::{DbMetadataKey, DbMetadataSchema, DbMetadataValue}, transaction::TransactionSchema, - transaction_by_account::TransactionByAccountSchema, transaction_by_hash::TransactionByHashSchema, }, utils::iterators::ExpectContinuousVersions, }; use aptos_crypto::hash::{CryptoHash, HashValue}; +use aptos_db_indexer_schemas::schema::transaction_by_account::TransactionByAccountSchema; use aptos_schemadb::{SchemaBatch, DB}; use aptos_storage_interface::{AptosDbError, Result}; use aptos_types::transaction::{Transaction, TransactionToCommit, Version}; diff --git a/storage/aptosdb/src/schema/mod.rs b/storage/aptosdb/src/schema/mod.rs index 6cd8d23935193..8210dc0fdf08a 100644 --- a/storage/aptosdb/src/schema/mod.rs +++ b/storage/aptosdb/src/schema/mod.rs @@ -12,8 +12,6 @@ pub(crate) mod db_metadata; pub(crate) mod epoch_by_version; pub(crate) mod event; pub(crate) mod event_accumulator; -pub(crate) mod event_by_key; -pub(crate) mod event_by_version; pub(crate) mod jellyfish_merkle_node; pub(crate) mod 
ledger_info; pub(crate) mod stale_node_index; @@ -26,7 +24,6 @@ pub(crate) mod state_value_index; pub(crate) mod transaction; pub(crate) mod transaction_accumulator; pub(crate) mod transaction_auxiliary_data; -pub(crate) mod transaction_by_account; pub(crate) mod transaction_by_hash; pub(crate) mod transaction_info; pub(crate) mod version_data; @@ -94,8 +91,6 @@ pub mod fuzzing { assert_no_panic_decoding::(data); assert_no_panic_decoding::(data); assert_no_panic_decoding::(data); - assert_no_panic_decoding::(data); - assert_no_panic_decoding::(data); assert_no_panic_decoding::( data, ); @@ -117,9 +112,6 @@ pub mod fuzzing { assert_no_panic_decoding::< super::transaction_auxiliary_data::TransactionAuxiliaryDataSchema, >(data); - assert_no_panic_decoding::( - data, - ); assert_no_panic_decoding::(data); assert_no_panic_decoding::(data); assert_no_panic_decoding::(data); diff --git a/storage/aptosdb/src/transaction_store/mod.rs b/storage/aptosdb/src/transaction_store/mod.rs index d957e11caaa0a..cb7e15a1ee1a1 100644 --- a/storage/aptosdb/src/transaction_store/mod.rs +++ b/storage/aptosdb/src/transaction_store/mod.rs @@ -4,9 +4,10 @@ //! This file defines transaction store APIs that are related to committed signed transactions. -use crate::{ - ledger_db::LedgerDb, schema::transaction_by_account::TransactionByAccountSchema, - utils::iterators::AccountTransactionVersionIter, +use crate::ledger_db::LedgerDb; +use aptos_db_indexer_schemas::{ + schema::transaction_by_account::TransactionByAccountSchema, + utils::AccountTransactionVersionIter, }; use aptos_schemadb::SchemaBatch; use aptos_storage_interface::{AptosDbError, Result}; diff --git a/storage/aptosdb/src/utils/iterators.rs b/storage/aptosdb/src/utils/iterators.rs index 1d57433b21875..dca7ae3b19bef 100644 --- a/storage/aptosdb/src/utils/iterators.rs +++ b/storage/aptosdb/src/utils/iterators.rs @@ -5,14 +5,12 @@ use crate::{ schema::{ event::EventSchema, ledger_info::LedgerInfoSchema, state_value::StateValueSchema, state_value_index::StateValueIndexSchema, - transaction_by_account::TransactionByAccountSchema, }, state_kv_db::StateKvDb, }; use aptos_schemadb::{iterator::SchemaIterator, ReadOptions}; use aptos_storage_interface::{db_ensure as ensure, AptosDbError, Result}; use aptos_types::{ - account_address::AccountAddress, contract_event::ContractEvent, ledger_info::LedgerInfoWithSignatures, state_store::{ @@ -253,89 +251,6 @@ impl<'a> Iterator for PrefixedStateValueIterator<'a> { } } -pub struct AccountTransactionVersionIter<'a> { - inner: SchemaIterator<'a, TransactionByAccountSchema>, - address: AccountAddress, - expected_next_seq_num: Option, - end_seq_num: u64, - prev_version: Option, - ledger_version: Version, -} - -impl<'a> AccountTransactionVersionIter<'a> { - pub(crate) fn new( - inner: SchemaIterator<'a, TransactionByAccountSchema>, - address: AccountAddress, - end_seq_num: u64, - ledger_version: Version, - ) -> Self { - Self { - inner, - address, - end_seq_num, - ledger_version, - expected_next_seq_num: None, - prev_version: None, - } - } -} - -impl<'a> AccountTransactionVersionIter<'a> { - fn next_impl(&mut self) -> Result> { - Ok(match self.inner.next().transpose()? { - Some(((address, seq_num), version)) => { - // No more transactions sent by this account. 
-                if address != self.address {
-                    return Ok(None);
-                }
-                if seq_num >= self.end_seq_num {
-                    return Ok(None);
-                }
-
-                // Ensure seq_num_{i+1} == seq_num_{i} + 1
-                if let Some(expected_seq_num) = self.expected_next_seq_num {
-                    ensure!(
-                        seq_num == expected_seq_num,
-                        "DB corruption: account transactions sequence numbers are not contiguous: \
-                         actual: {}, expected: {}",
-                        seq_num,
-                        expected_seq_num,
-                    );
-                };
-
-                // Ensure version_{i+1} > version_{i}
-                if let Some(prev_version) = self.prev_version {
-                    ensure!(
-                        prev_version < version,
-                        "DB corruption: account transaction versions are not strictly increasing: \
-                         previous version: {}, current version: {}",
-                        prev_version,
-                        version,
-                    );
-                }
-
-                // No more transactions (in this view of the ledger).
-                if version > self.ledger_version {
-                    return Ok(None);
-                }
-
-                self.expected_next_seq_num = Some(seq_num + 1);
-                self.prev_version = Some(version);
-                Some((seq_num, version))
-            },
-            None => None,
-        })
-    }
-}
-
-impl<'a> Iterator for AccountTransactionVersionIter<'a> {
-    type Item = Result<(u64, Version)>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.next_impl().transpose()
-    }
-}
-
 pub struct EpochEndingLedgerInfoIter<'a> {
     inner: SchemaIterator<'a, LedgerInfoSchema>,
     next_epoch: u64,
diff --git a/storage/indexer/Cargo.toml b/storage/indexer/Cargo.toml
index b57c5b7da1435..58bba0559e724 100644
--- a/storage/indexer/Cargo.toml
+++ b/storage/indexer/Cargo.toml
@@ -15,13 +15,17 @@ rust-version = { workspace = true }
 
 [dependencies]
 anyhow = { workspace = true }
 aptos-config = { workspace = true }
+aptos-db-indexer-schemas = { workspace = true }
 aptos-logger = { workspace = true }
 aptos-resource-viewer = { workspace = true }
 aptos-rocksdb-options = { workspace = true }
 aptos-schemadb = { workspace = true }
+aptos-sdk = { workspace = true }
 aptos-storage-interface = { workspace = true }
 aptos-types = { workspace = true }
+aptos-vm-genesis = { workspace = true }
 bcs = { workspace = true }
+byteorder = { workspace = true }
 bytes = { workspace = true }
 dashmap = { workspace = true }
 move-core-types = { workspace = true }
diff --git a/storage/indexer/src/db_indexer.rs b/storage/indexer/src/db_indexer.rs
new file mode 100644
index 0000000000000..b01e1c906fddd
--- /dev/null
+++ b/storage/indexer/src/db_indexer.rs
@@ -0,0 +1,388 @@
+// Copyright (c) Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use aptos_config::config::internal_indexer_db_config::InternalIndexerDBConfig;
+use aptos_db_indexer_schemas::{
+    metadata::{MetadataKey, MetadataValue},
+    schema::{
+        event_by_key::EventByKeySchema, event_by_version::EventByVersionSchema,
+        indexer_metadata::InternalIndexerMetadataSchema,
+        transaction_by_account::TransactionByAccountSchema,
+    },
+    utils::{
+        error_if_too_many_requested, get_first_seq_num_and_limit, AccountTransactionVersionIter,
+        MAX_REQUEST_LIMIT,
+    },
+};
+use aptos_schemadb::{SchemaBatch, DB};
+use aptos_storage_interface::{
+    db_ensure as ensure, db_other_bail as bail, AptosDbError, DbReader, Result,
+};
+use aptos_types::{
+    account_address::AccountAddress,
+    contract_event::{ContractEvent, EventWithVersion},
+    event::EventKey,
+    indexer::indexer_db_reader::Order,
+    transaction::{AccountTransactionsWithProof, Transaction, Version},
+};
+use std::{
+    cmp::min,
+    sync::{
+        mpsc::{self, Receiver, Sender},
+        Arc,
+    },
+    thread,
+};
+
+pub struct DBCommitter {
+    db: Arc<DB>,
+    receiver: Receiver<Option<SchemaBatch>>,
+}
+
+impl DBCommitter {
+    pub fn new(db: Arc<DB>, receiver: Receiver<Option<SchemaBatch>>) -> Self {
+        Self { db, receiver }
+    }
+
+    pub fn run(&self) {
+        loop {
+            let batch_opt = self
+                .receiver
+                .recv()
+                .expect("Failed to receive batch from DB Indexer");
+            if let Some(batch) = batch_opt {
+                self.db
+                    .write_schemas(batch)
+                    .expect("Failed to write batch to indexer db");
+            } else {
+                break;
+            }
+        }
+    }
+}
+
+pub struct DBIndexer {
+    pub db: Arc<DB>,
+    pub main_db_reader: Arc<dyn DbReader>,
+    config: InternalIndexerDBConfig,
+    sender: Sender<Option<SchemaBatch>>,
+    committer_handle: Option<thread::JoinHandle<()>>,
+}
+
+impl Drop for DBIndexer {
+    fn drop(&mut self) {
+        if let Some(handle) = self.committer_handle.take() {
+            self.sender
+                .send(None)
+                .expect("Failed to send None to DBIndexer committer");
+            handle
+                .join()
+                .expect("DBIndexer committer thread fails to join");
+        }
+    }
+}
+
+impl DBIndexer {
+    pub fn new(
+        db: Arc<DB>,
+        db_reader: Arc<dyn DbReader>,
+        config: &InternalIndexerDBConfig,
+    ) -> Self {
+        let (sender, receiver) = mpsc::channel();
+
+        let db_clone = db.clone();
+        let committer_handle = thread::spawn(move || {
+            let committer = DBCommitter::new(db, receiver);
+            committer.run();
+        });
+
+        Self {
+            db: db_clone,
+            main_db_reader: db_reader,
+            config: *config,
+            sender,
+            committer_handle: Some(committer_handle),
+        }
+    }
+
+    pub fn get_persisted_version(&self) -> Result<Version> {
+        // read the latest key from the db
+        self.db
+            .get::<InternalIndexerMetadataSchema>(&MetadataKey::LatestVersion)?
+            .map_or(Ok(0), |metavalue| Ok(metavalue.expect_version()))
+    }
+
+    pub fn event_enabled(&self) -> bool {
+        self.config.enable_event
+    }
+
+    pub fn transaction_enabled(&self) -> bool {
+        self.config.enable_transaction
+    }
+
+    fn get_main_db_iter(
+        &self,
+        start_version: Version,
+        num_transactions: u64,
+    ) -> Result<impl Iterator<Item = Result<(Transaction, Vec<ContractEvent>)>> + '_> {
+        let txn_iter = self
+            .main_db_reader
+            .get_transaction_iterator(start_version, num_transactions)?;
+        let event_vec_iter = self
+            .main_db_reader
+            .get_events_iterator(start_version, num_transactions)?;
+        let zipped = txn_iter
+            .zip(event_vec_iter)
+            .map(|(txn_res, event_vec_res)| {
+                let txn = txn_res?;
+                let event_vec = event_vec_res?;
+                Ok((txn, event_vec))
+            });
+        Ok(zipped)
+    }
+
+    fn get_num_of_transactions(&self, version: Version) -> Result<u64> {
+        let highest_version = self.main_db_reader.get_synced_version()?;
+        Ok(min(
+            self.config.batch_size as u64,
+            highest_version - version,
+        ))
+    }
+
+    pub fn process_a_batch(&self, start_version: Option<Version>) -> Result<Version> {
+        let mut version = start_version.unwrap_or(0);
+
+        let num_transactions = self.get_num_of_transactions(version)?;
+        let mut db_iter = self.get_main_db_iter(version, num_transactions)?;
+        let batch = SchemaBatch::new();
+        db_iter.try_for_each(|res| {
+            let (txn, events) = res?;
+            if let Some(txn) = txn.try_as_signed_user_txn() {
+                if self.config.enable_transaction {
+                    batch.put::<TransactionByAccountSchema>(
+                        &(txn.sender(), txn.sequence_number()),
+                        &version,
+                    )?;
+                }
+
+                if self.config.enable_event {
+                    events.iter().enumerate().for_each(|(idx, event)| {
+                        if let ContractEvent::V1(v1) = event {
+                            batch
+                                .put::<EventByKeySchema>(
+                                    &(*v1.key(), v1.sequence_number()),
+                                    &(version, idx as u64),
+                                )
+                                .expect("Failed to put events by key to a batch");
+                            batch
+                                .put::<EventByVersionSchema>(
+                                    &(*v1.key(), version, v1.sequence_number()),
+                                    &(idx as u64),
+                                )
+                                .expect("Failed to put events by version to a batch");
+                        }
+                    });
+                }
+            }
+            version += 1;
+            Ok::<(), AptosDbError>(())
+        })?;
+        assert_eq!(num_transactions, version - start_version.unwrap_or(0));
+        batch.put::<InternalIndexerMetadataSchema>(
+            &MetadataKey::LatestVersion,
+            &MetadataValue::Version(version - 1),
+        )?;
+        self.sender
+            .send(Some(batch))
+            .map_err(|e| AptosDbError::Other(e.to_string()))?;
+        Ok(version)
+    }
+
+    pub fn get_account_transaction_version_iter(
+        &self,
+        address: AccountAddress,
+        min_seq_num: u64,
+        num_versions: u64,
+        ledger_version: Version,
+    ) -> Result<AccountTransactionVersionIter> {
+        let mut iter = self.db.iter::<TransactionByAccountSchema>()?;
+        iter.seek(&(address, min_seq_num))?;
+        Ok(AccountTransactionVersionIter::new(
+            iter,
+            address,
+            min_seq_num
+                .checked_add(num_versions)
+                .ok_or(AptosDbError::TooManyRequested(min_seq_num, num_versions))?,
+            ledger_version,
+        ))
+    }
+
+    pub fn get_latest_sequence_number(
+        &self,
+        ledger_version: Version,
+        event_key: &EventKey,
+    ) -> Result<Option<u64>> {
+        let mut iter = self.db.iter::<EventByVersionSchema>()?;
+        iter.seek_for_prev(&(*event_key, ledger_version, u64::max_value()))?;
+
+        Ok(iter.next().transpose()?.and_then(
+            |((key, _version, seq), _idx)| if &key == event_key { Some(seq) } else { None },
+        ))
+    }
+
+    /// Given `event_key` and `start_seq_num`, returns events identified by transaction version and
+    /// index among all events emitted by the same transaction. Result won't contain records with a
+    /// transaction version > `ledger_version` and is in ascending order.
+    pub fn lookup_events_by_key(
+        &self,
+        event_key: &EventKey,
+        start_seq_num: u64,
+        limit: u64,
+        ledger_version: u64,
+    ) -> Result<
+        Vec<(
+            u64,     // sequence number
+            Version, // transaction version it belongs to
+            u64,     // index among events for the same transaction
+        )>,
+    > {
+        let mut iter = self.db.iter::<EventByKeySchema>()?;
+        iter.seek(&(*event_key, start_seq_num))?;
+
+        let mut result = Vec::new();
+        let mut cur_seq = start_seq_num;
+        for res in iter.take(limit as usize) {
+            let ((path, seq), (ver, idx)) = res?;
+            if path != *event_key || ver > ledger_version {
+                break;
+            }
+            if seq != cur_seq {
+                let msg = if cur_seq == start_seq_num {
+                    "First requested event is probably pruned."
+                } else {
+                    "DB corruption: Sequence number not continuous."
+                };
+                bail!("{} expected: {}, actual: {}", msg, cur_seq, seq);
+            }
+            result.push((seq, ver, idx));
+            cur_seq += 1;
+        }
+
+        Ok(result)
+    }
+
+    #[cfg(any(test, feature = "fuzzing"))]
+    pub fn get_event_by_key_iter(
+        &self,
+    ) -> Result<Box<dyn Iterator<Item = (EventKey, Version, u64, u64)> + '_>> {
+        let mut iter = self.db.iter::<EventByKeySchema>()?;
+        iter.seek_to_first();
+        Ok(Box::new(iter.map(|res| {
+            let ((event_key, seq_num), (txn_version, idx)) = res.unwrap();
+            (event_key, txn_version, seq_num, idx)
+        })))
+    }
+
+    pub fn get_events(
+        &self,
+        event_key: &EventKey,
+        start: u64,
+        order: Order,
+        limit: u64,
+        ledger_version: Version,
+    ) -> anyhow::Result<Vec<EventWithVersion>> {
+        self.get_events_by_event_key(event_key, start, order, limit, ledger_version)
+    }
+
+    pub fn get_events_by_event_key(
+        &self,
+        event_key: &EventKey,
+        start_seq_num: u64,
+        order: Order,
+        limit: u64,
+        ledger_version: Version,
+    ) -> anyhow::Result<Vec<EventWithVersion>> {
+        error_if_too_many_requested(limit, MAX_REQUEST_LIMIT)?;
+        let get_latest = order == Order::Descending && start_seq_num == u64::max_value();
+
+        let cursor = if get_latest {
+            // Caller wants the latest, figure out the latest seq_num.
+            // In the case of no events on that path, use 0 and expect empty result below.
+            self.get_latest_sequence_number(ledger_version, event_key)?
+                .unwrap_or(0)
+        } else {
+            start_seq_num
+        };
+
+        // Convert requested range and order to a range in ascending order.
+        let (first_seq, real_limit) = get_first_seq_num_and_limit(order, cursor, limit)?;
+
+        // Query the index.
+        let mut event_indices =
+            self.lookup_events_by_key(event_key, first_seq, real_limit, ledger_version)?;
+
+        // When descending, it's possible that user is asking for something beyond the latest
+        // sequence number, in which case we will consider it a bad request and return an empty
+        // list.
+        // For example, if the latest sequence number is 100, and the caller is asking for 110 to
+        // 90, we will get 90 to 100 from the index lookup above. Seeing that the last item
+        // is 100 instead of 110 tells us 110 is out of bounds.
+        if order == Order::Descending {
+            if let Some((seq_num, _, _)) = event_indices.last() {
+                if *seq_num < cursor {
+                    event_indices = Vec::new();
+                }
+            }
+        }
+
+        let mut events_with_version = event_indices
+            .into_iter()
+            .map(|(seq, ver, idx)| {
+                let event = self
+                    .main_db_reader
+                    .get_event_by_version_and_index(ver, idx)?;
+                let v0 = match &event {
+                    ContractEvent::V1(event) => event,
+                    ContractEvent::V2(_) => bail!("Unexpected module event"),
+                };
+                ensure!(
+                    seq == v0.sequence_number(),
+                    "Index broken, expected seq:{}, actual:{}",
+                    seq,
+                    v0.sequence_number()
+                );
+                Ok(EventWithVersion::new(ver, event))
+            })
+            .collect::<Result<Vec<_>>>()?;
+        if order == Order::Descending {
+            events_with_version.reverse();
+        }
+
+        Ok(events_with_version)
+    }
+
+    pub fn get_account_transactions(
+        &self,
+        address: AccountAddress,
+        start_seq_num: u64,
+        limit: u64,
+        include_events: bool,
+        ledger_version: Version,
+    ) -> anyhow::Result<AccountTransactionsWithProof> {
+        error_if_too_many_requested(limit, MAX_REQUEST_LIMIT)?;
+
+        let txns_with_proofs = self
+            .get_account_transaction_version_iter(address, start_seq_num, limit, ledger_version)?
+            .map(|result| {
+                let (_seq_num, txn_version) = result?;
+                self.main_db_reader.get_transaction_by_version(
+                    txn_version,
+                    ledger_version,
+                    include_events,
+                )
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(AccountTransactionsWithProof::new(txns_with_proofs))
+    }
+}
diff --git a/storage/indexer/src/db_ops.rs b/storage/indexer/src/db_ops.rs
index 2f109bed876f0..e8a9a1fbf2048 100644
--- a/storage/indexer/src/db_ops.rs
+++ b/storage/indexer/src/db_ops.rs
@@ -1,22 +1,37 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0
 
-use crate::schema::column_families;
 use anyhow::Result;
 use aptos_config::config::RocksdbConfig;
+use aptos_db_indexer_schemas::schema::{column_families, internal_indexer_column_families};
 use aptos_rocksdb_options::gen_rocksdb_options;
 use aptos_schemadb::DB;
 use std::{mem, path::Path};
 
+const INTERNAL_INDEXER_DB_NAME: &str = "internal_indexer_db";
+const TABLE_INFO_DB_NAME: &str = "index_async_v2_db";
+
 pub fn open_db<P: AsRef<Path>>(db_path: P, rocksdb_config: &RocksdbConfig) -> Result<DB> {
     Ok(DB::open(
         db_path,
-        "index_asnync_v2_db",
+        TABLE_INFO_DB_NAME,
         column_families(),
         &gen_rocksdb_options(rocksdb_config, false),
     )?)
 }
 
+pub fn open_internal_indexer_db<P: AsRef<Path>>(
+    db_path: P,
+    rocksdb_config: &RocksdbConfig,
+) -> Result<DB> {
+    Ok(DB::open(
+        db_path,
+        INTERNAL_INDEXER_DB_NAME,
+        internal_indexer_column_families(),
+        &gen_rocksdb_options(rocksdb_config, false),
+    )?)
+}
+
 pub fn close_db(db: DB) {
     mem::drop(db)
 }
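How these pieces compose: open_internal_indexer_db creates the standalone RocksDB instance, and DBIndexer tails the main DbReader, handing one SchemaBatch per process_a_batch call to the committer thread. A minimal catch-up sketch under stated assumptions; the helper, its name, and the use of default Rocksdb options are illustrative, not part of this diff:

    use std::sync::Arc;

    use aptos_config::config::{InternalIndexerDBConfig, RocksdbConfig};
    use aptos_db_indexer::{db_indexer::DBIndexer, db_ops::open_internal_indexer_db};
    use aptos_storage_interface::DbReader;

    // Hypothetical bootstrap helper: open the internal indexer DB and catch it
    // up to the main DB, one batch at a time.
    fn bootstrap_internal_indexer(
        db_path: &std::path::Path,
        config: &InternalIndexerDBConfig,
        main_db: Arc<dyn DbReader>,
    ) -> anyhow::Result<()> {
        let db = Arc::new(open_internal_indexer_db(db_path, &RocksdbConfig::default())?);
        let indexer = DBIndexer::new(db, main_db.clone(), config);

        // Resume from the last committed version; process_a_batch returns the
        // next version to start from.
        let target = main_db.get_synced_version()?;
        let mut next_version = indexer.get_persisted_version()?;
        while next_version < target {
            next_version = indexer.process_a_batch(Some(next_version))?;
        }
        Ok(())
    }
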
diff --git a/storage/indexer/src/db_v2.rs b/storage/indexer/src/db_v2.rs
index 273b754f01d8a..01ad40d096251 100644
--- a/storage/indexer/src/db_v2.rs
+++ b/storage/indexer/src/db_v2.rs
@@ -5,7 +5,7 @@
 /// At the end of the migration to migrate table info mapping
 /// from storage critical path to indexer, the other file will be removed
 /// and this file will be moved to /ecosystem/indexer-grpc/indexer-grpc-table-info.
-use crate::{
+use aptos_db_indexer_schemas::{
     metadata::{MetadataKey, MetadataValue},
     schema::{indexer_metadata::IndexerMetadataSchema, table_info::TableInfoSchema},
 };
diff --git a/storage/indexer/src/indexer_reader.rs b/storage/indexer/src/indexer_reader.rs
new file mode 100644
index 0000000000000..e5a9535027d75
--- /dev/null
+++ b/storage/indexer/src/indexer_reader.rs
@@ -0,0 +1,116 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::{db_indexer::DBIndexer, db_v2::IndexerAsyncV2};
+use anyhow::{bail, Result};
+use aptos_types::{
+    account_address::AccountAddress,
+    contract_event::EventWithVersion,
+    event::EventKey,
+    indexer::indexer_db_reader::{IndexerReader, Order},
+    state_store::table::{TableHandle, TableInfo},
+    transaction::{AccountTransactionsWithProof, Version},
+};
+use std::sync::Arc;
+
+pub struct IndexerReaders {
+    table_info_reader: Option<Arc<IndexerAsyncV2>>,
+    db_indexer_reader: Option<Arc<DBIndexer>>,
+}
+
+impl IndexerReaders {
+    pub fn new(
+        table_info_reader: Option<Arc<IndexerAsyncV2>>,
+        db_indexer_reader: Option<Arc<DBIndexer>>,
+    ) -> Option<Self> {
+        if table_info_reader.is_none() && db_indexer_reader.is_none() {
+            None
+        } else {
+            Some(Self {
+                table_info_reader,
+                db_indexer_reader,
+            })
+        }
+    }
+}
+
+impl IndexerReader for IndexerReaders {
+    fn get_table_info(&self, handle: TableHandle) -> Result<Option<TableInfo>> {
+        if let Some(table_info_reader) = &self.table_info_reader {
+            return Ok(table_info_reader.get_table_info_with_retry(handle)?);
+        }
+        bail!("Table info reader is not available")
+    }
+
+    fn get_events(
+        &self,
+        event_key: &EventKey,
+        start: u64,
+        order: Order,
+        limit: u64,
+        ledger_version: Version,
+    ) -> Result<Vec<EventWithVersion>> {
+        if let Some(db_indexer_reader) = &self.db_indexer_reader {
+            if db_indexer_reader.event_enabled() {
+                return db_indexer_reader.get_events(
+                    event_key,
+                    start,
+                    order,
+                    limit,
+                    ledger_version,
+                );
+            } else {
+                bail!("Event index is not enabled")
+            }
+        }
+        bail!("DB indexer reader is not available")
+    }
+
+    fn get_events_by_event_key(
+        &self,
+        event_key: &EventKey,
+        start_seq_num: u64,
+        order: Order,
+        limit: u64,
+        ledger_version: Version,
+    ) -> Result<Vec<EventWithVersion>> {
+        if let Some(db_indexer_reader) = &self.db_indexer_reader {
+            if db_indexer_reader.event_enabled() {
+                return db_indexer_reader.get_events_by_event_key(
+                    event_key,
+                    start_seq_num,
+                    order,
+                    limit,
+                    ledger_version,
+                );
+            } else {
+                bail!("Event index is not enabled")
+            }
+        }
+        bail!("DB indexer reader is not available")
+    }
+
+    fn get_account_transactions(
+        &self,
+        address: AccountAddress,
+        start_seq_num: u64,
+        limit: u64,
+        include_events: bool,
+        ledger_version: Version,
+    ) -> Result<AccountTransactionsWithProof> {
+        if let Some(db_indexer_reader) = &self.db_indexer_reader {
+            if db_indexer_reader.transaction_enabled() {
+                return db_indexer_reader.get_account_transactions(
+                    address,
+                    start_seq_num,
+                    limit,
+                    include_events,
+                    ledger_version,
+                );
+            } else {
+                bail!("Transaction by account index is not enabled")
+            }
+        }
+        bail!("DB indexer reader is not available")
+    }
+}
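IndexerReaders is a thin dispatcher: each trait method forwards to whichever backend is present and enabled, and new returns None when neither backend exists, so callers can keep threading an Option through bootstrap. A usage sketch, assuming the caller already holds the two optional backends:

    use std::sync::Arc;

    use aptos_db_indexer::{
        db_indexer::DBIndexer, db_v2::IndexerAsyncV2, indexer_reader::IndexerReaders,
    };
    use aptos_types::indexer::indexer_db_reader::IndexerReader;

    // Hypothetical wiring: either backend may be disabled by config.
    fn make_reader(
        table_info: Option<Arc<IndexerAsyncV2>>,
        db_indexer: Option<Arc<DBIndexer>>,
    ) -> Option<Arc<dyn IndexerReader>> {
        IndexerReaders::new(table_info, db_indexer)
            .map(|readers| Arc::new(readers) as Arc<dyn IndexerReader>)
    }
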
diff --git a/storage/indexer/src/lib.rs b/storage/indexer/src/lib.rs
index 3b82dd83ba574..44fd7c58e28c7 100644
--- a/storage/indexer/src/lib.rs
+++ b/storage/indexer/src/lib.rs
@@ -3,20 +3,19 @@
 
 /// TODO(jill): deprecate Indexer once Indexer Async V2 is ready
 mod db;
+pub mod db_indexer;
 pub mod db_ops;
 pub mod db_v2;
-mod metadata;
-mod schema;
-pub mod table_info_reader;
+pub mod indexer_reader;
 
-use crate::{
-    db::INDEX_DB_NAME,
+use crate::db::INDEX_DB_NAME;
+use aptos_config::config::RocksdbConfig;
+use aptos_db_indexer_schemas::{
     metadata::{MetadataKey, MetadataValue},
     schema::{
         column_families, indexer_metadata::IndexerMetadataSchema, table_info::TableInfoSchema,
     },
 };
-use aptos_config::config::RocksdbConfig;
 use aptos_logger::warn;
 use aptos_resource_viewer::{AnnotatedMoveValue, AptosValueAnnotator};
 use aptos_rocksdb_options::gen_rocksdb_options;
diff --git a/storage/indexer/src/table_info_reader.rs b/storage/indexer/src/table_info_reader.rs
deleted file mode 100644
index f0ddcff4aa1ff..0000000000000
--- a/storage/indexer/src/table_info_reader.rs
+++ /dev/null
@@ -1,19 +0,0 @@
-// Copyright © Aptos Foundation
-// SPDX-License-Identifier: Apache-2.0
-
-use crate::db_v2::IndexerAsyncV2;
-use aptos_storage_interface::Result;
-use aptos_types::state_store::table::{TableHandle, TableInfo};
-
-/// Table info reader is to create a thin interface for other services to read the db data,
-/// this standalone db is officially not part of the AptosDB anymore.
-/// For services that need table info mapping, they need to acquire this reader in the FN bootstrapping stage.
-pub trait TableInfoReader: Send + Sync {
-    fn get_table_info(&self, handle: TableHandle) -> Result<Option<TableInfo>>;
-}
-
-impl TableInfoReader for IndexerAsyncV2 {
-    fn get_table_info(&self, handle: TableHandle) -> Result<Option<TableInfo>> {
-        self.get_table_info_with_retry(handle)
-    }
-}
diff --git a/storage/indexer_schemas/Cargo.toml b/storage/indexer_schemas/Cargo.toml
new file mode 100644
index 0000000000000..8c20573789274
--- /dev/null
+++ b/storage/indexer_schemas/Cargo.toml
@@ -0,0 +1,43 @@
+[package]
+name = "aptos-db-indexer-schemas"
+description = "AptosDB Internal Indexer DB schemas"
+version = "0.1.0"
+
+# Workspace inherited keys
+authors = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+license = { workspace = true }
+publish = { workspace = true }
+repository = { workspace = true }
+rust-version = { workspace = true }
+
+[dependencies]
+anyhow = { workspace = true }
+aptos-config = { workspace = true }
+aptos-logger = { workspace = true }
+aptos-resource-viewer = { workspace = true }
+aptos-rocksdb-options = { workspace = true }
+aptos-schemadb = { workspace = true }
+aptos-storage-interface = { workspace = true }
+aptos-types = { workspace = true }
+bcs = { workspace = true }
+byteorder = { workspace = true }
+bytes = { workspace = true }
+dashmap = { workspace = true }
+move-core-types = { workspace = true }
+proptest = { workspace = true, optional = true }
+proptest-derive = { workspace = true, optional = true }
+serde = { workspace = true }
+
+[dev-dependencies]
+aptos-proptest-helpers = { workspace = true }
+aptos-schemadb = { workspace = true, features = ["fuzzing"] }
+aptos-types = { workspace = true, features = ["fuzzing"] }
+proptest = { workspace = true }
+proptest-derive = { workspace = true }
+rand = { workspace = true }
+
+[features]
+default = []
+fuzzing = ["proptest", "proptest-derive", "aptos-types/fuzzing", "aptos-schemadb/fuzzing"]
diff --git a/storage/indexer_schemas/src/lib.rs b/storage/indexer_schemas/src/lib.rs
new file mode 100644
index 0000000000000..4e2dbb0c8338a
--- /dev/null
+++ b/storage/indexer_schemas/src/lib.rs
@@ -0,0 +1,6 @@
+// Copyright (c) Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+pub mod metadata;
+pub mod schema;
+pub mod utils;
diff --git a/storage/indexer/src/metadata.rs b/storage/indexer_schemas/src/metadata.rs
similarity index 90%
rename from storage/indexer/src/metadata.rs
rename to storage/indexer_schemas/src/metadata.rs
index 116709252727a..f0af9f42b51b6 100644
--- a/storage/indexer/src/metadata.rs
+++ b/storage/indexer_schemas/src/metadata.rs
@@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
 
 #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
 #[cfg_attr(any(test, feature = "fuzzing"), derive(proptest_derive::Arbitrary))]
-pub(crate) enum MetadataValue {
+pub enum MetadataValue {
     Version(Version),
 }
 
@@ -20,6 +20,6 @@ impl MetadataValue {
 
 #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
 #[cfg_attr(any(test, feature = "fuzzing"), derive(proptest_derive::Arbitrary))]
-pub(crate) enum MetadataKey {
+pub enum MetadataKey {
     LatestVersion,
 }
diff --git a/storage/aptosdb/src/schema/event_by_key/mod.rs b/storage/indexer_schemas/src/schema/event_by_key/mod.rs
similarity index 92%
rename from storage/aptosdb/src/schema/event_by_key/mod.rs
rename to storage/indexer_schemas/src/schema/event_by_key/mod.rs
index 465b10b271d2e..5e147b708298a 100644
--- a/storage/aptosdb/src/schema/event_by_key/mod.rs
+++ b/storage/indexer_schemas/src/schema/event_by_key/mod.rs
@@ -11,17 +11,17 @@
 //! | event_key | seq_num | txn_ver | idx |
 //! ```
 
-use crate::schema::{ensure_slice_len_eq, EVENT_BY_KEY_CF_NAME};
+use crate::{schema::EVENT_BY_KEY_CF_NAME, utils::ensure_slice_len_eq};
 use anyhow::Result;
 use aptos_schemadb::{
-    define_schema,
+    define_pub_schema,
     schema::{KeyCodec, ValueCodec},
 };
 use aptos_types::{event::EventKey, transaction::Version};
 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
 use std::mem::size_of;
 
-define_schema!(EventByKeySchema, Key, Value, EVENT_BY_KEY_CF_NAME);
+define_pub_schema!(EventByKeySchema, Key, Value, EVENT_BY_KEY_CF_NAME);
 
 type SeqNum = u64;
 type Key = (EventKey, SeqNum);
diff --git a/storage/aptosdb/src/schema/event_by_key/test.rs b/storage/indexer_schemas/src/schema/event_by_key/test.rs
similarity index 100%
rename from storage/aptosdb/src/schema/event_by_key/test.rs
rename to storage/indexer_schemas/src/schema/event_by_key/test.rs
diff --git a/storage/aptosdb/src/schema/event_by_version/mod.rs b/storage/indexer_schemas/src/schema/event_by_version/mod.rs
similarity index 91%
rename from storage/aptosdb/src/schema/event_by_version/mod.rs
rename to storage/indexer_schemas/src/schema/event_by_version/mod.rs
index 287109b5de521..ac33c9219671b 100644
--- a/storage/aptosdb/src/schema/event_by_version/mod.rs
+++ b/storage/indexer_schemas/src/schema/event_by_version/mod.rs
@@ -11,17 +11,17 @@
 //! | event_key | txn_ver | seq_num | idx |
 //! ```
 
-use crate::schema::{ensure_slice_len_eq, EVENT_BY_VERSION_CF_NAME};
+use crate::{schema::EVENT_BY_VERSION_CF_NAME, utils::ensure_slice_len_eq};
 use anyhow::Result;
 use aptos_schemadb::{
-    define_schema,
+    define_pub_schema,
     schema::{KeyCodec, ValueCodec},
 };
 use aptos_types::{event::EventKey, transaction::Version};
 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
 use std::mem::size_of;
 
-define_schema!(EventByVersionSchema, Key, Value, EVENT_BY_VERSION_CF_NAME);
+define_pub_schema!(EventByVersionSchema, Key, Value, EVENT_BY_VERSION_CF_NAME);
 
 type SeqNum = u64;
 type Key = (EventKey, Version, SeqNum);
diff --git a/storage/aptosdb/src/schema/event_by_version/test.rs b/storage/indexer_schemas/src/schema/event_by_version/test.rs
similarity index 100%
rename from storage/aptosdb/src/schema/event_by_version/test.rs
rename to storage/indexer_schemas/src/schema/event_by_version/test.rs
diff --git a/storage/indexer/src/schema/indexer_metadata/mod.rs b/storage/indexer_schemas/src/schema/indexer_metadata/mod.rs
similarity index 56%
rename from storage/indexer/src/schema/indexer_metadata/mod.rs
rename to storage/indexer_schemas/src/schema/indexer_metadata/mod.rs
index f8615ba5b8ee9..ede60f2679159 100644
--- a/storage/indexer/src/schema/indexer_metadata/mod.rs
+++ b/storage/indexer_schemas/src/schema/indexer_metadata/mod.rs
@@ -4,17 +4,18 @@
 //! This module defines physical storage schema storing metadata for the internal indexer
 //!
 
+use super::INTERNAL_INDEXER_METADATA_CF_NAME;
 use crate::{
     metadata::{MetadataKey, MetadataValue},
     schema::INDEXER_METADATA_CF_NAME,
 };
 use anyhow::Result;
 use aptos_schemadb::{
-    define_schema,
+    define_pub_schema,
     schema::{KeyCodec, ValueCodec},
 };
 
-define_schema!(
+define_pub_schema!(
     IndexerMetadataSchema,
     MetadataKey,
     MetadataValue,
@@ -41,5 +42,32 @@ impl ValueCodec<IndexerMetadataSchema> for MetadataValue {
     }
 }
 
+define_pub_schema!(
+    InternalIndexerMetadataSchema,
+    MetadataKey,
+    MetadataValue,
+    INTERNAL_INDEXER_METADATA_CF_NAME
+);
+
+impl KeyCodec<InternalIndexerMetadataSchema> for MetadataKey {
+    fn encode_key(&self) -> Result<Vec<u8>> {
+        Ok(bcs::to_bytes(self)?)
+    }
+
+    fn decode_key(data: &[u8]) -> Result<Self> {
+        Ok(bcs::from_bytes(data)?)
+    }
+}
+
+impl ValueCodec<InternalIndexerMetadataSchema> for MetadataValue {
+    fn encode_value(&self) -> Result<Vec<u8>> {
+        Ok(bcs::to_bytes(self)?)
+    }
+
+    fn decode_value(data: &[u8]) -> Result<Self> {
+        Ok(bcs::from_bytes(data)?)
+    }
+}
+
 #[cfg(test)]
 mod test;
diff --git a/storage/indexer/src/schema/indexer_metadata/test.rs b/storage/indexer_schemas/src/schema/indexer_metadata/test.rs
similarity index 66%
rename from storage/indexer/src/schema/indexer_metadata/test.rs
rename to storage/indexer_schemas/src/schema/indexer_metadata/test.rs
index a1117d2b17ea3..60d03d590e5b4 100644
--- a/storage/indexer/src/schema/indexer_metadata/test.rs
+++ b/storage/indexer_schemas/src/schema/indexer_metadata/test.rs
@@ -13,6 +13,14 @@ proptest! {
     ) {
         assert_encode_decode::<IndexerMetadataSchema>(&tag, &metadata);
     }
+
+    #[test]
+    fn test_encode_decode_internal_indexer_metadata(
+        key in any::<MetadataKey>(),
+        metadata in any::<MetadataValue>(),
+    ) {
+        assert_encode_decode::<InternalIndexerMetadataSchema>(&key, &metadata);
+    }
 }
 
 test_no_panic_decoding!(IndexerMetadataSchema);
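Both metadata schemas share the same BCS codecs, so a value should round-trip through either column family's codec unchanged. A quick sanity sketch (a hypothetical test, not part of this diff):

    use aptos_db_indexer_schemas::{
        metadata::MetadataValue, schema::indexer_metadata::InternalIndexerMetadataSchema,
    };
    use aptos_schemadb::schema::ValueCodec;

    #[test]
    fn internal_metadata_value_roundtrip() {
        let value = MetadataValue::Version(42);
        // Encode under the internal indexer CF's codec, then decode it back.
        let bytes =
            <MetadataValue as ValueCodec<InternalIndexerMetadataSchema>>::encode_value(&value)
                .unwrap();
        let decoded =
            <MetadataValue as ValueCodec<InternalIndexerMetadataSchema>>::decode_value(&bytes)
                .unwrap();
        assert_eq!(value, decoded);
    }
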
diff --git a/storage/indexer/src/schema/mod.rs b/storage/indexer_schemas/src/schema/mod.rs
similarity index 51%
rename from storage/indexer/src/schema/mod.rs
rename to storage/indexer_schemas/src/schema/mod.rs
index 90ba38374788d..9f40618496d5c 100644
--- a/storage/indexer/src/schema/mod.rs
+++ b/storage/indexer_schemas/src/schema/mod.rs
@@ -6,14 +6,20 @@
 //!
 //! All schemas are `pub(crate)` so not shown in rustdoc, refer to the source code to see details.
 
-pub(crate) mod indexer_metadata;
-pub(crate) mod table_info;
-
+pub mod event_by_key;
+pub mod event_by_version;
+pub mod indexer_metadata;
+pub mod table_info;
+pub mod transaction_by_account;
 use aptos_schemadb::ColumnFamilyName;
 
 pub const DEFAULT_COLUMN_FAMILY_NAME: ColumnFamilyName = "default";
 pub const INDEXER_METADATA_CF_NAME: ColumnFamilyName = "indexer_metadata";
+pub const INTERNAL_INDEXER_METADATA_CF_NAME: ColumnFamilyName = "internal_indexer_metadata";
 pub const TABLE_INFO_CF_NAME: ColumnFamilyName = "table_info";
+pub const EVENT_BY_KEY_CF_NAME: ColumnFamilyName = "event_by_key";
+pub const EVENT_BY_VERSION_CF_NAME: ColumnFamilyName = "event_by_version";
+pub const TRANSACTION_BY_ACCOUNT_CF_NAME: ColumnFamilyName = "transaction_by_account";
 
 pub fn column_families() -> Vec<ColumnFamilyName> {
     vec![
@@ -22,3 +28,13 @@ pub fn column_families() -> Vec<ColumnFamilyName> {
         TABLE_INFO_CF_NAME,
     ]
 }
+
+pub fn internal_indexer_column_families() -> Vec<ColumnFamilyName> {
+    vec![
+        /* empty cf */ DEFAULT_COLUMN_FAMILY_NAME,
+        INTERNAL_INDEXER_METADATA_CF_NAME,
+        EVENT_BY_KEY_CF_NAME,
+        EVENT_BY_VERSION_CF_NAME,
+        TRANSACTION_BY_ACCOUNT_CF_NAME,
+    ]
+}
diff --git a/storage/indexer/src/schema/table_info/mod.rs b/storage/indexer_schemas/src/schema/table_info/mod.rs
similarity index 90%
rename from storage/indexer/src/schema/table_info/mod.rs
rename to storage/indexer_schemas/src/schema/table_info/mod.rs
index 33dbb82f51184..3d3858bbfa507 100644
--- a/storage/indexer/src/schema/table_info/mod.rs
+++ b/storage/indexer_schemas/src/schema/table_info/mod.rs
@@ -12,12 +12,12 @@
 use crate::schema::TABLE_INFO_CF_NAME;
 use anyhow::Result;
 use aptos_schemadb::{
-    define_schema,
+    define_pub_schema,
     schema::{KeyCodec, ValueCodec},
 };
 use aptos_types::state_store::table::{TableHandle, TableInfo};
 
-define_schema!(TableInfoSchema, TableHandle, TableInfo, TABLE_INFO_CF_NAME);
+define_pub_schema!(TableInfoSchema, TableHandle, TableInfo, TABLE_INFO_CF_NAME);
 
 impl KeyCodec<TableInfoSchema> for TableHandle {
     fn encode_key(&self) -> Result<Vec<u8>> {
diff --git a/storage/indexer/src/schema/table_info/test.rs b/storage/indexer_schemas/src/schema/table_info/test.rs
similarity index 100%
rename from storage/indexer/src/schema/table_info/test.rs
rename to storage/indexer_schemas/src/schema/table_info/test.rs
diff --git a/storage/aptosdb/src/schema/transaction_by_account/mod.rs b/storage/indexer_schemas/src/schema/transaction_by_account/mod.rs
similarity index 93%
rename from storage/aptosdb/src/schema/transaction_by_account/mod.rs
rename to storage/indexer_schemas/src/schema/transaction_by_account/mod.rs
index 988b2ad53d125..511788c5f5bb2 100644
--- a/storage/aptosdb/src/schema/transaction_by_account/mod.rs
+++ b/storage/indexer_schemas/src/schema/transaction_by_account/mod.rs
@@ -11,17 +11,17 @@
 //! | address | seq_num | txn_ver |
 //! ```
 
-use crate::schema::{ensure_slice_len_eq, TRANSACTION_BY_ACCOUNT_CF_NAME};
+use crate::{schema::TRANSACTION_BY_ACCOUNT_CF_NAME, utils::ensure_slice_len_eq};
 use anyhow::Result;
 use aptos_schemadb::{
-    define_schema,
+    define_pub_schema,
     schema::{KeyCodec, ValueCodec},
 };
 use aptos_types::{account_address::AccountAddress, transaction::Version};
 use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
 use std::{convert::TryFrom, mem::size_of};
 
-define_schema!(
+define_pub_schema!(
     TransactionByAccountSchema,
     Key,
     Version,
diff --git a/storage/aptosdb/src/schema/transaction_by_account/test.rs b/storage/indexer_schemas/src/schema/transaction_by_account/test.rs
similarity index 100%
rename from storage/aptosdb/src/schema/transaction_by_account/test.rs
rename to storage/indexer_schemas/src/schema/transaction_by_account/test.rs
diff --git a/storage/indexer_schemas/src/utils.rs b/storage/indexer_schemas/src/utils.rs
new file mode 100644
index 0000000000000..c1309093c271e
--- /dev/null
+++ b/storage/indexer_schemas/src/utils.rs
@@ -0,0 +1,126 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::schema::transaction_by_account::TransactionByAccountSchema;
+use aptos_schemadb::iterator::SchemaIterator;
+use aptos_storage_interface::{db_ensure as ensure, AptosDbError, Result};
+use aptos_types::{
+    account_address::AccountAddress, indexer::indexer_db_reader::Order, transaction::Version,
+};
+
+pub fn ensure_slice_len_eq(data: &[u8], len: usize) -> Result<()> {
+    ensure!(
+        data.len() == len,
+        "Unexpected data len {}, expected {}.",
+        data.len(),
+        len,
+    );
+    Ok(())
+}
+
+pub const MAX_REQUEST_LIMIT: u64 = 10_000;
+
+pub fn error_if_too_many_requested(num_requested: u64, max_allowed: u64) -> Result<()> {
+    if num_requested > max_allowed {
+        Err(AptosDbError::TooManyRequested(num_requested, max_allowed))
+    } else {
+        Ok(())
+    }
+}
+
+// Convert requested range and order to a range in ascending order.
+pub fn get_first_seq_num_and_limit(order: Order, cursor: u64, limit: u64) -> Result<(u64, u64)> {
+    ensure!(limit > 0, "limit should be > 0, got {}", limit);
+
+    Ok(if order == Order::Ascending {
+        (cursor, limit)
+    } else if limit <= cursor {
+        (cursor - limit + 1, limit)
+    } else {
+        (0, cursor + 1)
+    })
+}
+
+// This is a replica of the AccountTransactionVersionIter from the storage/aptosdb crate.
+pub struct AccountTransactionVersionIter<'a> {
+    inner: SchemaIterator<'a, TransactionByAccountSchema>,
+    address: AccountAddress,
+    expected_next_seq_num: Option<u64>,
+    end_seq_num: u64,
+    prev_version: Option<Version>,
+    ledger_version: Version,
+}
+
+impl<'a> AccountTransactionVersionIter<'a> {
+    pub fn new(
+        inner: SchemaIterator<'a, TransactionByAccountSchema>,
+        address: AccountAddress,
+        end_seq_num: u64,
+        ledger_version: Version,
+    ) -> Self {
+        Self {
+            inner,
+            address,
+            end_seq_num,
+            ledger_version,
+            expected_next_seq_num: None,
+            prev_version: None,
+        }
+    }
+}
+
+impl<'a> AccountTransactionVersionIter<'a> {
+    fn next_impl(&mut self) -> Result<Option<(u64, Version)>> {
+        Ok(match self.inner.next().transpose()? {
+            Some(((address, seq_num), version)) => {
+                // No more transactions sent by this account.
+                if address != self.address {
+                    return Ok(None);
+                }
+                if seq_num >= self.end_seq_num {
+                    return Ok(None);
+                }
+
+                // Ensure seq_num_{i+1} == seq_num_{i} + 1
+                if let Some(expected_seq_num) = self.expected_next_seq_num {
+                    ensure!(
+                        seq_num == expected_seq_num,
+                        "DB corruption: account transactions sequence numbers are not contiguous: \
+                         actual: {}, expected: {}",
+                        seq_num,
+                        expected_seq_num,
+                    );
+                };
+
+                // Ensure version_{i+1} > version_{i}
+                if let Some(prev_version) = self.prev_version {
+                    ensure!(
+                        prev_version < version,
+                        "DB corruption: account transaction versions are not strictly increasing: \
+                         previous version: {}, current version: {}",
+                        prev_version,
+                        version,
+                    );
+                }
+
+                // No more transactions (in this view of the ledger).
+                if version > self.ledger_version {
+                    return Ok(None);
+                }
+
+                self.expected_next_seq_num = Some(seq_num + 1);
+                self.prev_version = Some(version);
+                Some((seq_num, version))
+            },
+            None => None,
+        })
+    }
+}
+
+impl<'a> Iterator for AccountTransactionVersionIter<'a> {
+    type Item = Result<(u64, Version)>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.next_impl().transpose()
+    }
+}
diff --git a/storage/schemadb/src/schema.rs b/storage/schemadb/src/schema.rs
index 34c26607da22a..b0e8181e98d85 100644
--- a/storage/schemadb/src/schema.rs
+++ b/storage/schemadb/src/schema.rs
@@ -80,6 +80,21 @@ macro_rules! define_schema {
     };
 }
 
+#[macro_export]
+macro_rules! define_pub_schema {
+    ($schema_type:ident, $key_type:ty, $value_type:ty, $cf_name:expr) => {
+        #[derive(Debug)]
+        pub struct $schema_type;
+
+        impl $crate::schema::Schema for $schema_type {
+            type Key = $key_type;
+            type Value = $value_type;
+
+            const COLUMN_FAMILY_NAME: $crate::ColumnFamilyName = $cf_name;
+        }
+    };
+}
+
 /// This trait defines a type that can serve as a [`Schema::Key`].
 pub trait KeyCodec<S: Schema + ?Sized>: Sized + PartialEq + Debug {
     /// Converts `self` to bytes to be stored in DB.
diff --git a/storage/storage-interface/src/lib.rs b/storage/storage-interface/src/lib.rs
index 5199c40f19997..5dd89d48ba9fb 100644
--- a/storage/storage-interface/src/lib.rs
+++ b/storage/storage-interface/src/lib.rs
@@ -4,6 +4,7 @@
 
 use crate::cached_state_view::ShardedStateCache;
 use aptos_crypto::{hash::CryptoHash, HashValue};
+pub use aptos_types::indexer::indexer_db_reader::Order;
 use aptos_types::{
     account_address::AccountAddress,
     account_config::NewBlockEvent,
@@ -97,12 +98,6 @@ impl From for Error {
     }
 }
 
-#[derive(Clone, Copy, Eq, PartialEq)]
-pub enum Order {
-    Ascending,
-    Descending,
-}
-
 macro_rules! delegate_read {
     ($(
         $(#[$($attr:meta)*])*
@@ -455,6 +450,12 @@ pub trait DbReader: Send + Sync {
 
         /// Returns state storage usage at the end of an epoch.
        fn get_state_storage_usage(&self, version: Option<Version>) -> Result<StateStorageUsage>;
+
+        fn get_event_by_version_and_index(
+            &self,
+            version: Version,
+            index: u64,
+        ) -> Result<ContractEvent>;
     ); // end delegated
 
     /// Returns the latest ledger info.
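For reference, get_first_seq_num_and_limit in the new utils.rs converts a cursor plus an order into an ascending window; callers like get_events_by_event_key read that window forward and reverse afterwards. A worked sketch of the three cases (hypothetical test, not part of this diff):

    use aptos_db_indexer_schemas::utils::get_first_seq_num_and_limit;
    use aptos_types::indexer::indexer_db_reader::Order;

    #[test]
    fn window_math() {
        // Ascending from seq 10 with limit 5 reads seqs 10..=14.
        assert_eq!(
            get_first_seq_num_and_limit(Order::Ascending, 10, 5).unwrap(),
            (10, 5)
        );
        // Descending from seq 10 with limit 5 reads seqs 6..=10 (fetched
        // ascending, reversed by the caller).
        assert_eq!(
            get_first_seq_num_and_limit(Order::Descending, 10, 5).unwrap(),
            (6, 5)
        );
        // Descending with limit > cursor clamps at 0: seqs 0..=10.
        assert_eq!(
            get_first_seq_num_and_limit(Order::Descending, 10, 20).unwrap(),
            (0, 11)
        );
    }
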
diff --git a/types/src/indexer/indexer_db_reader.rs b/types/src/indexer/indexer_db_reader.rs
new file mode 100644
index 0000000000000..9aac0e3d29fb3
--- /dev/null
+++ b/types/src/indexer/indexer_db_reader.rs
@@ -0,0 +1,48 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::{
+    account_address::AccountAddress,
+    contract_event::EventWithVersion,
+    event::EventKey,
+    state_store::table::{TableHandle, TableInfo},
+    transaction::{AccountTransactionsWithProof, Version},
+};
+use anyhow::Result;
+
+#[derive(Clone, Copy, Eq, PartialEq)]
+pub enum Order {
+    Ascending,
+    Descending,
+}
+
+pub trait IndexerReader: Send + Sync {
+    fn get_table_info(&self, handle: TableHandle) -> Result<Option<TableInfo>>;
+
+    fn get_events(
+        &self,
+        event_key: &EventKey,
+        start: u64,
+        order: Order,
+        limit: u64,
+        ledger_version: Version,
+    ) -> Result<Vec<EventWithVersion>>;
+
+    fn get_events_by_event_key(
+        &self,
+        event_key: &EventKey,
+        start_seq_num: u64,
+        order: Order,
+        limit: u64,
+        ledger_version: Version,
+    ) -> Result<Vec<EventWithVersion>>;
+
+    fn get_account_transactions(
+        &self,
+        address: AccountAddress,
+        start_seq_num: u64,
+        limit: u64,
+        include_events: bool,
+        ledger_version: Version,
+    ) -> Result<AccountTransactionsWithProof>;
+}
diff --git a/types/src/indexer/mod.rs b/types/src/indexer/mod.rs
new file mode 100644
index 0000000000000..1b4cb5194ed86
--- /dev/null
+++ b/types/src/indexer/mod.rs
@@ -0,0 +1,4 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+pub mod indexer_db_reader;
diff --git a/types/src/lib.rs b/types/src/lib.rs
index 329f56c592920..9081b6c0f0a4d 100644
--- a/types/src/lib.rs
+++ b/types/src/lib.rs
@@ -20,6 +20,7 @@ pub mod event;
 pub mod executable;
 pub mod fee_statement;
 pub mod governance;
+pub mod indexer;
 pub mod jwks;
 pub mod ledger_info;
 pub mod mempool_status;
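A consumer-side note to close on: the DB indexer backend treats start == u64::MAX together with Order::Descending as "start from the latest sequence number", so callers of the new trait can lean on that convention. A hedged sketch of a convenience wrapper (the helper is illustrative, not part of this diff):

    use aptos_types::{
        contract_event::EventWithVersion,
        event::EventKey,
        indexer::indexer_db_reader::{IndexerReader, Order},
        transaction::Version,
    };

    // Hypothetical helper: fetch the most recent `limit` events under a key.
    fn latest_events(
        reader: &dyn IndexerReader,
        key: &EventKey,
        limit: u64,
        ledger_version: Version,
    ) -> anyhow::Result<Vec<EventWithVersion>> {
        // start == u64::MAX with Order::Descending is interpreted by the
        // backend as "starting from the latest event".
        reader.get_events(key, u64::MAX, Order::Descending, limit, ledger_version)
    }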