sleep shorter and return 0 instead of throwing error
areshand committed Sep 26, 2024
1 parent 8689b9a commit 0015e2c
Showing 14 changed files with 197 additions and 111 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

66 changes: 39 additions & 27 deletions api/src/context.rs
@@ -296,37 +296,49 @@ impl Context {
&self,
) -> Result<LedgerInfo, E> {
if let Some(indexer_reader) = self.indexer_reader.as_ref() {
if let Some(latest_version) = indexer_reader
.get_latest_internal_indexer_ledger_version()
.map_err(|err| {
E::service_unavailable_with_code_no_info(err, AptosErrorCode::InternalError)
})?
{
let (_, _, new_block_event) = self
.db
.get_block_info_by_version(latest_version)
.map_err(|_| {
E::service_unavailable_with_code_no_info(
"Failed to get block",
AptosErrorCode::InternalError,
)
})?;
let (oldest_version, oldest_block_height) =
self.get_oldest_version_and_block_height()?;
return Ok(LedgerInfo::new_ledger_info(
&self.chain_id(),
new_block_event.epoch(),
latest_version,
oldest_version,
oldest_block_height,
new_block_event.height(),
new_block_event.proposed_time(),
));
if indexer_reader.is_internal_indexer_enabled() {
if let Some(mut latest_version) = indexer_reader
.get_latest_internal_indexer_ledger_version()
.map_err(|err| {
E::service_unavailable_with_code_no_info(err, AptosErrorCode::InternalError)
})?
{
// The internal indexer version can be ahead of the storage committed version since it syncs to db's latest synced version
let last_storage_version =
self.get_latest_storage_ledger_info()?.ledger_version.0;
latest_version = std::cmp::min(latest_version, last_storage_version);

let (_, _, new_block_event) = self
.db
.get_block_info_by_version(latest_version)
.map_err(|_| {
E::service_unavailable_with_code_no_info(
"Failed to get block",
AptosErrorCode::InternalError,
)
})?;
let (oldest_version, oldest_block_height) =
self.get_oldest_version_and_block_height()?;
return Ok(LedgerInfo::new_ledger_info(
&self.chain_id(),
new_block_event.epoch(),
latest_version,
oldest_version,
oldest_block_height,
new_block_event.height(),
new_block_event.proposed_time(),
));
} else {
return Err(E::service_unavailable_with_code_no_info(
"Indexer reader doesn't have data.",
AptosErrorCode::InternalError,
));
}
}
}

Err(E::service_unavailable_with_code_no_info(
"Indexer reader doesn't exist, or doesn't have data.",
"Indexer reader doesn't exist",
AptosErrorCode::InternalError,
))
}
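The behavioral change in context.rs is twofold: the ledger version served from the internal indexer is now clamped to what storage has actually committed, and a configured-but-empty indexer now returns a distinct error from a missing one. A minimal sketch of the clamp, with illustrative names (`indexer_version`, `storage_version`) standing in for the real `Context` accessors:

```rust
// Sketch only: the real code reads these values from the internal indexer
// and from get_latest_storage_ledger_info(); the names are stand-ins.
fn effective_ledger_version(indexer_version: u64, storage_version: u64) -> u64 {
    // The internal indexer syncs to the DB's latest synced version, so it
    // can briefly run ahead of the committed version; never serve a
    // version past what storage has committed.
    std::cmp::min(indexer_version, storage_version)
}
```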
10 changes: 8 additions & 2 deletions aptos-node/src/lib.rs
@@ -605,7 +605,7 @@ pub fn setup_environment_and_start_node(
let mut admin_service = services::start_admin_service(&node_config);

// Set up the storage database and any RocksDB checkpoints
let (db_rw, backup_service, genesis_waypoint, indexer_db_opt) =
let (db_rw, backup_service, genesis_waypoint, indexer_db_opt, update_receiver) =
storage::initialize_database_and_checkpoints(&mut node_config)?;

admin_service.set_aptos_db(db_rw.clone().into());
@@ -687,7 +687,13 @@ pub fn setup_environment_and_start_node(
indexer_runtime,
indexer_grpc_runtime,
internal_indexer_db_runtime,
) = services::bootstrap_api_and_indexer(&node_config, db_rw.clone(), chain_id, indexer_db_opt)?;
) = services::bootstrap_api_and_indexer(
&node_config,
db_rw.clone(),
chain_id,
indexer_db_opt,
update_receiver,
)?;

// Create mempool and get the consensus to mempool sender
let (mempool_runtime, consensus_to_mempool_sender) =
20 changes: 14 additions & 6 deletions aptos-node/src/services.rs
@@ -34,7 +34,10 @@ use aptos_types::{chain_id::ChainId, indexer::indexer_db_reader::IndexerReader};
use aptos_validator_transaction_pool::VTxnPoolState;
use futures::channel::{mpsc, mpsc::Sender};
use std::{sync::Arc, time::Instant};
use tokio::runtime::{Handle, Runtime};
use tokio::{
runtime::{Handle, Runtime},
sync::watch::Receiver as WatchReceiver,
};

const AC_SMP_CHANNEL_BUFFER_SIZE: usize = 1_024;
const INTRA_NODE_CHANNEL_BUFFER_SIZE: usize = 1;
@@ -46,6 +49,7 @@ pub fn bootstrap_api_and_indexer(
db_rw: DbReaderWriter,
chain_id: ChainId,
internal_indexer_db: Option<InternalIndexerDB>,
update_receiver: Option<WatchReceiver<u64>>,
) -> anyhow::Result<(
Receiver<MempoolClientRequest>,
Option<Runtime>,
@@ -68,11 +72,15 @@ pub fn bootstrap_api_and_indexer(
None => (None, None),
};

let (db_indexer_runtime, txn_event_reader) =
match bootstrap_internal_indexer_db(node_config, db_rw.clone(), internal_indexer_db) {
Some((runtime, db_indexer)) => (Some(runtime), Some(db_indexer)),
None => (None, None),
};
let (db_indexer_runtime, txn_event_reader) = match bootstrap_internal_indexer_db(
node_config,
db_rw.clone(),
internal_indexer_db,
update_receiver,
) {
Some((runtime, db_indexer)) => (Some(runtime), Some(db_indexer)),
None => (None, None),
};

let indexer_readers = IndexerReaders::new(indexer_async_v2, txn_event_reader);

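The bootstrap call in services.rs now threads the optional watch receiver through while still returning the usual pair of `Option`s. The `match` it uses is the standard way to split an `Option` of a tuple; a generic sketch of that pattern:

```rust
// Generic sketch of the Some((a, b)) / None split used above.
fn split<A, B>(bootstrapped: Option<(A, B)>) -> (Option<A>, Option<B>) {
    match bootstrapped {
        Some((runtime, indexer)) => (Some(runtime), Some(indexer)),
        None => (None, None),
    }
}
```

Since Rust 1.66 this is exactly what the standard library's `Option::unzip` does.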
104 changes: 63 additions & 41 deletions aptos-node/src/storage.rs
@@ -10,11 +10,16 @@ use aptos_executor::db_bootstrapper::maybe_bootstrap;
use aptos_indexer_grpc_table_info::internal_indexer_db_service::InternalIndexerDBService;
use aptos_logger::{debug, info};
use aptos_storage_interface::{DbReader, DbReaderWriter};
use aptos_types::{ledger_info::LedgerInfoWithSignatures, waypoint::Waypoint};
use aptos_types::{
ledger_info::LedgerInfoWithSignatures, transaction::Version, waypoint::Waypoint,
};
use aptos_vm::AptosVM;
use either::Either;
use std::{fs, path::Path, sync::Arc, time::Instant};
use tokio::runtime::Runtime;
use tokio::{
runtime::Runtime,
sync::watch::{channel, Receiver as WatchReceiver},
};

pub(crate) fn maybe_apply_genesis(
db_rw: &DbReaderWriter,
@@ -45,46 +50,60 @@ pub(crate) fn bootstrap_db(
DbReaderWriter,
Option<Runtime>,
Option<InternalIndexerDB>,
Option<WatchReceiver<u64>>,
)> {
let internal_indexer_db = InternalIndexerDBService::get_indexer_db(node_config);
let (aptos_db_reader, db_rw, backup_service) =
match FastSyncStorageWrapper::initialize_dbs(node_config, internal_indexer_db.clone())? {
Either::Left(db) => {
let (db_arc, db_rw) = DbReaderWriter::wrap(db);
let db_backup_service = start_backup_service(
node_config.storage.backup_service_address,
db_arc.clone(),
);
maybe_apply_genesis(&db_rw, node_config)?;
(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
Either::Right(fast_sync_db_wrapper) => {
let temp_db = fast_sync_db_wrapper.get_temporary_db_with_genesis();
maybe_apply_genesis(&DbReaderWriter::from_arc(temp_db), node_config)?;
let (db_arc, db_rw) = DbReaderWriter::wrap(fast_sync_db_wrapper);
let fast_sync_db = db_arc.get_fast_sync_db();
// FastSyncDB requires ledger info at epoch 0 to establish provenance to genesis
let ledger_info = db_arc
.get_temporary_db_with_genesis()
.get_epoch_ending_ledger_info(0)
.expect("Genesis ledger info must exist");

if fast_sync_db
.get_latest_ledger_info_option()
.expect("should returns Ok results")
.is_none()
{
// it means the DB is empty and we need to
// commit the genesis ledger info to the DB.
fast_sync_db.commit_genesis_ledger_info(&ledger_info)?;
}

let db_backup_service =
start_backup_service(node_config.storage.backup_service_address, fast_sync_db);
(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
};
Ok((aptos_db_reader, db_rw, backup_service, internal_indexer_db))
let (update_sender, update_receiver) = if internal_indexer_db.is_some() {
let (sender, receiver) = channel::<u64>(0);
(Some(sender), Some(receiver))
} else {
(None, None)
};

let (aptos_db_reader, db_rw, backup_service) = match FastSyncStorageWrapper::initialize_dbs(
node_config,
internal_indexer_db.clone(),
update_sender,
)? {
Either::Left(db) => {
let (db_arc, db_rw) = DbReaderWriter::wrap(db);
let db_backup_service =
start_backup_service(node_config.storage.backup_service_address, db_arc.clone());
maybe_apply_genesis(&db_rw, node_config)?;
(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
Either::Right(fast_sync_db_wrapper) => {
let temp_db = fast_sync_db_wrapper.get_temporary_db_with_genesis();
maybe_apply_genesis(&DbReaderWriter::from_arc(temp_db), node_config)?;
let (db_arc, db_rw) = DbReaderWriter::wrap(fast_sync_db_wrapper);
let fast_sync_db = db_arc.get_fast_sync_db();
// FastSyncDB requires ledger info at epoch 0 to establish provenance to genesis
let ledger_info = db_arc
.get_temporary_db_with_genesis()
.get_epoch_ending_ledger_info(0)
.expect("Genesis ledger info must exist");

if fast_sync_db
.get_latest_ledger_info_option()
.expect("should returns Ok results")
.is_none()
{
// it means the DB is empty and we need to
// commit the genesis ledger info to the DB.
fast_sync_db.commit_genesis_ledger_info(&ledger_info)?;
}
let db_backup_service =
start_backup_service(node_config.storage.backup_service_address, fast_sync_db);
(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
};
Ok((
aptos_db_reader,
db_rw,
backup_service,
internal_indexer_db,
update_receiver,
))
}

/// In consensus-only mode, return an in-memory based [FakeAptosDB] and
@@ -157,6 +176,7 @@ pub fn initialize_database_and_checkpoints(
Option<Runtime>,
Waypoint,
Option<InternalIndexerDB>,
Option<WatchReceiver<Version>>,
)> {
// If required, create RocksDB checkpoints and change the working directory.
// This is test-only.
@@ -166,7 +186,8 @@

// Open the database
let instant = Instant::now();
let (_aptos_db, db_rw, backup_service, indexer_db_opt) = bootstrap_db(node_config)?;
let (_aptos_db, db_rw, backup_service, indexer_db_opt, update_receiver) =
bootstrap_db(node_config)?;

// Log the duration to open storage
debug!(
Expand All @@ -179,5 +200,6 @@ pub fn initialize_database_and_checkpoints(
backup_service,
node_config.base.waypoint.genesis_waypoint(),
indexer_db_opt,
update_receiver,
))
}
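`bootstrap_db` now creates a tokio watch channel only when an internal indexer DB is configured, hands the sender to the storage wrapper, and returns the receiver up the stack. A hedged sketch of that gating (the function name and flag are illustrative, not the aptos-node API):

```rust
use tokio::sync::watch::{channel, Receiver, Sender};

// Sketch: create the version-update channel only when the internal
// indexer is enabled, so nodes without it carry no extra state.
fn make_update_channel(
    internal_indexer_enabled: bool,
) -> (Option<Sender<u64>>, Option<Receiver<u64>>) {
    if internal_indexer_enabled {
        let (sender, receiver) = channel(0u64); // initial version 0
        (Some(sender), Some(receiver))
    } else {
        (None, None)
    }
}
```

A watch channel keeps only the latest value, which fits this use case: the indexer only ever cares about the newest committed version, not every intermediate update.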
internal_indexer_db_service.rs
@@ -15,20 +15,26 @@ use std::{
path::{Path, PathBuf},
sync::Arc,
};
use tokio::runtime::Handle;
use tokio::{runtime::Handle, sync::watch::Receiver as WatchReceiver};

const SERVICE_TYPE: &str = "internal_indexer_db_service";
const INTERNAL_INDEXER_DB: &str = "internal_indexer_db";

pub struct InternalIndexerDBService {
pub db_indexer: Arc<DBIndexer>,
pub update_receiver: Option<WatchReceiver<Version>>,
}

impl InternalIndexerDBService {
pub fn new(db_reader: Arc<dyn DbReader>, internal_indexer_db: InternalIndexerDB) -> Self {
pub fn new(
db_reader: Arc<dyn DbReader>,
internal_indexer_db: InternalIndexerDB,
update_receiver: Option<WatchReceiver<Version>>,
) -> Self {
let internal_db_indexer = Arc::new(DBIndexer::new(internal_indexer_db, db_reader));
Self {
db_indexer: internal_db_indexer,
update_receiver,
}
}

@@ -137,25 +143,34 @@

loop {
let start_time: std::time::Instant = std::time::Instant::now();
let next_version = self.db_indexer.process_a_batch(start_version)?;

if next_version == start_version {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
continue;
let end_version = self
.update_receiver
.as_mut()
.map(|receiver| *receiver.borrow_and_update())
.unwrap_or(self.db_indexer.main_db_reader.ensure_synced_version()?);
while start_version <= end_version {
let next_version = self
.db_indexer
.process_a_batch(start_version, end_version)?;
log_grpc_step(
SERVICE_TYPE,
IndexerGrpcStep::InternalIndexerDBProcessed,
Some(start_version as i64),
Some(next_version as i64),
None,
None,
Some(start_time.elapsed().as_secs_f64()),
None,
Some((next_version - start_version) as i64),
None,
);
start_version = next_version;
}
if let Some(receiver) = self.update_receiver.as_mut() {
receiver.changed().await?;
} else {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
log_grpc_step(
SERVICE_TYPE,
IndexerGrpcStep::InternalIndexerDBProcessed,
Some(start_version as i64),
Some(next_version as i64),
None,
None,
Some(start_time.elapsed().as_secs_f64()),
None,
Some((next_version - start_version) as i64),
None,
);
start_version = next_version;
}
}
}
Expand All @@ -179,7 +194,7 @@ impl MockInternalIndexerDBService {

let db = InternalIndexerDBService::get_indexer_db(node_config).unwrap();
let handle = Handle::current();
let mut internal_indexer_db_service = InternalIndexerDBService::new(db_reader, db);
let mut internal_indexer_db_service = InternalIndexerDBService::new(db_reader, db, None);
let db_indexer = internal_indexer_db_service.get_db_indexer();
let config_clone = node_config.to_owned();
handle.spawn(async move {
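The rewritten run loop in internal_indexer_db_service.rs is the heart of the commit: instead of calling `process_a_batch` and sleeping 100 ms whenever nothing changed, it snapshots a target version from the watch channel, catches up to it in bounded batches, then parks on `changed()` (or a 10 ms sleep when no channel exists). A simplified sketch of that pattern, with `process_batch` standing in for `DBIndexer::process_a_batch` and the fallback path and logging omitted:

```rust
use tokio::sync::watch::Receiver;

// Simplified sketch of the catch-up loop above.
async fn run(mut start_version: u64, mut rx: Receiver<u64>) -> anyhow::Result<()> {
    loop {
        // Snapshot the newest published version and mark it as seen.
        let end_version = *rx.borrow_and_update();
        // Catch up in batches until we reach the snapshot.
        while start_version <= end_version {
            start_version = process_batch(start_version, end_version)?;
        }
        // Park until the writer publishes a newer version; no busy polling.
        rx.changed().await?;
    }
}

fn process_batch(start_version: u64, end_version: u64) -> anyhow::Result<u64> {
    // Stand-in: index up to 1,000 versions and return the next start.
    Ok(end_version.min(start_version + 999) + 1)
}
```

Snapshotting `end_version` once per outer iteration bounds the inner loop even while new versions keep arriving, and `borrow_and_update` marks the value seen so the later `changed().await` only wakes on genuinely newer versions.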