Skip to content

Commit

Permalink
sleep shorter and return 0 instead of throwing error
Browse files Browse the repository at this point in the history
  • Loading branch information
areshand committed Sep 26, 2024
1 parent 8689b9a commit 4959977
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 84 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ pub fn setup_environment_and_start_node(
let mut admin_service = services::start_admin_service(&node_config);

// Set up the storage database and any RocksDB checkpoints
let (db_rw, backup_service, genesis_waypoint, indexer_db_opt) =
let (db_rw, backup_service, genesis_waypoint, indexer_db_opt, update_receiver) =
storage::initialize_database_and_checkpoints(&mut node_config)?;

admin_service.set_aptos_db(db_rw.clone().into());
Expand Down Expand Up @@ -687,7 +687,13 @@ pub fn setup_environment_and_start_node(
indexer_runtime,
indexer_grpc_runtime,
internal_indexer_db_runtime,
) = services::bootstrap_api_and_indexer(&node_config, db_rw.clone(), chain_id, indexer_db_opt)?;
) = services::bootstrap_api_and_indexer(
&node_config,
db_rw.clone(),
chain_id,
indexer_db_opt,
update_receiver,
)?;

// Create mempool and get the consensus to mempool sender
let (mempool_runtime, consensus_to_mempool_sender) =
Expand Down
20 changes: 14 additions & 6 deletions aptos-node/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ use aptos_types::{chain_id::ChainId, indexer::indexer_db_reader::IndexerReader};
use aptos_validator_transaction_pool::VTxnPoolState;
use futures::channel::{mpsc, mpsc::Sender};
use std::{sync::Arc, time::Instant};
use tokio::runtime::{Handle, Runtime};
use tokio::{
runtime::{Handle, Runtime},
sync::watch::Receiver as WatchReceiver,
};

