From 5e78baff41f907204b8462878879f95812b5dc0c Mon Sep 17 00:00:00 2001 From: Bo Wu Date: Wed, 25 Sep 2024 13:09:20 -0700 Subject: [PATCH] sleep shorter and return 0 instead of throwing error --- Cargo.lock | 1 + aptos-node/src/lib.rs | 10 +- aptos-node/src/services.rs | 20 +++- aptos-node/src/storage.rs | 104 +++++++++++------- .../src/internal_indexer_db_service.rs | 52 +++++---- .../indexer-grpc-table-info/src/runtime.rs | 7 +- .../executor/tests/internal_indexer_test.rs | 4 +- storage/aptosdb/Cargo.toml | 1 + .../src/db/include/aptosdb_internal.rs | 1 + .../aptosdb/src/db/include/aptosdb_writer.rs | 8 ++ storage/aptosdb/src/db/mod.rs | 8 +- .../aptosdb/src/fast_sync_storage_wrapper.rs | 7 +- storage/indexer/src/db_indexer.rs | 13 +-- 13 files changed, 153 insertions(+), 83 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f619517cfcdbf..4d40d7db12ae86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1184,6 +1184,7 @@ dependencies = [ "serde", "static_assertions", "status-line", + "tokio", ] [[package]] diff --git a/aptos-node/src/lib.rs b/aptos-node/src/lib.rs index 4634c22c3efb75..93acbfd7e7e44e 100644 --- a/aptos-node/src/lib.rs +++ b/aptos-node/src/lib.rs @@ -605,7 +605,7 @@ pub fn setup_environment_and_start_node( let mut admin_service = services::start_admin_service(&node_config); // Set up the storage database and any RocksDB checkpoints - let (db_rw, backup_service, genesis_waypoint, indexer_db_opt) = + let (db_rw, backup_service, genesis_waypoint, indexer_db_opt, update_receiver) = storage::initialize_database_and_checkpoints(&mut node_config)?; admin_service.set_aptos_db(db_rw.clone().into()); @@ -687,7 +687,13 @@ pub fn setup_environment_and_start_node( indexer_runtime, indexer_grpc_runtime, internal_indexer_db_runtime, - ) = services::bootstrap_api_and_indexer(&node_config, db_rw.clone(), chain_id, indexer_db_opt)?; + ) = services::bootstrap_api_and_indexer( + &node_config, + db_rw.clone(), + chain_id, + indexer_db_opt, + update_receiver, + )?; // Create mempool and get the consensus to mempool sender let (mempool_runtime, consensus_to_mempool_sender) = diff --git a/aptos-node/src/services.rs b/aptos-node/src/services.rs index a6b94bde33bc83..2a686806ae360b 100644 --- a/aptos-node/src/services.rs +++ b/aptos-node/src/services.rs @@ -34,7 +34,10 @@ use aptos_types::{chain_id::ChainId, indexer::indexer_db_reader::IndexerReader}; use aptos_validator_transaction_pool::VTxnPoolState; use futures::channel::{mpsc, mpsc::Sender}; use std::{sync::Arc, time::Instant}; -use tokio::runtime::{Handle, Runtime}; +use tokio::{ + runtime::{Handle, Runtime}, + sync::watch::Receiver as WatchReceiver, +}; const AC_SMP_CHANNEL_BUFFER_SIZE: usize = 1_024; const INTRA_NODE_CHANNEL_BUFFER_SIZE: usize = 1; @@ -46,6 +49,7 @@ pub fn bootstrap_api_and_indexer( db_rw: DbReaderWriter, chain_id: ChainId, internal_indexer_db: Option, + update_receiver: Option>, ) -> anyhow::Result<( Receiver, Option, @@ -68,11 +72,15 @@ pub fn bootstrap_api_and_indexer( None => (None, None), }; - let (db_indexer_runtime, txn_event_reader) = - match bootstrap_internal_indexer_db(node_config, db_rw.clone(), internal_indexer_db) { - Some((runtime, db_indexer)) => (Some(runtime), Some(db_indexer)), - None => (None, None), - }; + let (db_indexer_runtime, txn_event_reader) = match bootstrap_internal_indexer_db( + node_config, + db_rw.clone(), + internal_indexer_db, + update_receiver, + ) { + Some((runtime, db_indexer)) => (Some(runtime), Some(db_indexer)), + None => (None, None), + }; let indexer_readers = IndexerReaders::new(indexer_async_v2, txn_event_reader); diff --git a/aptos-node/src/storage.rs b/aptos-node/src/storage.rs index 0089a7961b2ead..63ef184ea31be3 100644 --- a/aptos-node/src/storage.rs +++ b/aptos-node/src/storage.rs @@ -10,11 +10,16 @@ use aptos_executor::db_bootstrapper::maybe_bootstrap; use aptos_indexer_grpc_table_info::internal_indexer_db_service::InternalIndexerDBService; use aptos_logger::{debug, info}; use aptos_storage_interface::{DbReader, DbReaderWriter}; -use aptos_types::{ledger_info::LedgerInfoWithSignatures, waypoint::Waypoint}; +use aptos_types::{ + ledger_info::LedgerInfoWithSignatures, transaction::Version, waypoint::Waypoint, +}; use aptos_vm::AptosVM; use either::Either; use std::{fs, path::Path, sync::Arc, time::Instant}; -use tokio::runtime::Runtime; +use tokio::{ + runtime::Runtime, + sync::watch::{channel, Receiver}, +}; pub(crate) fn maybe_apply_genesis( db_rw: &DbReaderWriter, @@ -45,46 +50,60 @@ pub(crate) fn bootstrap_db( DbReaderWriter, Option, Option, + Option>, )> { let internal_indexer_db = InternalIndexerDBService::get_indexer_db(node_config); - let (aptos_db_reader, db_rw, backup_service) = - match FastSyncStorageWrapper::initialize_dbs(node_config, internal_indexer_db.clone())? { - Either::Left(db) => { - let (db_arc, db_rw) = DbReaderWriter::wrap(db); - let db_backup_service = start_backup_service( - node_config.storage.backup_service_address, - db_arc.clone(), - ); - maybe_apply_genesis(&db_rw, node_config)?; - (db_arc as Arc, db_rw, Some(db_backup_service)) - }, - Either::Right(fast_sync_db_wrapper) => { - let temp_db = fast_sync_db_wrapper.get_temporary_db_with_genesis(); - maybe_apply_genesis(&DbReaderWriter::from_arc(temp_db), node_config)?; - let (db_arc, db_rw) = DbReaderWriter::wrap(fast_sync_db_wrapper); - let fast_sync_db = db_arc.get_fast_sync_db(); - // FastSyncDB requires ledger info at epoch 0 to establish provenance to genesis - let ledger_info = db_arc - .get_temporary_db_with_genesis() - .get_epoch_ending_ledger_info(0) - .expect("Genesis ledger info must exist"); - - if fast_sync_db - .get_latest_ledger_info_option() - .expect("should returns Ok results") - .is_none() - { - // it means the DB is empty and we need to - // commit the genesis ledger info to the DB. - fast_sync_db.commit_genesis_ledger_info(&ledger_info)?; - } - - let db_backup_service = - start_backup_service(node_config.storage.backup_service_address, fast_sync_db); - (db_arc as Arc, db_rw, Some(db_backup_service)) - }, - }; - Ok((aptos_db_reader, db_rw, backup_service, internal_indexer_db)) + let (update_sender, update_receiver) = if internal_indexer_db.is_some() { + let (sender, receiver) = channel::(0); + (Some(sender), Some(receiver)) + } else { + (None, None) + }; + + let (aptos_db_reader, db_rw, backup_service) = match FastSyncStorageWrapper::initialize_dbs( + node_config, + internal_indexer_db.clone(), + update_sender, + )? { + Either::Left(db) => { + let (db_arc, db_rw) = DbReaderWriter::wrap(db); + let db_backup_service = + start_backup_service(node_config.storage.backup_service_address, db_arc.clone()); + maybe_apply_genesis(&db_rw, node_config)?; + (db_arc as Arc, db_rw, Some(db_backup_service)) + }, + Either::Right(fast_sync_db_wrapper) => { + let temp_db = fast_sync_db_wrapper.get_temporary_db_with_genesis(); + maybe_apply_genesis(&DbReaderWriter::from_arc(temp_db), node_config)?; + let (db_arc, db_rw) = DbReaderWriter::wrap(fast_sync_db_wrapper); + let fast_sync_db = db_arc.get_fast_sync_db(); + // FastSyncDB requires ledger info at epoch 0 to establish provenance to genesis + let ledger_info = db_arc + .get_temporary_db_with_genesis() + .get_epoch_ending_ledger_info(0) + .expect("Genesis ledger info must exist"); + + if fast_sync_db + .get_latest_ledger_info_option() + .expect("should returns Ok results") + .is_none() + { + // it means the DB is empty and we need to + // commit the genesis ledger info to the DB. + fast_sync_db.commit_genesis_ledger_info(&ledger_info)?; + } + let db_backup_service = + start_backup_service(node_config.storage.backup_service_address, fast_sync_db); + (db_arc as Arc, db_rw, Some(db_backup_service)) + }, + }; + Ok(( + aptos_db_reader, + db_rw, + backup_service, + internal_indexer_db, + update_receiver, + )) } /// In consensus-only mode, return a in-memory based [FakeAptosDB] and @@ -157,6 +176,7 @@ pub fn initialize_database_and_checkpoints( Option, Waypoint, Option, + Option>, )> { // If required, create RocksDB checkpoints and change the working directory. // This is test-only. @@ -166,7 +186,8 @@ pub fn initialize_database_and_checkpoints( // Open the database let instant = Instant::now(); - let (_aptos_db, db_rw, backup_service, indexer_db_opt) = bootstrap_db(node_config)?; + let (_aptos_db, db_rw, backup_service, indexer_db_opt, update_receiver) = + bootstrap_db(node_config)?; // Log the duration to open storage debug!( @@ -179,5 +200,6 @@ pub fn initialize_database_and_checkpoints( backup_service, node_config.base.waypoint.genesis_waypoint(), indexer_db_opt, + update_receiver, )) } diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs index ca26fd0c6e49e7..a62c3fa04c49e1 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs @@ -15,20 +15,26 @@ use std::{ path::{Path, PathBuf}, sync::Arc, }; -use tokio::runtime::Handle; +use tokio::{runtime::Handle, sync::watch::Receiver}; const SERVICE_TYPE: &str = "internal_indexer_db_service"; const INTERNAL_INDEXER_DB: &str = "internal_indexer_db"; pub struct InternalIndexerDBService { pub db_indexer: Arc, + pub update_receiver: Option>, } impl InternalIndexerDBService { - pub fn new(db_reader: Arc, internal_indexer_db: InternalIndexerDB) -> Self { + pub fn new( + db_reader: Arc, + internal_indexer_db: InternalIndexerDB, + update_receiver: Option>, + ) -> Self { let internal_db_indexer = Arc::new(DBIndexer::new(internal_indexer_db, db_reader)); Self { db_indexer: internal_db_indexer, + update_receiver, } } @@ -137,25 +143,29 @@ impl InternalIndexerDBService { loop { let start_time: std::time::Instant = std::time::Instant::now(); - let next_version = self.db_indexer.process_a_batch(start_version)?; - - if next_version == start_version { - tokio::time::sleep(std::time::Duration::from_millis(100)).await; - continue; + let end_version = self + .update_receiver + .as_mut() + .map(|receiver| *receiver.borrow_and_update()) + .unwrap_or(Version::MAX); + while start_version < end_version { + let next_version = self + .db_indexer + .process_a_batch(start_version, end_version)?; + log_grpc_step( + SERVICE_TYPE, + IndexerGrpcStep::InternalIndexerDBProcessed, + Some(start_version as i64), + Some(next_version as i64), + None, + None, + Some(start_time.elapsed().as_secs_f64()), + None, + Some((next_version - start_version) as i64), + None, + ); + start_version = next_version; } - log_grpc_step( - SERVICE_TYPE, - IndexerGrpcStep::InternalIndexerDBProcessed, - Some(start_version as i64), - Some(next_version as i64), - None, - None, - Some(start_time.elapsed().as_secs_f64()), - None, - Some((next_version - start_version) as i64), - None, - ); - start_version = next_version; } } } @@ -179,7 +189,7 @@ impl MockInternalIndexerDBService { let db = InternalIndexerDBService::get_indexer_db(node_config).unwrap(); let handle = Handle::current(); - let mut internal_indexer_db_service = InternalIndexerDBService::new(db_reader, db); + let mut internal_indexer_db_service = InternalIndexerDBService::new(db_reader, db, None); let db_indexer = internal_indexer_db_service.get_db_indexer(); let config_clone = node_config.to_owned(); handle.spawn(async move { diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs index ff5c17d5d9b494..8251f1d416ed5c 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs @@ -14,9 +14,9 @@ use aptos_db_indexer::{ }; use aptos_mempool::MempoolClientSender; use aptos_storage_interface::DbReaderWriter; -use aptos_types::chain_id::ChainId; +use aptos_types::{chain_id::ChainId, transaction::Version}; use std::sync::Arc; -use tokio::runtime::Runtime; +use tokio::{runtime::Runtime, sync::watch::Receiver}; const INDEX_ASYNC_V2_DB_NAME: &str = "index_indexer_async_v2_db"; @@ -24,6 +24,7 @@ pub fn bootstrap_internal_indexer_db( config: &NodeConfig, db_rw: DbReaderWriter, internal_indexer_db: Option, + update_receiver: Option>, ) -> Option<(Runtime, Arc)> { if !config.indexer_db_config.is_internal_indexer_db_enabled() || internal_indexer_db.is_none() { return None; @@ -31,7 +32,7 @@ pub fn bootstrap_internal_indexer_db( let runtime = aptos_runtimes::spawn_named_runtime("index-db".to_string(), None); // Set up db config and open up the db initially to read metadata let mut indexer_service = - InternalIndexerDBService::new(db_rw.reader, internal_indexer_db.unwrap()); + InternalIndexerDBService::new(db_rw.reader, internal_indexer_db.unwrap(), update_receiver); let db_indexer = indexer_service.get_db_indexer(); // Spawn task for db indexer let config_clone = config.to_owned(); diff --git a/execution/executor/tests/internal_indexer_test.rs b/execution/executor/tests/internal_indexer_test.rs index ba6b004de55c9a..e55bea9b9ce2f1 100644 --- a/execution/executor/tests/internal_indexer_test.rs +++ b/execution/executor/tests/internal_indexer_test.rs @@ -153,7 +153,9 @@ fn test_db_indexer_data() { assert_eq!(version, None); let mut start_version = version.map_or(0, |v| v + 1); while start_version < total_version { - start_version = db_indexer.process_a_batch(start_version).unwrap(); + start_version = db_indexer + .process_a_batch(start_version, total_version + 1) + .unwrap(); } // wait for the commit to finish thread::sleep(Duration::from_millis(100)); diff --git a/storage/aptosdb/Cargo.toml b/storage/aptosdb/Cargo.toml index cc86b63746f0ef..86477b75f36761 100644 --- a/storage/aptosdb/Cargo.toml +++ b/storage/aptosdb/Cargo.toml @@ -57,6 +57,7 @@ rayon = { workspace = true } serde = { workspace = true } static_assertions = { workspace = true } status-line = { workspace = true } +tokio = { workspace = true } [dev-dependencies] aptos-executor-types = { workspace = true } diff --git a/storage/aptosdb/src/db/include/aptosdb_internal.rs b/storage/aptosdb/src/db/include/aptosdb_internal.rs index 5556d8a41d6f1b..73cc62d2dafd02 100644 --- a/storage/aptosdb/src/db/include/aptosdb_internal.rs +++ b/storage/aptosdb/src/db/include/aptosdb_internal.rs @@ -63,6 +63,7 @@ impl AptosDB { commit_lock: std::sync::Mutex::new(()), indexer: None, skip_index_and_usage, + update_subscriber: None, } } diff --git a/storage/aptosdb/src/db/include/aptosdb_writer.rs b/storage/aptosdb/src/db/include/aptosdb_writer.rs index 36a6931137b52f..1a711a11e565ab 100644 --- a/storage/aptosdb/src/db/include/aptosdb_writer.rs +++ b/storage/aptosdb/src/db/include/aptosdb_writer.rs @@ -669,6 +669,14 @@ impl AptosDB { COMMITTED_TXNS.inc_by(num_txns); LATEST_TXN_VERSION.set(version as i64); + if self.update_subscriber.is_some() { + self.update_subscriber + .as_ref() + .unwrap() + .send(version).map_err(| _ | { + AptosDbError::Other(format!("Failed to send update to subscriber: {}", version)) + })?; + } // Activate the ledger pruner and state kv pruner. // Note the state merkle pruner is activated when state snapshots are persisted // in their async thread. diff --git a/storage/aptosdb/src/db/mod.rs b/storage/aptosdb/src/db/mod.rs index 51a85a940a7792..2608e7ed527079 100644 --- a/storage/aptosdb/src/db/mod.rs +++ b/storage/aptosdb/src/db/mod.rs @@ -81,7 +81,7 @@ use std::{ sync::Arc, time::Instant, }; - +use tokio::sync::watch::Sender; #[cfg(test)] mod aptosdb_test; #[cfg(any(test, feature = "fuzzing"))] @@ -101,6 +101,7 @@ pub struct AptosDB { commit_lock: std::sync::Mutex<()>, indexer: Option, skip_index_and_usage: bool, + update_subscriber: Option>, } // DbReader implementations and private functions used by them. @@ -186,6 +187,11 @@ impl AptosDB { Ok((ledger_db, state_merkle_db, state_kv_db)) } + pub fn add_version_update_subscriber(&mut self, sender: Sender) -> Result<()> { + self.update_subscriber = Some(sender); + Ok(()) + } + /// Gets an instance of `BackupHandler` for data backup purpose. pub fn get_backup_handler(&self) -> BackupHandler { BackupHandler::new(Arc::clone(&self.state_store), Arc::clone(&self.ledger_db)) diff --git a/storage/aptosdb/src/fast_sync_storage_wrapper.rs b/storage/aptosdb/src/fast_sync_storage_wrapper.rs index ca5f1fe2f009aa..703cfe9326f37b 100644 --- a/storage/aptosdb/src/fast_sync_storage_wrapper.rs +++ b/storage/aptosdb/src/fast_sync_storage_wrapper.rs @@ -18,6 +18,7 @@ use aptos_types::{ }; use either::Either; use std::sync::Arc; +use tokio::sync::watch::Sender; pub const SECONDARY_DB_DIR: &str = "fast_sync_secondary"; @@ -44,8 +45,9 @@ impl FastSyncStorageWrapper { pub fn initialize_dbs( config: &NodeConfig, internal_indexer_db: Option, + update_sender: Option>, ) -> Result> { - let db_main = AptosDB::open( + let mut db_main = AptosDB::open( config.storage.get_dir_paths(), /*readonly=*/ false, config.storage.storage_pruner_config, @@ -56,6 +58,9 @@ impl FastSyncStorageWrapper { internal_indexer_db, ) .map_err(|err| anyhow!("fast sync DB failed to open {}", err))?; + if let Some(sender) = update_sender { + db_main.add_version_update_subscriber(sender)?; + } let mut db_dir = config.storage.dir(); // when the db is empty and configured to do fast sync, we will create a second DB diff --git a/storage/indexer/src/db_indexer.rs b/storage/indexer/src/db_indexer.rs index f21aade0905a5a..e269a14401be27 100644 --- a/storage/indexer/src/db_indexer.rs +++ b/storage/indexer/src/db_indexer.rs @@ -346,23 +346,22 @@ impl DBIndexer { Ok(zipped) } - fn get_num_of_transactions(&self, version: Version) -> Result { - let highest_version = self.main_db_reader.ensure_synced_version()?; - if version > highest_version { + fn get_num_of_transactions(&self, version: Version, end_version: Version) -> Result { + if version > end_version { // In case main db is not synced yet or recreated return Ok(0); } - // we want to include the last transaction since the iterator interface will is right exclusive. + // we want to include the last transaction since the iterator interface is right exclusive. let num_of_transaction = min( self.indexer_db.config.batch_size as u64, - highest_version + 1 - version, + end_version + 1 - version, ); Ok(num_of_transaction) } - pub fn process_a_batch(&self, start_version: Version) -> Result { + pub fn process_a_batch(&self, start_version: Version, end_version: Version) -> Result { let mut version = start_version; - let num_transactions = self.get_num_of_transactions(version)?; + let num_transactions = self.get_num_of_transactions(version, end_version)?; let mut db_iter = self.get_main_db_iter(version, num_transactions)?; let batch = SchemaBatch::new(); db_iter.try_for_each(|res| {