Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace sleep with notification from aptosdb #14756

Merged
merged 2 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 39 additions & 27 deletions api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,37 +296,49 @@ impl Context {
&self,
) -> Result<LedgerInfo, E> {
if let Some(indexer_reader) = self.indexer_reader.as_ref() {
if let Some(latest_version) = indexer_reader
.get_latest_internal_indexer_ledger_version()
.map_err(|err| {
E::service_unavailable_with_code_no_info(err, AptosErrorCode::InternalError)
})?
{
let (_, _, new_block_event) = self
.db
.get_block_info_by_version(latest_version)
.map_err(|_| {
E::service_unavailable_with_code_no_info(
"Failed to get block",
AptosErrorCode::InternalError,
)
})?;
let (oldest_version, oldest_block_height) =
self.get_oldest_version_and_block_height()?;
return Ok(LedgerInfo::new_ledger_info(
&self.chain_id(),
new_block_event.epoch(),
latest_version,
oldest_version,
oldest_block_height,
new_block_event.height(),
new_block_event.proposed_time(),
));
if indexer_reader.is_internal_indexer_enabled() {
if let Some(mut latest_version) = indexer_reader
.get_latest_internal_indexer_ledger_version()
.map_err(|err| {
E::service_unavailable_with_code_no_info(err, AptosErrorCode::InternalError)
})?
{
// The internal indexer version can be ahead of the storage committed version since it syncs to db's latest synced version
let last_storage_version =
self.get_latest_storage_ledger_info()?.ledger_version.0;
latest_version = std::cmp::min(latest_version, last_storage_version);
let (_, block_end_version, new_block_event) = self
.db
.get_block_info_by_version(latest_version)
.map_err(|_| {
E::service_unavailable_with_code_no_info(
"Failed to get block",
AptosErrorCode::InternalError,
)
})?;
let (oldest_version, oldest_block_height) =
self.get_oldest_version_and_block_height()?;
return Ok(LedgerInfo::new_ledger_info(
&self.chain_id(),
new_block_event.epoch(),
block_end_version,
oldest_version,
oldest_block_height,
new_block_event.height(),
new_block_event.proposed_time(),
));
} else {
// Indexer doesn't have data yet as DB is boostrapping.
return Err(E::service_unavailable_with_code_no_info(
"DB is bootstrapping",
AptosErrorCode::InternalError,
));
}
}
}

Err(E::service_unavailable_with_code_no_info(
"Indexer reader doesn't exist, or doesn't have data.",
"Indexer reader doesn't exist",
AptosErrorCode::InternalError,
))
}
Expand Down
13 changes: 8 additions & 5 deletions api/test-context/src/test_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use aptos_types::{
ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
transaction::{
signature_verified_transaction::into_signature_verified_block, Transaction,
TransactionPayload, TransactionStatus,
TransactionPayload, TransactionStatus, Version,
},
};
use aptos_vm::AptosVM;
Expand All @@ -53,6 +53,7 @@ use hyper::{HeaderMap, Response};
use rand::SeedableRng;
use serde_json::{json, Value};
use std::{boxed::Box, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use tokio::sync::watch::channel;
use warp::{http::header::CONTENT_TYPE, Filter, Rejection, Reply};
use warp_reverse_proxy::reverse_proxy_filter;

Expand Down Expand Up @@ -118,12 +119,14 @@ pub fn new_test_context(
let (root_key, genesis, genesis_waypoint, validators) = builder.build(&mut rng).unwrap();
let (validator_identity, _, _, _) = validators[0].get_key_objects(None).unwrap();
let validator_owner = validator_identity.account_address.unwrap();

let (sender, recver) = channel::<Version>(0);
let (db, db_rw) = if use_db_with_indexer {
DbReaderWriter::wrap(AptosDB::new_for_test_with_indexer(
let mut aptos_db = AptosDB::new_for_test_with_indexer(
&tmp_dir,
node_config.storage.rocksdb_configs.enable_storage_sharding,
))
);
aptos_db.add_version_update_subscriber(sender).unwrap();
DbReaderWriter::wrap(aptos_db)
} else {
DbReaderWriter::wrap(
AptosDB::open(
Expand Down Expand Up @@ -155,7 +158,7 @@ pub fn new_test_context(
.storage
.set_data_dir(tmp_dir.path().to_path_buf());
let mock_indexer_service =
MockInternalIndexerDBService::new_for_test(db_rw.reader.clone(), &node_config);
MockInternalIndexerDBService::new_for_test(db_rw.reader.clone(), &node_config, recver);

let context = Context::new(
ChainId::test(),
Expand Down
10 changes: 8 additions & 2 deletions aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ pub fn setup_environment_and_start_node(
let mut admin_service = services::start_admin_service(&node_config);

// Set up the storage database and any RocksDB checkpoints
let (db_rw, backup_service, genesis_waypoint, indexer_db_opt) =
let (db_rw, backup_service, genesis_waypoint, indexer_db_opt, update_receiver) =
storage::initialize_database_and_checkpoints(&mut node_config)?;

admin_service.set_aptos_db(db_rw.clone().into());
Expand Down Expand Up @@ -687,7 +687,13 @@ pub fn setup_environment_and_start_node(
indexer_runtime,
indexer_grpc_runtime,
internal_indexer_db_runtime,
) = services::bootstrap_api_and_indexer(&node_config, db_rw.clone(), chain_id, indexer_db_opt)?;
) = services::bootstrap_api_and_indexer(
&node_config,
db_rw.clone(),
chain_id,
indexer_db_opt,
update_receiver,
)?;

// Create mempool and get the consensus to mempool sender
let (mempool_runtime, consensus_to_mempool_sender) =
Expand Down
20 changes: 14 additions & 6 deletions aptos-node/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ use aptos_types::{chain_id::ChainId, indexer::indexer_db_reader::IndexerReader};
use aptos_validator_transaction_pool::VTxnPoolState;
use futures::channel::{mpsc, mpsc::Sender};
use std::{sync::Arc, time::Instant};
use tokio::runtime::{Handle, Runtime};
use tokio::{
runtime::{Handle, Runtime},
sync::watch::Receiver as WatchReceiver,
};

const AC_SMP_CHANNEL_BUFFER_SIZE: usize = 1_024;
const INTRA_NODE_CHANNEL_BUFFER_SIZE: usize = 1;
Expand All @@ -46,6 +49,7 @@ pub fn bootstrap_api_and_indexer(
db_rw: DbReaderWriter,
chain_id: ChainId,
internal_indexer_db: Option<InternalIndexerDB>,
update_receiver: Option<WatchReceiver<u64>>,
) -> anyhow::Result<(
Receiver<MempoolClientRequest>,
Option<Runtime>,
Expand All @@ -68,11 +72,15 @@ pub fn bootstrap_api_and_indexer(
None => (None, None),
};

let (db_indexer_runtime, txn_event_reader) =
match bootstrap_internal_indexer_db(node_config, db_rw.clone(), internal_indexer_db) {
Some((runtime, db_indexer)) => (Some(runtime), Some(db_indexer)),
None => (None, None),
};
let (db_indexer_runtime, txn_event_reader) = match bootstrap_internal_indexer_db(
node_config,
db_rw.clone(),
internal_indexer_db,
update_receiver,
) {
Some((runtime, db_indexer)) => (Some(runtime), Some(db_indexer)),
None => (None, None),
};

let indexer_readers = IndexerReaders::new(indexer_async_v2, txn_event_reader);

Expand Down
104 changes: 63 additions & 41 deletions aptos-node/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ use aptos_executor::db_bootstrapper::maybe_bootstrap;
use aptos_indexer_grpc_table_info::internal_indexer_db_service::InternalIndexerDBService;
use aptos_logger::{debug, info};
use aptos_storage_interface::{DbReader, DbReaderWriter};
use aptos_types::{ledger_info::LedgerInfoWithSignatures, waypoint::Waypoint};
use aptos_types::{
ledger_info::LedgerInfoWithSignatures, transaction::Version, waypoint::Waypoint,
};
use aptos_vm::AptosVM;
use either::Either;
use std::{fs, path::Path, sync::Arc, time::Instant};
use tokio::runtime::Runtime;
use tokio::{
runtime::Runtime,
sync::watch::{channel, Receiver as WatchReceiver},
};

pub(crate) fn maybe_apply_genesis(
db_rw: &DbReaderWriter,
Expand Down Expand Up @@ -45,46 +50,60 @@ pub(crate) fn bootstrap_db(
DbReaderWriter,
Option<Runtime>,
Option<InternalIndexerDB>,
Option<WatchReceiver<u64>>,
)> {
let internal_indexer_db = InternalIndexerDBService::get_indexer_db(node_config);
let (aptos_db_reader, db_rw, backup_service) =
match FastSyncStorageWrapper::initialize_dbs(node_config, internal_indexer_db.clone())? {
Either::Left(db) => {
let (db_arc, db_rw) = DbReaderWriter::wrap(db);
let db_backup_service = start_backup_service(
node_config.storage.backup_service_address,
db_arc.clone(),
);
maybe_apply_genesis(&db_rw, node_config)?;
(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
Either::Right(fast_sync_db_wrapper) => {
let temp_db = fast_sync_db_wrapper.get_temporary_db_with_genesis();
maybe_apply_genesis(&DbReaderWriter::from_arc(temp_db), node_config)?;
let (db_arc, db_rw) = DbReaderWriter::wrap(fast_sync_db_wrapper);
let fast_sync_db = db_arc.get_fast_sync_db();
// FastSyncDB requires ledger info at epoch 0 to establish provenance to genesis
let ledger_info = db_arc
.get_temporary_db_with_genesis()
.get_epoch_ending_ledger_info(0)
.expect("Genesis ledger info must exist");

if fast_sync_db
.get_latest_ledger_info_option()
.expect("should returns Ok results")
.is_none()
{
// it means the DB is empty and we need to
// commit the genesis ledger info to the DB.
fast_sync_db.commit_genesis_ledger_info(&ledger_info)?;
}

let db_backup_service =
start_backup_service(node_config.storage.backup_service_address, fast_sync_db);
(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
};
Ok((aptos_db_reader, db_rw, backup_service, internal_indexer_db))
let (update_sender, update_receiver) = if internal_indexer_db.is_some() {
let (sender, receiver) = channel::<u64>(0);
(Some(sender), Some(receiver))
} else {
(None, None)
};

let (aptos_db_reader, db_rw, backup_service) = match FastSyncStorageWrapper::initialize_dbs(
node_config,
internal_indexer_db.clone(),
update_sender,
)? {
Either::Left(db) => {
let (db_arc, db_rw) = DbReaderWriter::wrap(db);
let db_backup_service =
start_backup_service(node_config.storage.backup_service_address, db_arc.clone());
maybe_apply_genesis(&db_rw, node_config)?;
(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
Either::Right(fast_sync_db_wrapper) => {
let temp_db = fast_sync_db_wrapper.get_temporary_db_with_genesis();
maybe_apply_genesis(&DbReaderWriter::from_arc(temp_db), node_config)?;
let (db_arc, db_rw) = DbReaderWriter::wrap(fast_sync_db_wrapper);
let fast_sync_db = db_arc.get_fast_sync_db();
// FastSyncDB requires ledger info at epoch 0 to establish provenance to genesis
let ledger_info = db_arc
.get_temporary_db_with_genesis()
.get_epoch_ending_ledger_info(0)
.expect("Genesis ledger info must exist");

if fast_sync_db
.get_latest_ledger_info_option()
.expect("should returns Ok results")
.is_none()
{
// it means the DB is empty and we need to
// commit the genesis ledger info to the DB.
fast_sync_db.commit_genesis_ledger_info(&ledger_info)?;
}
let db_backup_service =
start_backup_service(node_config.storage.backup_service_address, fast_sync_db);
(db_arc as Arc<dyn DbReader>, db_rw, Some(db_backup_service))
},
};
Ok((
aptos_db_reader,
db_rw,
backup_service,
internal_indexer_db,
update_receiver,
))
}

/// In consensus-only mode, return a in-memory based [FakeAptosDB] and
Expand Down Expand Up @@ -157,6 +176,7 @@ pub fn initialize_database_and_checkpoints(
Option<Runtime>,
Waypoint,
Option<InternalIndexerDB>,
Option<WatchReceiver<Version>>,
)> {
// If required, create RocksDB checkpoints and change the working directory.
// This is test-only.
Expand All @@ -166,7 +186,8 @@ pub fn initialize_database_and_checkpoints(

// Open the database
let instant = Instant::now();
let (_aptos_db, db_rw, backup_service, indexer_db_opt) = bootstrap_db(node_config)?;
let (_aptos_db, db_rw, backup_service, indexer_db_opt, update_receiver) =
bootstrap_db(node_config)?;

// Log the duration to open storage
debug!(
Expand All @@ -179,5 +200,6 @@ pub fn initialize_database_and_checkpoints(
backup_service,
node_config.base.waypoint.genesis_waypoint(),
indexer_db_opt,
update_receiver,
))
}
1 change: 0 additions & 1 deletion crates/aptos-faucet/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
tokio = { workspace = true }
url = { workspace = true }

[features]
integration-tests = []
Loading
Loading