Skip to content

Commit

Permalink
[sharded] use internal indexer ledger version in API context
Browse files Browse the repository at this point in the history
  • Loading branch information
areshand committed Jul 2, 2024
1 parent 075f189 commit ba05c27
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 8 deletions.
22 changes: 22 additions & 0 deletions api/src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,28 @@ impl Account {
})
}

// Return the latest ledger info from internal indexer
pub fn new_with_internal_indexer(
context: Arc<Context>,
address: Address,
start: Option<StateKey>,
limit: Option<u16>,
) -> Result<Self, BasicErrorWith404> {
match context.get_latest_internal_indexer_ledger_version_and_info::<BasicErrorWith404>() {
Ok((latest_internal_indexer_ledger_info, latest_internal_indexer_ledger_version)) => {
Ok(Self {
context,
address,
ledger_version: latest_internal_indexer_ledger_version,
start,
limit,
latest_ledger_info: latest_internal_indexer_ledger_info,
})
},
Err(_) => Account::new(context, address, None, start, limit),
}
}

// These functions map directly to endpoint functions.

/// Retrieves the [`AccountData`] for the associated account
Expand Down
15 changes: 15 additions & 0 deletions api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,21 @@ impl Context {
Ok((latest_ledger_info, requested_ledger_version))
}

pub fn get_latest_internal_indexer_ledger_version_and_info<E: StdApiError>(
&self,
) -> Result<(LedgerInfo, Version), E> {
let latest_version = self
.indexer_reader
.as_ref()
.map(|e| e.get_latest_internal_indexer_ledger_version())
.transpose()
.map_err(|err| E::internal_with_code_no_info(err, AptosErrorCode::InternalError))?
.unwrap_or(0);
let mut latest_ledger_info = self.get_latest_ledger_info()?;
latest_ledger_info.ledger_version = aptos_api_types::U64(latest_version);
Ok((latest_ledger_info, latest_version))
}

