Skip to content

Commit

Permalink
sleep shorter and return 0 instead of throwing error
Browse files Browse the repository at this point in the history
  • Loading branch information
areshand committed Sep 26, 2024
1 parent 8689b9a commit f9f13ea
Show file tree
Hide file tree
Showing 14 changed files with 200 additions and 113 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 40 additions & 27 deletions api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,37 +296,50 @@ impl Context {
&self,
) -> Result<LedgerInfo, E> {
if let Some(indexer_reader) = self.indexer_reader.as_ref() {
if let Some(latest_version) = indexer_reader
.get_latest_internal_indexer_ledger_version()
.map_err(|err| {
E::service_unavailable_with_code_no_info(err, AptosErrorCode::InternalError)
})?
{
let (_, _, new_block_event) = self
.db
.get_block_info_by_version(latest_version)
.map_err(|_| {
E::service_unavailable_with_code_no_info(
"Failed to get block",
AptosErrorCode::InternalError,
)
})?;
let (oldest_version, oldest_block_height) =
self.get_oldest_version_and_block_height()?;
return Ok(LedgerInfo::new_ledger_info(
&self.chain_id(),
new_block_event.epoch(),
latest_version,
oldest_version,
oldest_block_height,
new_block_event.height(),
new_block_event.proposed_time(),
));
if indexer_reader.is_internal_indexer_enabled() {
if let Some(mut latest_version) = indexer_reader
.get_latest_internal_indexer_ledger_version()
.map_err(|err| {
E::service_unavailable_with_code_no_info(err, AptosErrorCode::InternalError)
})?
{
// The internal indexer version can be ahead of the storage committed version since it syncs to db's latest synced version
let last_storage_version =
self.get_latest_storage_ledger_info()?.ledger_version.0;
latest_version = std::cmp::min(latest_version, last_storage_version);

let (_, _, new_block_event) = self
.db
.get_block_info_by_version(latest_version)
.map_err(|_| {
E::service_unavailable_with_code_no_info(
"Failed to get block",
AptosErrorCode::InternalError,
)
})?;
let (oldest_version, oldest_block_height) =
self.get_oldest_version_and_block_height()?;
return Ok(LedgerInfo::new_ledger_info(
&self.chain_id(),
new_block_event.epoch(),
latest_version,
oldest_version,
oldest_block_height,
new_block_event.height(),
new_block_event.proposed_time(),
));
} else {
// Indexer doesn't have data yet as DB is boostrapping.
return Err(E::service_unavailable_with_code_no_info(
"DB is bootstrapping",
AptosErrorCode::InternalError,
));
}
}
}

Err(E::service_unavailable_with_code_no_info(
"Indexer reader doesn't exist, or doesn't have data.",
"Indexer reader doesn't exist",
AptosErrorCode::InternalError,
))
}
Expand Down
49 changes: 26 additions & 23 deletions api/test-context/src/test_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use aptos_types::{
ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
transaction::{
signature_verified_transaction::into_signature_verified_block, Transaction,
TransactionPayload, TransactionStatus,
TransactionPayload, TransactionStatus, Version,
},
};
use aptos_vm::AptosVM;
Expand All @@ -53,6 +53,7 @@ use hyper::{HeaderMap, Response};
use rand::SeedableRng;
use serde_json::{json, Value};
use std::{boxed::Box, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use tokio::sync::watch::channel;
use warp::{http::header::CONTENT_TYPE, Filter, Rejection, Reply};
use warp_reverse_proxy::reverse_proxy_filter;

Expand Down Expand Up @@ -118,32 +119,34 @@ pub fn new_test_context(
let (root_key, genesis, genesis_waypoint, validators) = builder.build(&mut rng).unwrap();
let (validator_identity, _, _, _) = validators[0].get_key_objects(None).unwrap();
let validator_owner = validator_identity.account_address.unwrap();

let (sender, recver) = channel::<Version>(0);
let (db, db_rw) = if use_db_with_indexer {
DbReaderWriter::wrap(AptosDB::new_for_test_with_indexer(
let mut aptos_db = AptosDB::new_for_test_with_indexer(
&tmp_dir,
node_config.storage.rocksdb_configs.enable_storage_sharding,
))
);
aptos_db.add_version_update_subscriber(sender).unwrap();
DbReaderWriter::wrap(aptos_db)
} else {
DbReaderWriter::wrap(
AptosDB::open(
StorageDirPaths::from_path(&tmp_dir),
false, /* readonly */
NO_OP_STORAGE_PRUNER_CONFIG, /* pruner */
RocksdbConfigs {
enable_storage_sharding: node_config
.storage
.rocksdb_configs
.enable_storage_sharding,
..Default::default()
},
false, /* indexer */
BUFFERED_STATE_TARGET_ITEMS_FOR_TEST,
DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD,
None,
)
.unwrap(),
let mut aptos_db = AptosDB::open(
StorageDirPaths::from_path(&tmp_dir),
false, /* readonly */
NO_OP_STORAGE_PRUNER_CONFIG, /* pruner */
RocksdbConfigs {
enable_storage_sharding: node_config
.storage
.rocksdb_configs
.enable_storage_sharding,
..Default::default()
},
false, /* indexer */
BUFFERED_STATE_TARGET_ITEMS_FOR_TEST,
DEFAULT_MAX_NUM_NODES_PER_LRU_CACHE_SHARD,
None,
)
.unwrap();
aptos_db.add_version_update_subscriber(sender).unwrap();
DbReaderWriter::wrap(aptos_db)
};
let ret =
db_bootstrapper::maybe_bootstrap::<AptosVM>(&db_rw, &genesis, genesis_waypoint).unwrap();
Expand All @@ -155,7 +158,7 @@ pub fn new_test_context(
.storage
.set_data_dir(tmp_dir.path().to_path_buf());
let mock_indexer_service =
MockInternalIndexerDBService::new_for_test(db_rw.reader.clone(), &node_config);
MockInternalIndexerDBService::new_for_test(db_rw.reader.clone(), &node_config, recver);

let context = Context::new(
ChainId::test(),
Expand Down
10 changes: 8 additions & 2 deletions aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ pub fn setup_environment_and_start_node(
let mut admin_service = services::start_admin_service(&node_config);

// Set up the storage database and any RocksDB checkpoints
let (db_rw, backup_service, genesis_waypoint, indexer_db_opt) =
let (db_rw, backup_service, genesis_waypoint, indexer_db_opt, update_receiver) =
storage::initialize_database_and_checkpoints(&mut node_config)?;

admin_service.set_aptos_db(db_rw.clone().into());
Expand Down Expand Up @@ -687,7 +687,13 @@ pub fn setup_environment_and_start_node(
indexer_runtime,
indexer_grpc_runtime,
internal_indexer_db_runtime,
) = services::bootstrap_api_and_indexer(&node_config, db_rw.clone(), chain_id, indexer_db_opt)?;
) = services::bootstrap_api_and_indexer(
&node_config,
db_rw.clone(),
chain_id,
indexer_db_opt,
update_receiver,
)?;

// Create mempool and get the consensus to mempool sender
let (mempool_runtime, consensus_to_mempool_sender) =
Expand Down
20 changes: 14 additions & 6 deletions aptos-node/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ use aptos_types::{chain_id::ChainId, indexer::indexer_db_reader::IndexerReader};
use aptos_validator_transaction_pool::VTxnPoolState;
use futures::channel::{mpsc, mpsc::Sender};
use std::{sync::Arc, time::Instant};
use tokio::runtime::{Handle, Runtime};
use tokio::{
runtime::{Handle, Runtime},
sync::watch::Receiver as WatchReceiver,
};

const AC_SMP_CHANNEL_BUFFER_SIZE: usize = 1_024;
const INTRA_NODE_CHANNEL_BUFFER_SIZE: usize = 1;
Expand All @@ -46,6 +49,7 @@ pub fn bootstrap_api_and_indexer(
db_rw: DbReaderWriter,
chain_id: ChainId,
internal_indexer_db: Option<InternalIndexerDB>,
update_receiver: Option<WatchReceiver<u64>>,
) -> anyhow::Result<(
Receiver<MempoolClientRequest>,
Option<Runtime>,
Expand All @@ -68,11 +72,15 @@ pub fn bootstrap_api_and_indexer(
None => (None, None),
};

let (db_indexer_runtime, txn_event_reader) =
match bootstrap_internal_indexer_db(node_config, db_rw.clone(), internal_indexer_db) {
Some((runtime, db_indexer)) => (Some(runtime), Some(db_indexer)),
None => (None, None),
};
let (db_indexer_runtime, txn_event_reader) = match bootstrap_internal_indexer_db(
node_config,
db_rw.clone(),
internal_indexer_db,
update_receiver,
) {
Some((runtime, db_indexer)) => (Some(runtime), Some(db_indexer)),
None => (None, None),
};

let indexer_readers = IndexerReaders::new(indexer_async_v2, txn_event_reader);

Expand Down
104 changes: 63 additions & 41 deletions aptos-node/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ use aptos_executor::db_bootstrapper::maybe_bootstrap;
use aptos_indexer_grpc_table_info::internal_indexer_db_service::InternalIndexerDBService;
use aptos_logger::{debug, info};
use aptos_storage_interface::{DbReader, DbReaderWriter};
use aptos_types::{ledger_info::LedgerInfoWithSignatures, waypoint::Waypoint};
use aptos_types::{
ledger_info::LedgerInfoWithSignatures, transaction::Version, waypoint::Waypoint,
};
use aptos_vm::AptosVM;
use either::Either;
use std::{fs, path::Path, sync::Arc, time::Instant};
use tokio::runtime::Runtime;
use tokio::{
runtime::Runtime,
sync::watch::{channel, Receiver as WatchReceiver},
};

pub(crate) fn maybe_apply_genesis(
db_rw: &DbReaderWriter,
Expand Down Expand Up @@ -45,46 +50,60 @@ pub(crate) fn bootstrap_db(
DbReaderWriter,
Option<Runtime>,
Option<InternalIndexerDB>,
Option<WatchReceiver<u64>>,
)> {
let internal_indexer_db = InternalIndexerDBService::get_indexer_db(node_config);
let (aptos_db_reader, db_rw, backup_service) =
match FastSyncStorageWrapper::initialize_dbs(node_config, internal_indexer_db.clone())? {
Either::Left(db) => {
let (db_arc, db_rw) = DbReaderWriter::wrap(db);
let db_backup_service = start_backup_service(
node_config.storage.backup_service_address,
db_arc.clone(),
);
maybe_apply_genesis(&db_rw, node_config)?;
(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
Either::Right(fast_sync_db_wrapper) => {
let temp_db = fast_sync_db_wrapper.get_temporary_db_with_genesis();
maybe_apply_genesis(&DbReaderWriter::from_arc(temp_db), node_config)?;
let (db_arc, db_rw) = DbReaderWriter::wrap(fast_sync_db_wrapper);
let fast_sync_db = db_arc.get_fast_sync_db();
// FastSyncDB requires ledger info at epoch 0 to establish provenance to genesis
let ledger_info = db_arc
.get_temporary_db_with_genesis()
.get_epoch_ending_ledger_info(0)
.expect("Genesis ledger info must exist");

if fast_sync_db
.get_latest_ledger_info_option()
.expect("should returns Ok results")
.is_none()
{
// it means the DB is empty and we need to
// commit the genesis ledger info to the DB.
fast_sync_db.commit_genesis_ledger_info(&ledger_info)?;
}

let db_backup_service =
start_backup_service(node_config.storage.backup_service_address, fast_sync_db);
(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
};
Ok((aptos_db_reader, db_rw, backup_service, internal_indexer_db))
let (update_sender, update_receiver) = if internal_indexer_db.is_some() {
let (sender, receiver) = channel::<u64>(0);
(Some(sender), Some(receiver))
} else {
(None, None)
};

let (aptos_db_reader, db_rw, backup_service) = match FastSyncStorageWrapper::initialize_dbs(
node_config,
internal_indexer_db.clone(),
update_sender,
)? {
Either::Left(db) => {
let (db_arc, db_rw) = DbReaderWriter::wrap(db);
let db_backup_service =
start_backup_service(node_config.storage.backup_service_address, db_arc.clone());
maybe_apply_genesis(&db_rw, node_config)?;
(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
Either::Right(fast_sync_db_wrapper) => {
let temp_db = fast_sync_db_wrapper.get_temporary_db_with_genesis();
maybe_apply_genesis(&DbReaderWriter::from_arc(temp_db), node_config)?;
let (db_arc, db_rw) = DbReaderWriter::wrap(fast_sync_db_wrapper);
let fast_sync_db = db_arc.get_fast_sync_db();
// FastSyncDB requires ledger info at epoch 0 to establish provenance to genesis
let ledger_info = db_arc
.get_temporary_db_with_genesis()
.get_epoch_ending_ledger_info(0)
.expect("Genesis ledger info must exist");

if fast_sync_db
.get_latest_ledger_info_option()
.expect("should returns Ok results")
.is_none()
{
// it means the DB is empty and we need to
// commit the genesis ledger info to the DB.
fast_sync_db.commit_genesis_ledger_info(&ledger_info)?;
}
let db_backup_service =
start_backup_service(node_config.storage.backup_service_address, fast_sync_db);
(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
};
Ok((
aptos_db_reader,
db_rw,
backup_service,
internal_indexer_db,
update_receiver,
))
}

/// In consensus-only mode, return a in-memory based [FakeAptosDB] and
Expand Down Expand Up @@ -157,6 +176,7 @@ pub fn initialize_database_and_checkpoints(
Option<Runtime>,
Waypoint,
Option<InternalIndexerDB>,
Option<WatchReceiver<Version>>,
)> {
// If required, create RocksDB checkpoints and change the working directory.
// This is test-only.
Expand All @@ -166,7 +186,8 @@ pub fn initialize_database_and_checkpoints(

// Open the database
let instant = Instant::now();
let (_aptos_db, db_rw, backup_service, indexer_db_opt) = bootstrap_db(node_config)?;
let (_aptos_db, db_rw, backup_service, indexer_db_opt, update_receiver) =
bootstrap_db(node_config)?;

// Log the duration to open storage
debug!(
Expand All @@ -179,5 +200,6 @@ pub fn initialize_database_and_checkpoints(
backup_service,
node_config.base.waypoint.genesis_waypoint(),
indexer_db_opt,
update_receiver,
))
}
Loading

0 comments on commit f9f13ea

Please sign in to comment.