const AC_SMP_CHANNEL_BUFFER_SIZE: usize = 1_024;
const INTRA_NODE_CHANNEL_BUFFER_SIZE: usize = 1;
Expand All @@ -46,6 +49,7 @@ pub fn bootstrap_api_and_indexer(
db_rw: DbReaderWriter,
chain_id: ChainId,
internal_indexer_db: Option<InternalIndexerDB>,
update_receiver: Option<WatchReceiver<u64>>,
) -> anyhow::Result<(
Receiver<MempoolClientRequest>,
Option<Runtime>,
Expand All @@ -68,11 +72,15 @@ pub fn bootstrap_api_and_indexer(
None => (None, None),
};

let (db_indexer_runtime, txn_event_reader) =
match bootstrap_internal_indexer_db(node_config, db_rw.clone(), internal_indexer_db) {
Some((runtime, db_indexer)) => (Some(runtime), Some(db_indexer)),
None => (None, None),
};
let (db_indexer_runtime, txn_event_reader) = match bootstrap_internal_indexer_db(
node_config,
db_rw.clone(),
internal_indexer_db,
update_receiver,
) {
Some((runtime, db_indexer)) => (Some(runtime), Some(db_indexer)),
None => (None, None),
};

let indexer_readers = IndexerReaders::new(indexer_async_v2, txn_event_reader);

Expand Down
104 changes: 63 additions & 41 deletions aptos-node/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ use aptos_executor::db_bootstrapper::maybe_bootstrap;
use aptos_indexer_grpc_table_info::internal_indexer_db_service::InternalIndexerDBService;
use aptos_logger::{debug, info};
use aptos_storage_interface::{DbReader, DbReaderWriter};
use aptos_types::{ledger_info::LedgerInfoWithSignatures, waypoint::Waypoint};
use aptos_types::{
ledger_info::LedgerInfoWithSignatures, transaction::Version, waypoint::Waypoint,
};
use aptos_vm::AptosVM;
use either::Either;
use std::{fs, path::Path, sync::Arc, time::Instant};
use tokio::runtime::Runtime;
use tokio::{
runtime::Runtime,
sync::watch::{channel, Receiver as WatchReceiver},
};

pub(crate) fn maybe_apply_genesis(
db_rw: &DbReaderWriter,
Expand Down Expand Up @@ -45,46 +50,60 @@ pub(crate) fn bootstrap_db(
DbReaderWriter,
Option<Runtime>,
Option<InternalIndexerDB>,
Option<WatchReceiver<u64>>,
)> {
let internal_indexer_db = InternalIndexerDBService::get_indexer_db(node_config);
let (aptos_db_reader, db_rw, backup_service) =
match FastSyncStorageWrapper::initialize_dbs(node_config, internal_indexer_db.clone())? {
Either::Left(db) => {
let (db_arc, db_rw) = DbReaderWriter::wrap(db);
let db_backup_service = start_backup_service(
node_config.storage.backup_service_address,
db_arc.clone(),
);
maybe_apply_genesis(&db_rw, node_config)?;
(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
Either::Right(fast_sync_db_wrapper) => {
let temp_db = fast_sync_db_wrapper.get_temporary_db_with_genesis();
maybe_apply_genesis(&DbReaderWriter::from_arc(temp_db), node_config)?;
let (db_arc, db_rw) = DbReaderWriter::wrap(fast_sync_db_wrapper);
let fast_sync_db = db_arc.get_fast_sync_db();
// FastSyncDB requires ledger info at epoch 0 to establish provenance to genesis
let ledger_info = db_arc
.get_temporary_db_with_genesis()
.get_epoch_ending_ledger_info(0)
.expect("Genesis ledger info must exist");

if fast_sync_db
.get_latest_ledger_info_option()
.expect("should returns Ok results")
.is_none()
{
// it means the DB is empty and we need to
// commit the genesis ledger info to the DB.
fast_sync_db.commit_genesis_ledger_info(&ledger_info)?;
}

let db_backup_service =
start_backup_service(node_config.storage.backup_service_address, fast_sync_db);
(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
};
Ok((aptos_db_reader, db_rw, backup_service, internal_indexer_db))
let (update_sender, update_receiver) = if internal_indexer_db.is_some() {
let (sender, receiver) = channel::<u64>(0);
(Some(sender), Some(receiver))
} else {
(None, None)
};

let (aptos_db_reader, db_rw, backup_service) = match FastSyncStorageWrapper::initialize_dbs(
node_config,
internal_indexer_db.clone(),
update_sender,
)? {
Either::Left(db) => {
let (db_arc, db_rw) = DbReaderWriter::wrap(db);
let db_backup_service =
start_backup_service(node_config.storage.backup_service_address, db_arc.clone());
maybe_apply_genesis(&db_rw, node_config)?;
(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
Either::Right(fast_sync_db_wrapper) => {
let temp_db = fast_sync_db_wrapper.get_temporary_db_with_genesis();
maybe_apply_genesis(&DbReaderWriter::from_arc(temp_db), node_config)?;
let (db_arc, db_rw) = DbReaderWriter::wrap(fast_sync_db_wrapper);
let fast_sync_db = db_arc.get_fast_sync_db();
// FastSyncDB requires ledger info at epoch 0 to establish provenance to genesis
let ledger_info = db_arc
.get_temporary_db_with_genesis()
.get_epoch_ending_ledger_info(0)
.expect("Genesis ledger info must exist");

if fast_sync_db
.get_latest_ledger_info_option()
.expect("should returns Ok results")
.is_none()
{
// it means the DB is empty and we need to
// commit the genesis ledger info to the DB.
fast_sync_db.commit_genesis_ledger_info(&ledger_info)?;
}
let db_backup_service =
start_backup_service(node_config.storage.backup_service_address, fast_sync_db);
(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
};
Ok((
aptos_db_reader,
db_rw,
backup_service,
internal_indexer_db,
update_receiver,
))
}

/// In consensus-only mode, return a in-memory based [FakeAptosDB] and
Expand Down Expand Up @@ -157,6 +176,7 @@ pub fn initialize_database_and_checkpoints(
Option<Runtime>,
Waypoint,
Option<InternalIndexerDB>,
Option<WatchReceiver<Version>>,
)> {
// If required, create RocksDB checkpoints and change the working directory.
// This is test-only.
Expand All @@ -166,7 +186,8 @@ pub fn initialize_database_and_checkpoints(

// Open the database
let instant = Instant::now();
let (_aptos_db, db_rw, backup_service, indexer_db_opt) = bootstrap_db(node_config)?;
let (_aptos_db, db_rw, backup_service, indexer_db_opt, update_receiver) =
bootstrap_db(node_config)?;

// Log the duration to open storage
debug!(
Expand All @@ -179,5 +200,6 @@ pub fn initialize_database_and_checkpoints(
backup_service,
node_config.base.waypoint.genesis_waypoint(),
indexer_db_opt,
update_receiver,
))
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,26 @@ use std::{
path::{Path, PathBuf},
sync::Arc,
};
use tokio::runtime::Handle;
use tokio::{runtime::Handle, sync::watch::Receiver as WatchReceiver};

const SERVICE_TYPE: &str = "internal_indexer_db_service";
const INTERNAL_INDEXER_DB: &str = "internal_indexer_db";

pub struct InternalIndexerDBService {
pub db_indexer: Arc<DBIndexer>,
pub update_receiver: Option<WatchReceiver<Version>>,
}

impl InternalIndexerDBService {
pub fn new(db_reader: Arc<dyn DbReader>, internal_indexer_db: InternalIndexerDB) -> Self {
pub fn new(
db_reader: Arc<dyn DbReader>,
internal_indexer_db: InternalIndexerDB,
update_receiver: Option<WatchReceiver<Version>>,
) -> Self {
let internal_db_indexer = Arc::new(DBIndexer::new(internal_indexer_db, db_reader));
Self {
db_indexer: internal_db_indexer,
update_receiver,
}
}

Expand Down Expand Up @@ -137,25 +143,34 @@ impl InternalIndexerDBService {

loop {
let start_time: std::time::Instant = std::time::Instant::now();
let next_version = self.db_indexer.process_a_batch(start_version)?;

if next_version == start_version {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
continue;
let end_version = self
.update_receiver
.as_mut()
.map(|receiver| *receiver.borrow_and_update())
.unwrap_or(self.db_indexer.main_db_reader.ensure_synced_version()?);
while start_version <= end_version {
let next_version = self
.db_indexer
.process_a_batch(start_version, end_version)?;
log_grpc_step(
SERVICE_TYPE,
IndexerGrpcStep::InternalIndexerDBProcessed,
Some(start_version as i64),
Some(next_version as i64),
None,
None,
Some(start_time.elapsed().as_secs_f64()),
None,
Some((next_version - start_version) as i64),
None,
);
start_version = next_version;
}
if let Some(receiver) = self.update_receiver.as_mut() {
receiver.changed().await?;
} else {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
log_grpc_step(
SERVICE_TYPE,
IndexerGrpcStep::InternalIndexerDBProcessed,
Some(start_version as i64),
Some(next_version as i64),
None,
None,
Some(start_time.elapsed().as_secs_f64()),
None,
Some((next_version - start_version) as i64),
None,
);
start_version = next_version;
}
}
}
Expand All @@ -179,7 +194,7 @@ impl MockInternalIndexerDBService {

let db = InternalIndexerDBService::get_indexer_db(node_config).unwrap();
let handle = Handle::current();
let mut internal_indexer_db_service = InternalIndexerDBService::new(db_reader, db);
let mut internal_indexer_db_service = InternalIndexerDBService::new(db_reader, db, None);
let db_indexer = internal_indexer_db_service.get_db_indexer();
let config_clone = node_config.to_owned();
handle.spawn(async move {
Expand Down
7 changes: 4 additions & 3 deletions ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,25 @@ use aptos_db_indexer::{
};
use aptos_mempool::MempoolClientSender;
use aptos_storage_interface::DbReaderWriter;
use aptos_types::chain_id::ChainId;
use aptos_types::{chain_id::ChainId, transaction::Version};
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::{runtime::Runtime, sync::watch::Receiver};

const INDEX_ASYNC_V2_DB_NAME: &str = "index_indexer_async_v2_db";

pub fn bootstrap_internal_indexer_db(
config: &NodeConfig,
db_rw: DbReaderWriter,
internal_indexer_db: Option<InternalIndexerDB>,
update_receiver: Option<Receiver<Version>>,
) -> Option<(Runtime, Arc<DBIndexer>)> {
if !config.indexer_db_config.is_internal_indexer_db_enabled() || internal_indexer_db.is_none() {
return None;
}
let runtime = aptos_runtimes::spawn_named_runtime("index-db".to_string(), None);
// Set up db config and open up the db initially to read metadata
let mut indexer_service =
InternalIndexerDBService::new(db_rw.reader, internal_indexer_db.unwrap());
InternalIndexerDBService::new(db_rw.reader, internal_indexer_db.unwrap(), update_receiver);
let db_indexer = indexer_service.get_db_indexer();
// Spawn task for db indexer
let config_clone = config.to_owned();
Expand Down
4 changes: 3 additions & 1 deletion execution/executor/tests/internal_indexer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ fn test_db_indexer_data() {
assert_eq!(version, None);
let mut start_version = version.map_or(0, |v| v + 1);
while start_version < total_version {
start_version = db_indexer.process_a_batch(start_version).unwrap();
start_version = db_indexer
.process_a_batch(start_version, total_version)
.unwrap();
}
// wait for the commit to finish
thread::sleep(Duration::from_millis(100));
Expand Down
1 change: 1 addition & 0 deletions storage/aptosdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ rayon = { workspace = true }
serde = { workspace = true }
static_assertions = { workspace = true }
status-line = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
aptos-executor-types = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions storage/aptosdb/src/db/include/aptosdb_internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ impl AptosDB {
commit_lock: std::sync::Mutex::new(()),
indexer: None,
skip_index_and_usage,
update_subscriber: None,
}
}

Expand Down
8 changes: 8 additions & 0 deletions storage/aptosdb/src/db/include/aptosdb_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,14 @@ impl AptosDB {

COMMITTED_TXNS.inc_by(num_txns);
LATEST_TXN_VERSION.set(version as i64);
if self.update_subscriber.is_some() {
self.update_subscriber
.as_ref()
.unwrap()
.send(version).map_err(| _ | {
AptosDbError::Other(format!("Failed to send update to subscriber: {}", version))
})?;
}
// Activate the ledger pruner and state kv pruner.
// Note the state merkle pruner is activated when state snapshots are persisted
// in their async thread.
Expand Down
Loading

0 comments on commit 4959977

Please sign in to comment.