pub fn get_latest_ledger_info_with_signatures(&self) -> Result<LedgerInfoWithSignatures> {
Ok(self.db.get_latest_ledger_info()?)
}
Expand Down
6 changes: 4 additions & 2 deletions api/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ impl EventsApi {
// Ensure that account exists
let api = self.clone();
api_spawn_blocking(move || {
let account = Account::new(api.context.clone(), address.0, None, None, None)?;
let account =
Account::new_with_internal_indexer(api.context.clone(), address.0, None, None)?;
account.verify_account_or_object_resource()?;
api.list(
account.latest_ledger_info,
Expand Down Expand Up @@ -144,7 +145,8 @@ impl EventsApi {

let api = self.clone();
api_spawn_blocking(move || {
let account = Account::new(api.context.clone(), address.0, None, None, None)?;
let account =
Account::new_with_internal_indexer(api.context.clone(), address.0, None, None)?;
let key = account.find_event_key(event_handle.0, field_name.0.into())?;
api.list(account.latest_ledger_info, accept_type, page, key)
})
Expand Down
3 changes: 2 additions & 1 deletion api/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,8 @@ impl TransactionsApi {
address: Address,
) -> BasicResultWith404<Vec<Transaction>> {
// Verify the account exists
let account = Account::new(self.context.clone(), address, None, None, None)?;
let account =
Account::new_with_internal_indexer(self.context.clone(), address, None, None)?;
account.get_account_resource()?;

let latest_ledger_info = account.latest_ledger_info;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use aptos_db_indexer::{
use aptos_indexer_grpc_utils::counters::{log_grpc_step, IndexerGrpcStep};
use aptos_schemadb::DB;
use aptos_storage_interface::DbReader;
use aptos_types::indexer::indexer_db_reader::IndexerReader;
use aptos_types::{indexer::indexer_db_reader::IndexerReader, transaction::Version};
use std::sync::Arc;
use tokio::runtime::Handle;

Expand Down Expand Up @@ -59,8 +59,36 @@ impl InternalIndexerDBService {
Arc::clone(&self.db_indexer)
}

pub async fn run(&mut self) {
let mut start_version = self.db_indexer.get_persisted_version().unwrap_or(0);
// If the main DB is opened and statekey index is on, we should panic since we will miss all statekeys during the fast sync bootstrap
// If node use fast sync, we should start from the fast synced version
pub fn get_start_version(&self, node_config: &NodeConfig) -> Version {
let indexer_version = self.db_indexer.get_persisted_version().unwrap_or(0);
let is_statekeys_empty = self.db_indexer.is_statekeys_empty().unwrap_or(true);
let db_version = self.db_indexer.get_main_db_latest_version().unwrap_or(0);
// internal indexer started after the state sync
if is_statekeys_empty
&& db_version > 0
&& self.db_indexer.statekeys_enabled()
&& node_config
.state_sync
.state_sync_driver
.bootstrapping_mode
.is_fast_sync()
{
panic!("Internal statekey indexer started after fast synced node running");
}

// During fast sync, we will have a 2nd db to store genesis and internal indexer will start with genesis processed
if indexer_version <= 1 && db_version > 0 {
db_version
} else {
indexer_version
}
}

pub async fn run(&mut self, node_config: &NodeConfig) {
let mut start_version = self.get_start_version(node_config);

loop {
let start_time: std::time::Instant = std::time::Instant::now();
let next_version = self
Expand Down Expand Up @@ -111,8 +139,9 @@ impl MockInternalIndexerDBService {
let mut internal_indexer_db_service =
InternalIndexerDBService::new(db_reader, node_config, db);
let db_indexer = internal_indexer_db_service.get_db_indexer();
let config_clone = node_config.to_owned();
handle.spawn(async move {
internal_indexer_db_service.run().await;
internal_indexer_db_service.run(&config_clone).await;
});
Self {
indexer_readers: IndexerReaders::new(None, Some(db_indexer)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ pub fn bootstrap_internal_indexer_db(
InternalIndexerDBService::new(db_rw.reader, config, internal_indexer_db.unwrap());
let db_indexer = indexer_service.get_db_indexer();
// Spawn task for db indexer
let config_clone = config.to_owned();
runtime.spawn(async move {
indexer_service.run().await;
indexer_service.run(&config_clone).await;
});

Some((runtime, db_indexer))
Expand Down
10 changes: 10 additions & 0 deletions storage/indexer/src/db_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ impl DBIndexer {
}
}

pub fn get_main_db_latest_version(&self) -> Result<Version> {
self.main_db_reader.get_synced_version()
}

pub fn ensure_cover_ledger_version(&self, ledger_version: Version) -> Result<()> {
let indexer_latest_version = self.get_persisted_version()?;
ensure!(
Expand Down Expand Up @@ -443,6 +447,12 @@ impl DBIndexer {
Ok(AccountTransactionsWithProof::new(txns_with_proofs))
}

pub fn is_statekeys_empty(&self) -> Result<bool> {
let mut iter = self.db.iter::<StateKeysSchema>()?;
iter.seek_to_first();
Ok(iter.next().is_none())
}

pub fn get_prefixed_state_value_iterator(
&self,
key_prefix: &StateKeyPrefix,
Expand Down
7 changes: 7 additions & 0 deletions storage/indexer/src/indexer_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ impl IndexerReader for IndexerReaders {
anyhow::bail!("Table info reader is not available")
}

fn get_latest_internal_indexer_ledger_version(&self) -> anyhow::Result<Version> {
if let Some(db_indexer) = &self.db_indexer_reader {
return Ok(db_indexer.get_persisted_version()?);
}
anyhow::bail!("DB indexer reader is not available")
}

fn get_events(
&self,
event_key: &EventKey,
Expand Down
2 changes: 2 additions & 0 deletions types/src/indexer/indexer_db_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,6 @@ pub trait IndexerReader: Send + Sync {
cursor: Option<&StateKey>,
version: Version,
) -> Result<Box<dyn Iterator<Item = Result<(StateKey, StateValue)>> + '_>>;

fn get_latest_internal_indexer_ledger_version(&self) -> Result<Version>;
}

0 comments on commit ba05c27

Please sign in to comment.