From 7e12fcb18ef8c0dddba487c100b56897e665096b Mon Sep 17 00:00:00 2001 From: Bo Wu Date: Tue, 28 May 2024 15:56:23 -0700 Subject: [PATCH] merge two traits --- .../src/fake_context.rs | 1 - api/src/accounts.rs | 16 +-- api/src/context.rs | 19 ++-- api/src/events.rs | 5 +- api/src/runtime.rs | 20 +--- api/src/state.rs | 16 +-- api/src/transactions.rs | 21 ++-- api/src/view_function.rs | 6 +- api/test-context/src/test_context.rs | 1 - api/types/src/convert.rs | 10 +- aptos-node/src/services.rs | 25 ++--- crates/indexer/src/indexer/fetcher.rs | 2 +- crates/indexer/src/runtime.rs | 1 - .../indexer-grpc-fullnode/src/runtime.rs | 7 +- .../src/stream_coordinator.rs | 3 +- .../indexer-grpc-table-info/src/runtime.rs | 1 - .../src/tailer_service.rs | 4 +- .../aptosdb/src/db/include/aptosdb_reader.rs | 14 +-- storage/indexer/src/db_tailer.rs | 18 ++-- storage/indexer/src/indexer_reader.rs | 98 +++++++++++++++++++ storage/indexer/src/lib.rs | 2 +- storage/indexer/src/schema/mod.rs | 2 +- storage/indexer/src/table_info_reader.rs | 19 ---- storage/storage-interface/src/lib.rs | 2 +- types/src/indexer/db_tailer_reader.rs | 5 +- types/src/indexer/mod.rs | 1 - types/src/indexer/table_info_reader.rs | 12 --- 27 files changed, 167 insertions(+), 164 deletions(-) create mode 100644 storage/indexer/src/indexer_reader.rs delete mode 100644 storage/indexer/src/table_info_reader.rs delete mode 100644 types/src/indexer/table_info_reader.rs diff --git a/api/openapi-spec-generator/src/fake_context.rs b/api/openapi-spec-generator/src/fake_context.rs index 4529ac829b0e28..98dee1782b6412 100644 --- a/api/openapi-spec-generator/src/fake_context.rs +++ b/api/openapi-spec-generator/src/fake_context.rs @@ -17,6 +17,5 @@ pub fn get_fake_context() -> Context { mempool.ac_client, NodeConfig::default(), None, /* table info reader */ - None, ) } diff --git a/api/src/accounts.rs b/api/src/accounts.rs index 0193a7626d93e3..1f597ce8552e17 100644 --- a/api/src/accounts.rs +++ b/api/src/accounts.rs @@ -342,10 +342,8 @@ impl Account { let state_view = self .context .latest_state_view_poem(&self.latest_ledger_info)?; - let converter = state_view.as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ); + let converter = state_view + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()); let converted_resources = converter .try_into_resources(resources.iter().map(|(k, v)| (k.clone(), v.as_slice()))) .context("Failed to build move resource response from data in DB") @@ -522,10 +520,7 @@ impl Account { self.context.state_view(Some(self.ledger_version))?; let bytes = state_view - .as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .find_resource(&state_view, self.address, resource_type) .context(format!( "Failed to query DB to check for {} at {}", @@ -543,10 +538,7 @@ impl Account { })?; state_view - .as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .move_struct_fields(resource_type, &bytes) .context("Failed to convert move structs from storage") .map_err(|err| { diff --git a/api/src/context.rs b/api/src/context.rs index 650a04c3fa5050..86e64b386fdbbe 100644 --- a/api/src/context.rs +++ b/api/src/context.rs @@ -33,9 +33,7 @@ use aptos_types::{ chain_id::ChainId, contract_event::EventWithVersion, event::EventKey, - indexer::{ - db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader, - }, + indexer::db_tailer_reader::IndexerReader, ledger_info::LedgerInfoWithSignatures, on_chain_config::{GasSchedule, GasScheduleV2, OnChainConfig, OnChainExecutionConfig}, state_store::{ @@ -76,9 +74,8 @@ pub struct Context { gas_limit_cache: Arc>, view_function_stats: Arc, simulate_txn_stats: Arc, - pub table_info_reader: Option>, + pub indexer_reader: Option>, pub wait_for_hash_active_connections: Arc, - pub txn_event_reader: Option>, } impl std::fmt::Debug for Context { @@ -93,8 +90,7 @@ impl Context { db: Arc, mp_sender: MempoolClientSender, node_config: NodeConfig, - table_info_reader: Option>, - txn_event_reader: Option>, + indexer_reader: Option>, ) -> Self { let (view_function_stats, simulate_txn_stats) = { let log_per_call_stats = node_config.api.periodic_function_stats_sec.is_some(); @@ -131,9 +127,8 @@ impl Context { })), view_function_stats, simulate_txn_stats, - table_info_reader, + indexer_reader, wait_for_hash_active_connections: Arc::new(AtomicUsize::new(0)), - txn_event_reader, } } @@ -421,7 +416,7 @@ impl Context { // We should be able to do an unwrap here, otherwise the above db read would fail. let state_view = self.state_view_at_version(version)?; - let converter = state_view.as_converter(self.db.clone(), self.table_info_reader.clone()); + let converter = state_view.as_converter(self.db.clone(), self.indexer_reader.clone()); // Extract resources from resource groups and flatten into all resources let kvs = kvs @@ -621,7 +616,7 @@ impl Context { } let state_view = self.latest_state_view_poem(ledger_info)?; - let converter = state_view.as_converter(self.db.clone(), self.table_info_reader.clone()); + let converter = state_view.as_converter(self.db.clone(), self.indexer_reader.clone()); let txns: Vec = data .into_iter() .map(|t| { @@ -653,7 +648,7 @@ impl Context { } let state_view = self.latest_state_view_poem(ledger_info)?; - let converter = state_view.as_converter(self.db.clone(), self.table_info_reader.clone()); + let converter = state_view.as_converter(self.db.clone(), self.indexer_reader.clone()); let txns: Vec = data .into_iter() .map(|t| { diff --git a/api/src/events.rs b/api/src/events.rs index 5dc4f2310f789f..49c4fad21ce9f2 100644 --- a/api/src/events.rs +++ b/api/src/events.rs @@ -184,10 +184,7 @@ impl EventsApi { let events = self .context .latest_state_view_poem(&latest_ledger_info)? - .as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .try_into_versioned_events(&events) .context("Failed to convert events from storage into response") .map_err(|err| { diff --git a/api/src/runtime.rs b/api/src/runtime.rs index 3e74927aaf7f84..d5e7ec5a16efe8 100644 --- a/api/src/runtime.rs +++ b/api/src/runtime.rs @@ -13,12 +13,7 @@ use aptos_config::config::{ApiConfig, NodeConfig}; use aptos_logger::info; use aptos_mempool::MempoolClientSender; use aptos_storage_interface::DbReader; -use aptos_types::{ - chain_id::ChainId, - indexer::{ - db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader, - }, -}; +use aptos_types::{chain_id::ChainId, indexer::db_tailer_reader::IndexerReader}; use poem::{ handler, http::Method, @@ -39,20 +34,12 @@ pub fn bootstrap( chain_id: ChainId, db: Arc, mp_sender: MempoolClientSender, - table_info_reader: Option>, - txn_event_reader: Option>, + indexer_reader: Option>, ) -> anyhow::Result { let max_runtime_workers = get_max_runtime_workers(&config.api); let runtime = aptos_runtimes::spawn_named_runtime("api".into(), Some(max_runtime_workers)); - let context = Context::new( - chain_id, - db, - mp_sender, - config.clone(), - table_info_reader, - txn_event_reader, - ); + let context = Context::new(chain_id, db, mp_sender, config.clone(), indexer_reader); attach_poem_to_runtime(runtime.handle(), context.clone(), config, false) .context("Failed to attach poem to runtime")?; @@ -354,7 +341,6 @@ mod tests { context.db.clone(), context.mempool.ac_client.clone(), None, - None, ); assert!(ret.is_ok()); diff --git a/api/src/state.rs b/api/src/state.rs index 4b083a6d47b168..8c11810b98ccb2 100644 --- a/api/src/state.rs +++ b/api/src/state.rs @@ -287,10 +287,7 @@ impl StateApi { let (ledger_info, ledger_version, state_view) = self.context.state_view(ledger_version)?; let bytes = state_view - .as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .find_resource(&state_view, address, &tag) .context(format!( "Failed to query DB to check for {} at {}", @@ -308,10 +305,7 @@ impl StateApi { match accept_type { AcceptType::Json => { let resource = state_view - .as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .try_into_resource(&tag, &bytes) .context("Failed to deserialize resource data retrieved from DB") .map_err(|err| { @@ -412,10 +406,8 @@ impl StateApi { .context .state_view(ledger_version.map(|inner| inner.0))?; - let converter = state_view.as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ); + let converter = + state_view.as_converter(self.context.db.clone(), self.context.indexer_reader.clone()); // Convert key to lookup version for DB let vm_key = converter diff --git a/api/src/transactions.rs b/api/src/transactions.rs index 81dcf59a89e362..5f94b338d00c82 100644 --- a/api/src/transactions.rs +++ b/api/src/transactions.rs @@ -898,7 +898,7 @@ impl TransactionsApi { state_view .as_converter( self.context.db.clone(), - self.context.table_info_reader.clone(), + self.context.indexer_reader.clone(), ) .try_into_onchain_transaction(timestamp, txn) .context("Failed to convert on chain transaction to Transaction") @@ -911,10 +911,7 @@ impl TransactionsApi { })? }, TransactionData::Pending(txn) => state_view - .as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .try_into_pending_transaction(*txn) .context("Failed to convert on pending transaction to Transaction") .map_err(|err| { @@ -1092,10 +1089,7 @@ impl TransactionsApi { SubmitTransactionPost::Json(data) => self .context .latest_state_view_poem(ledger_info)? - .as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .try_into_signed_transaction_poem(data.0, self.context.chain_id()) .context("Failed to create SignedTransaction from SubmitTransactionRequest") .map_err(|err| { @@ -1173,7 +1167,7 @@ impl TransactionsApi { .enumerate() .map(|(index, txn)| { self.context.latest_state_view_poem(ledger_info)? - .as_converter(self.context.db.clone(), self.context.table_info_reader.clone()) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .try_into_signed_transaction_poem(txn, self.context.chain_id()) .context(format!("Failed to create SignedTransaction from SubmitTransactionRequest at position {}", index)) .map_err(|err| { @@ -1264,7 +1258,7 @@ impl TransactionsApi { // We provide the pending transaction so that users have the hash associated let pending_txn = state_view - .as_converter(self.context.db.clone(), self.context.table_info_reader.clone()) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .try_into_pending_transaction_poem(txn) .context("Failed to build PendingTransaction from mempool response, even though it said the request was accepted") .map_err(|err| SubmitTransactionError::internal_with_code( @@ -1500,10 +1494,7 @@ impl TransactionsApi { let ledger_info = self.context.get_latest_ledger_info()?; let state_view = self.context.latest_state_view_poem(&ledger_info)?; let raw_txn: RawTransaction = state_view - .as_converter( - self.context.db.clone(), - self.context.table_info_reader.clone(), - ) + .as_converter(self.context.db.clone(), self.context.indexer_reader.clone()) .try_into_raw_transaction_poem(request.transaction, self.context.chain_id()) .context("The given transaction is invalid") .map_err(|err| { diff --git a/api/src/view_function.rs b/api/src/view_function.rs index 08335c5e7005a9..fa6b31e4f732b4 100644 --- a/api/src/view_function.rs +++ b/api/src/view_function.rs @@ -95,7 +95,7 @@ fn view_request( let view_function: ViewFunction = match request { ViewFunctionRequest::Json(data) => state_view - .as_converter(context.db.clone(), context.table_info_reader.clone()) + .as_converter(context.db.clone(), context.indexer_reader.clone()) .convert_view_function(data.0) .map_err(|err| { BasicErrorWith404::bad_request_with_code( @@ -167,7 +167,7 @@ fn view_request( }, AcceptType::Json => { let return_types = state_view - .as_converter(context.db.clone(), context.table_info_reader.clone()) + .as_converter(context.db.clone(), context.indexer_reader.clone()) .function_return_types(&view_function) .and_then(|tys| { tys.into_iter() @@ -187,7 +187,7 @@ fn view_request( .zip(return_types.into_iter()) .map(|(v, ty)| { state_view - .as_converter(context.db.clone(), context.table_info_reader.clone()) + .as_converter(context.db.clone(), context.indexer_reader.clone()) .try_into_move_value(&ty, &v) }) .collect::>>() diff --git a/api/test-context/src/test_context.rs b/api/test-context/src/test_context.rs index b13c5fe731b4b0..c8786900d73d46 100644 --- a/api/test-context/src/test_context.rs +++ b/api/test-context/src/test_context.rs @@ -146,7 +146,6 @@ pub fn new_test_context( mempool.ac_client.clone(), node_config.clone(), None, /* table info reader */ - None, ); // Configure the testing depending on which API version we're testing. diff --git a/api/types/src/convert.rs b/api/types/src/convert.rs index fc8f895977a757..ae3c483126f9d3 100644 --- a/api/types/src/convert.rs +++ b/api/types/src/convert.rs @@ -25,7 +25,7 @@ use aptos_types::{ access_path::{AccessPath, Path}, chain_id::ChainId, contract_event::{ContractEvent, EventWithVersion}, - indexer::table_info_reader::TableInfoReader, + indexer::db_tailer_reader::IndexerReader, state_store::{ state_key::{inner::StateKeyInner, StateKey}, table::{TableHandle, TableInfo}, @@ -65,14 +65,14 @@ const OBJECT_STRUCT: &IdentStr = ident_str!("Object"); pub struct MoveConverter<'a, S> { inner: AptosValueAnnotator<'a, S>, db: Arc, - table_info_reader: Option>, + table_info_reader: Option>, } impl<'a, S: StateView> MoveConverter<'a, S> { pub fn new( inner: &'a S, db: Arc, - table_info_reader: Option>, + table_info_reader: Option>, ) -> Self { Self { inner: AptosValueAnnotator::new(inner), @@ -1045,7 +1045,7 @@ pub trait AsConverter { fn as_converter( &self, db: Arc, - table_info_reader: Option>, + table_info_reader: Option>, ) -> MoveConverter; } @@ -1053,7 +1053,7 @@ impl AsConverter for R { fn as_converter( &self, db: Arc, - table_info_reader: Option>, + table_info_reader: Option>, ) -> MoveConverter { MoveConverter::new(self, db, table_info_reader) } diff --git a/aptos-node/src/services.rs b/aptos-node/src/services.rs index 01ea24968451bd..be30b4ecbb4a40 100644 --- a/aptos-node/src/services.rs +++ b/aptos-node/src/services.rs @@ -11,6 +11,7 @@ use aptos_consensus::{ }; use aptos_consensus_notifications::ConsensusNotifier; use aptos_data_client::client::AptosDataClient; +use aptos_db_indexer::indexer_reader::IndexerReaders; use aptos_event_notifications::{DbBackedOnChainConfig, ReconfigNotificationListener}; use aptos_indexer_grpc_fullnode::runtime::bootstrap as bootstrap_indexer_grpc; use aptos_indexer_grpc_table_info::runtime::{ @@ -31,12 +32,7 @@ use aptos_peer_monitoring_service_server::{ use aptos_peer_monitoring_service_types::PeerMonitoringServiceMessage; use aptos_storage_interface::{DbReader, DbReaderWriter}; use aptos_time_service::TimeService; -use aptos_types::{ - chain_id::ChainId, - indexer::{ - db_tailer_reader::IndexerTransactionEventReader, table_info_reader::TableInfoReader, - }, -}; +use aptos_types::{chain_id::ChainId, indexer::db_tailer_reader::IndexerReader}; use aptos_validator_transaction_pool::VTxnPoolState; use futures::channel::{mpsc, mpsc::Sender}; use std::{sync::Arc, time::Instant}; @@ -79,26 +75,21 @@ pub fn bootstrap_api_and_indexer( None => (None, None), }; + let indexer_readers = IndexerReaders::new(indexer_async_v2, txn_event_reader); + // Create the API runtime - let table_info_reader: Option> = indexer_async_v2.map(|arc| { - let trait_object: Arc = arc; + let indexer_reader: Option> = indexer_readers.map(|readers| { + let trait_object: Arc = Arc::new(readers); trait_object }); - let txn_event_reader: Option> = - txn_event_reader.map(|arc| { - let trait_object: Arc = arc; - trait_object - }); - let api_runtime = if node_config.api.enabled { Some(bootstrap_api( node_config, chain_id, db_rw.reader.clone(), mempool_client_sender.clone(), - table_info_reader.clone(), - txn_event_reader.clone(), + indexer_reader.clone(), )?) } else { None @@ -110,7 +101,7 @@ pub fn bootstrap_api_and_indexer( chain_id, db_rw.reader.clone(), mempool_client_sender.clone(), - table_info_reader, + indexer_reader, ); // Create the indexer runtime diff --git a/crates/indexer/src/indexer/fetcher.rs b/crates/indexer/src/indexer/fetcher.rs index aa622c01abb0ce..415aa56b675dbb 100644 --- a/crates/indexer/src/indexer/fetcher.rs +++ b/crates/indexer/src/indexer/fetcher.rs @@ -242,7 +242,7 @@ async fn fetch_nexts( let mut block_height_bcs = aptos_api_types::U64::from(block_height); let state_view = context.latest_state_view().unwrap(); - let converter = state_view.as_converter(context.db.clone(), context.table_info_reader.clone()); + let converter = state_view.as_converter(context.db.clone(), context.indexer_reader.clone()); let mut transactions = vec![]; for (ind, raw_txn) in raw_txns.into_iter().enumerate() { diff --git a/crates/indexer/src/runtime.rs b/crates/indexer/src/runtime.rs index e8a825b7c5561c..c3bf5a0516c0d4 100644 --- a/crates/indexer/src/runtime.rs +++ b/crates/indexer/src/runtime.rs @@ -96,7 +96,6 @@ pub fn bootstrap( mp_sender, node_config, None, /* table info reader */ - None, )); run_forever(indexer_config, context).await; }); diff --git a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs index 1fbfdf3a765f71..eb7b1260e65a55 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/runtime.rs @@ -18,7 +18,7 @@ use aptos_protos::{ util::timestamp::FILE_DESCRIPTOR_SET as UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET, }; use aptos_storage_interface::DbReader; -use aptos_types::{chain_id::ChainId, indexer::table_info_reader::TableInfoReader}; +use aptos_types::{chain_id::ChainId, indexer::db_tailer_reader::IndexerReader}; use std::{net::ToSocketAddrs, sync::Arc}; use tokio::runtime::Runtime; use tonic::{codec::CompressionEncoding, transport::Server}; @@ -34,7 +34,7 @@ pub fn bootstrap( chain_id: ChainId, db: Arc, mp_sender: MempoolClientSender, - table_info_reader: Option>, + indexer_reader: Option>, ) -> Option { if !config.indexer_grpc.enabled { return None; @@ -56,8 +56,7 @@ pub fn bootstrap( db, mp_sender, node_config, - table_info_reader, - None, + indexer_reader, )); let service_context = ServiceContext { context: context.clone(), diff --git a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs index 23826888a1ac7d..d4cd2a361d5bdc 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-fullnode/src/stream_coordinator.rs @@ -339,8 +339,7 @@ impl IndexerStreamCoordinator { let first_version = raw_txns.first().map(|txn| txn.version).unwrap(); let state_view = context.latest_state_view().unwrap(); - let converter = - state_view.as_converter(context.db.clone(), context.table_info_reader.clone()); + let converter = state_view.as_converter(context.db.clone(), context.indexer_reader.clone()); // Enrich data with block metadata let (_, _, block_event) = context diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs index 13be3b25c579ea..7da41b9676dd60 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs @@ -70,7 +70,6 @@ pub fn bootstrap( mp_sender, node_config.clone(), None, - None, )); let mut parser = TableInfoService::new( diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/tailer_service.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/tailer_service.rs index 37bfdc3de74742..7c2d093ff6690f 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/tailer_service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/tailer_service.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use aptos_config::config::NodeConfig; -use aptos_db_indexer::{db_ops::open_db, db_tailer::DBTailer}; +use aptos_db_indexer::{db_ops::open_tailer_db, db_tailer::DBTailer}; use aptos_indexer_grpc_utils::counters::{log_grpc_step, IndexerGrpcStep}; use aptos_storage_interface::DbReader; use std::sync::Arc; @@ -23,7 +23,7 @@ impl TailerService { .join(INDEX_ASYNC_DB_TAILER); let rocksdb_config = node_config.storage.rocksdb_configs.index_db_config; let db = Arc::new( - open_db(db_path, &rocksdb_config) + open_tailer_db(db_path, &rocksdb_config) .expect("Failed to open up indexer db tailer initially"), ); diff --git a/storage/aptosdb/src/db/include/aptosdb_reader.rs b/storage/aptosdb/src/db/include/aptosdb_reader.rs index 42cdaa87d54926..c6fdb53abb1c0f 100644 --- a/storage/aptosdb/src/db/include/aptosdb_reader.rs +++ b/storage/aptosdb/src/db/include/aptosdb_reader.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use aptos_types::block_info::BlockHeight; +use itertools::zip_eq; impl DbReader for AptosDB { fn get_epoch_ending_ledger_infos( @@ -811,32 +812,33 @@ impl DbReader for AptosDB { }) } - fn get_db_backup_iter( + fn get_db_tailor_iter( &self, start_version: Version, num_transactions: usize, ) -> Result)>> + '_,>> { - gauged_api("get_db_backup_iter", || { + gauged_api("get_db_tailor_iter", || { self.error_if_ledger_pruned("Transaction", start_version)?; let txn_iter = self .ledger_db .transaction_db() .get_transaction_iter(start_version, num_transactions)?; - let mut event_vec_iter = self + let event_vec_iter = self .ledger_db .event_db() .get_events_by_version_iter(start_version, num_transactions)?; - let zipped = txn_iter.enumerate().map(move |(idx, txn_res)| { + + let zipped = zip_eq(txn_iter, event_vec_iter).enumerate().map(move |(idx, (txn_res, event_vec_res))| { let version = start_version + idx as u64; // overflow is impossible since it's check upon txn_iter construction. let txn = txn_res?; - let event_vec = event_vec_iter.next().ok_or_else(|| { + let event_vec = event_vec_res.map_err(|_e| { AptosDbError::NotFound(format!( "Events not found when Transaction exists., version {}", version )) - })??; + })?; Ok((txn, event_vec)) }); Ok(Box::new(zipped) as Box)>> + '_,> ) diff --git a/storage/indexer/src/db_tailer.rs b/storage/indexer/src/db_tailer.rs index 014fda1ae3514a..27380a0f836eec 100644 --- a/storage/indexer/src/db_tailer.rs +++ b/storage/indexer/src/db_tailer.rs @@ -20,7 +20,7 @@ use aptos_types::{ account_address::AccountAddress, contract_event::{ContractEvent, EventWithVersion}, event::EventKey, - indexer::db_tailer_reader::{IndexerTransactionEventReader, Order}, + indexer::db_tailer_reader::Order, transaction::{AccountTransactionsWithProof, Version}, }; use std::sync::Arc; @@ -63,10 +63,9 @@ impl DBTailer { >, > = self .main_db_reader - .get_db_backup_iter(version, self.batch_size) + .get_db_tailor_iter(version, self.batch_size) .expect("Cannot create db tailer iterator"); let batch = SchemaBatch::new(); - let metadata_batch = SchemaBatch::new(); db_iter.for_each(|res| { res.map(|(txn, events)| { if let Some(txn) = txn.try_as_signed_user_txn() { @@ -98,11 +97,8 @@ impl DBTailer { }) .expect("Failed to iterate db tailer iterator"); }); - // write to index db + batch.put::(&version, &())?; self.db.write_schemas(batch)?; - // update the metadata - metadata_batch.put::(&version, &())?; - self.db.write_schemas(metadata_batch)?; Ok(version) } @@ -194,10 +190,8 @@ impl DBTailer { (event_key, txn_version, seq_num, idx) }))) } -} -impl IndexerTransactionEventReader for DBTailer { - fn get_events( + pub fn get_events( &self, event_key: &EventKey, start: u64, @@ -208,7 +202,7 @@ impl IndexerTransactionEventReader for DBTailer { self.get_events_by_event_key(event_key, start, order, limit, ledger_version) } - fn get_events_by_event_key( + pub fn get_events_by_event_key( &self, event_key: &EventKey, start_seq_num: u64, @@ -275,7 +269,7 @@ impl IndexerTransactionEventReader for DBTailer { Ok(events_with_version) } - fn get_account_transactions( + pub fn get_account_transactions( &self, address: AccountAddress, start_seq_num: u64, diff --git a/storage/indexer/src/indexer_reader.rs b/storage/indexer/src/indexer_reader.rs new file mode 100644 index 00000000000000..0e2e5e86e6c6f8 --- /dev/null +++ b/storage/indexer/src/indexer_reader.rs @@ -0,0 +1,98 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{db_tailer::DBTailer, db_v2::IndexerAsyncV2}; +use anyhow::{bail, Result}; +use aptos_types::{ + account_address::AccountAddress, + contract_event::EventWithVersion, + event::EventKey, + indexer::db_tailer_reader::{IndexerReader, Order}, + state_store::table::{TableHandle, TableInfo}, + transaction::{AccountTransactionsWithProof, Version}, +}; +use std::sync::Arc; + +pub struct IndexerReaders { + table_info_reader: Option>, + db_tailer_reader: Option>, +} + +impl IndexerReaders { + pub fn new( + table_info_reader: Option>, + db_tailer_reader: Option>, + ) -> Option { + if table_info_reader.is_none() && db_tailer_reader.is_none() { + None + } else { + Some(Self { + table_info_reader, + db_tailer_reader, + }) + } + } +} + +impl IndexerReader for IndexerReaders { + fn get_table_info(&self, handle: TableHandle) -> Result> { + if let Some(table_info_reader) = &self.table_info_reader { + return Ok(table_info_reader.get_table_info_with_retry(handle)?); + } + bail!("Table info reader is not available") + } + + fn get_events( + &self, + event_key: &EventKey, + start: u64, + order: Order, + limit: u64, + ledger_version: Version, + ) -> Result> { + if let Some(db_tailer_reader) = &self.db_tailer_reader { + return db_tailer_reader.get_events(event_key, start, order, limit, ledger_version); + } + bail!("DB tailer reader is not available") + } + + fn get_events_by_event_key( + &self, + event_key: &EventKey, + start_seq_num: u64, + order: Order, + limit: u64, + ledger_version: Version, + ) -> Result> { + if let Some(db_tailer_reader) = &self.db_tailer_reader { + return db_tailer_reader.get_events_by_event_key( + event_key, + start_seq_num, + order, + limit, + ledger_version, + ); + } + bail!("DB tailer reader is not available") + } + + fn get_account_transactions( + &self, + address: AccountAddress, + start_seq_num: u64, + limit: u64, + include_events: bool, + ledger_version: Version, + ) -> Result { + if let Some(db_tailer_reader) = &self.db_tailer_reader { + return db_tailer_reader.get_account_transactions( + address, + start_seq_num, + limit, + include_events, + ledger_version, + ); + } + bail!("DB tailer reader is not available") + } +} diff --git a/storage/indexer/src/lib.rs b/storage/indexer/src/lib.rs index 01990ae2475cab..a273ecce258643 100644 --- a/storage/indexer/src/lib.rs +++ b/storage/indexer/src/lib.rs @@ -6,9 +6,9 @@ mod db; pub mod db_ops; pub mod db_tailer; pub mod db_v2; +pub mod indexer_reader; mod metadata; mod schema; -pub mod table_info_reader; mod utils; use crate::{ diff --git a/storage/indexer/src/schema/mod.rs b/storage/indexer/src/schema/mod.rs index 844e8e9487e392..6b0bfa688066ee 100644 --- a/storage/indexer/src/schema/mod.rs +++ b/storage/indexer/src/schema/mod.rs @@ -15,7 +15,7 @@ use aptos_schemadb::ColumnFamilyName; pub const DEFAULT_COLUMN_FAMILY_NAME: ColumnFamilyName = "default"; pub const INDEXER_METADATA_CF_NAME: ColumnFamilyName = "indexer_metadata"; -pub const TAILER_METADATA_CF_NAME: ColumnFamilyName = "event_metadata"; +pub const TAILER_METADATA_CF_NAME: ColumnFamilyName = "tailer_metadata"; pub const TABLE_INFO_CF_NAME: ColumnFamilyName = "table_info"; pub const EVENT_BY_KEY_CF_NAME: ColumnFamilyName = "event_by_key"; pub const EVENT_BY_VERSION_CF_NAME: ColumnFamilyName = "event_by_version"; diff --git a/storage/indexer/src/table_info_reader.rs b/storage/indexer/src/table_info_reader.rs deleted file mode 100644 index 5cb8b05897c451..00000000000000 --- a/storage/indexer/src/table_info_reader.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use crate::db_v2::IndexerAsyncV2; -use anyhow::Result; -use aptos_types::{ - indexer::table_info_reader::TableInfoReader, - state_store::table::{TableHandle, TableInfo}, -}; - -/// Table info reader is to create a thin interface for other services to read the db data, -/// this standalone db is officially not part of the AptosDB anymore. -/// For services that need table info mapping, they need to acquire this reader in the FN bootstrapping stage. - -impl TableInfoReader for IndexerAsyncV2 { - fn get_table_info(&self, handle: TableHandle) -> Result> { - Ok(self.get_table_info_with_retry(handle)?) - } -} diff --git a/storage/storage-interface/src/lib.rs b/storage/storage-interface/src/lib.rs index 5432dd1c3ddf3f..74e829be4382ae 100644 --- a/storage/storage-interface/src/lib.rs +++ b/storage/storage-interface/src/lib.rs @@ -465,7 +465,7 @@ pub trait DbReader: Send + Sync { /// Returns state storage usage at the end of an epoch. fn get_state_storage_usage(&self, version: Option) -> Result; - fn get_db_backup_iter( + fn get_db_tailor_iter( &self, start_version: Version, num_transactions: usize, diff --git a/types/src/indexer/db_tailer_reader.rs b/types/src/indexer/db_tailer_reader.rs index 9253c39db139b3..9aac0e3d29fb39 100644 --- a/types/src/indexer/db_tailer_reader.rs +++ b/types/src/indexer/db_tailer_reader.rs @@ -5,6 +5,7 @@ use crate::{ account_address::AccountAddress, contract_event::EventWithVersion, event::EventKey, + state_store::table::{TableHandle, TableInfo}, transaction::{AccountTransactionsWithProof, Version}, }; use anyhow::Result; @@ -15,7 +16,9 @@ pub enum Order { Descending, } -pub trait IndexerTransactionEventReader: Send + Sync { +pub trait IndexerReader: Send + Sync { + fn get_table_info(&self, handle: TableHandle) -> Result>; + fn get_events( &self, event_key: &EventKey, diff --git a/types/src/indexer/mod.rs b/types/src/indexer/mod.rs index 2793462e3844f8..f4327aa9641d3d 100644 --- a/types/src/indexer/mod.rs +++ b/types/src/indexer/mod.rs @@ -2,4 +2,3 @@ // SPDX-License-Identifier: Apache-2.0 pub mod db_tailer_reader; -pub mod table_info_reader; diff --git a/types/src/indexer/table_info_reader.rs b/types/src/indexer/table_info_reader.rs deleted file mode 100644 index 5d6910e24b9190..00000000000000 --- a/types/src/indexer/table_info_reader.rs +++ /dev/null @@ -1,12 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use crate::state_store::table::{TableHandle, TableInfo}; -use anyhow::Result; - -/// Table info reader is to create a thin interface for other services to read the db data, -/// this standalone db is officially not part of the AptosDB anymore. -/// For services that need table info mapping, they need to acquire this reader in the FN bootstrapping stage. -pub trait TableInfoReader: Send + Sync { - fn get_table_info(&self, handle: TableHandle) -> Result>; -}