Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[sharded] use internal indexer ledger version in API context
Browse files Browse the repository at this point in the history
areshand committed Jul 5, 2024
1 parent 00f5930 commit cfc622e
Showing 12 changed files with 198 additions and 44 deletions.
63 changes: 59 additions & 4 deletions api/src/accounts.rs
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ use aptos_types::{
account_config::{AccountResource, ObjectGroupResource},
event::{EventHandle, EventKey},
state_store::state_key::StateKey,
transaction::Version,
};
use move_core_types::{
identifier::Identifier, language_storage::StructTag, move_resource::MoveStructType,
@@ -112,13 +113,20 @@ impl AccountsApi {

let context = self.context.clone();
api_spawn_blocking(move || {
let account = Account::new(
let account = Account::new_with_internal_indexer(
context,
address.0,
ledger_version.0,
start.0.map(StateKey::from),
limit.0,
)?;
if let Some(version) = ledger_version.0 {
if account.ledger_version < Version::from(version) {
return Err(BasicErrorWith404::bad_request_with_code_no_info(
"Leger version too new",
AptosErrorCode::VersionNotFound,
));
}
}
account.resources(&accept_type)
})
.await
@@ -164,13 +172,21 @@ impl AccountsApi {

let context = self.context.clone();
api_spawn_blocking(move || {
let account = Account::new(
let account = Account::new_with_internal_indexer(
context,
address.0,
ledger_version.0,
start.0.map(StateKey::from),
limit.0,
)?;

if let Some(version) = ledger_version.0 {
if account.ledger_version < Version::from(version) {
return Err(BasicErrorWith404::bad_request_with_code_no_info(
"Leger version too new",
AptosErrorCode::VersionNotFound,
));
}
}
account.modules(&accept_type)
})
.await
@@ -218,6 +234,45 @@ impl Account {
})
}

// Return the latest ledger info from internal indexer
pub fn new_with_internal_indexer(
context: Arc<Context>,
address: Address,
start: Option<StateKey>,
limit: Option<u16>,
) -> Result<Self, BasicErrorWith404> {
let sharding_enabled = context
.node_config
.storage
.rocksdb_configs
.enable_storage_sharding;
let internal_indexer_enabled = context
.node_config
.indexer_db_config
.is_internal_indexer_db_enabled();

if sharding_enabled {
if !internal_indexer_enabled {
return Err(BasicErrorWith404::internal_with_code_no_info(
"Internal indexer is not enabled",
AptosErrorCode::InternalError,
));
}
let (latest_internal_indexer_ledger_info, latest_internal_indexer_ledger_version) =
context.get_latest_internal_indexer_ledger_version_and_info()?;
Ok(Self {
context,
address,
ledger_version: latest_internal_indexer_ledger_version,
start,
limit,
latest_ledger_info: latest_internal_indexer_ledger_info,
})
} else {
Account::new(context, address, None, start, limit)
}
}

// These functions map directly to endpoint functions.

/// Retrieves the [`AccountData`] for the associated account
20 changes: 20 additions & 0 deletions api/src/context.rs
Original file line number Diff line number Diff line change
@@ -277,6 +277,26 @@ impl Context {
Ok((latest_ledger_info, requested_ledger_version))
}

pub fn get_latest_internal_indexer_ledger_version_and_info<E: StdApiError>(
&self,
) -> Result<(LedgerInfo, Version), E> {
if self.indexer_reader.is_none() {
return Err(E::internal_with_code_no_info(
"Indexer reader doesn't exist",
AptosErrorCode::InternalError,
));
}
let latest_version = self
.indexer_reader
.as_ref()
.unwrap()
.get_latest_internal_indexer_ledger_version()
.map_err(|err| E::internal_with_code_no_info(err, AptosErrorCode::InternalError))?;
let mut latest_ledger_info = self.get_latest_ledger_info()?;
latest_ledger_info.ledger_version = aptos_api_types::U64(latest_version);
Ok((latest_ledger_info, latest_version))
}

pub fn get_latest_ledger_info_with_signatures(&self) -> Result<LedgerInfoWithSignatures> {
Ok(self.db.get_latest_ledger_info()?)
}
6 changes: 4 additions & 2 deletions api/src/events.rs
Original file line number Diff line number Diff line change
@@ -77,7 +77,8 @@ impl EventsApi {
// Ensure that account exists
let api = self.clone();
api_spawn_blocking(move || {
let account = Account::new(api.context.clone(), address.0, None, None, None)?;
let account =
Account::new_with_internal_indexer(api.context.clone(), address.0, None, None)?;
account.verify_account_or_object_resource()?;
api.list(
account.latest_ledger_info,
@@ -144,7 +145,8 @@ impl EventsApi {

let api = self.clone();
api_spawn_blocking(move || {
let account = Account::new(api.context.clone(), address.0, None, None, None)?;
let account =
Account::new_with_internal_indexer(api.context.clone(), address.0, None, None)?;
let key = account.find_event_key(event_handle.0, field_name.0.into())?;
api.list(account.latest_ledger_info, accept_type, page, key)
})
3 changes: 2 additions & 1 deletion api/src/transactions.rs
Original file line number Diff line number Diff line change
@@ -986,7 +986,8 @@ impl TransactionsApi {
address: Address,
) -> BasicResultWith404<Vec<Transaction>> {
// Verify the account exists
let account = Account::new(self.context.clone(), address, None, None, None)?;
let account =
Account::new_with_internal_indexer(self.context.clone(), address, None, None)?;
account.get_account_resource()?;

let latest_ledger_info = account.latest_ledger_info;
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ use aptos_db_indexer::{
use aptos_indexer_grpc_utils::counters::{log_grpc_step, IndexerGrpcStep};
use aptos_schemadb::DB;
use aptos_storage_interface::DbReader;
use aptos_types::indexer::indexer_db_reader::IndexerReader;
use aptos_types::{indexer::indexer_db_reader::IndexerReader, transaction::Version};
use std::sync::Arc;
use tokio::runtime::Handle;

@@ -59,8 +59,53 @@ impl InternalIndexerDBService {
Arc::clone(&self.db_indexer)
}

pub async fn run(&mut self) {
let mut start_version = self.db_indexer.get_persisted_version().unwrap_or(0);
pub fn get_start_version(&self, node_config: &NodeConfig) -> Version {
let indexer_version = self
.db_indexer
.get_persisted_version()
.unwrap_or_else(|err| {
panic!(
"Failed to get persisted version from internal indexer db: {}",
err
)
});
let fast_sync_enabled = node_config
.state_sync
.state_sync_driver
.bootstrapping_mode
.is_fast_sync();
let mut db_min_version = self
.db_indexer
.get_main_db_lowest_viable_version()
.unwrap_or_else(|err| panic!("Failed to get main db min viable version: {}", err));

// Wait till fast sync is done
while fast_sync_enabled && db_min_version == 0 {
db_min_version = self
.db_indexer
.get_main_db_lowest_viable_version()
.unwrap_or_else(|err| panic!("Failed to get main db min viable version: {}", err));
std::thread::sleep(std::time::Duration::from_millis(100));
}
let fast_sync_version_opt: Option<u64> = self.db_indexer.get_fast_sync_version();
if indexer_version >= db_min_version {
indexer_version
} else if node_config.indexer_db_config.enable_statekeys() {
// if fast sync fill to the min viable version it is Ok, otherwise panic
if let Some(fast_sync_version) = fast_sync_version_opt {
if fast_sync_version >= db_min_version {
return db_min_version;
}
}
panic!("Internal indexer version is lower than main db min viable version. Internal state keys indexer cannot catch up");
} else {
db_min_version
}
}

pub async fn run(&mut self, node_config: &NodeConfig) {
let mut start_version = self.get_start_version(node_config);

loop {
let start_time: std::time::Instant = std::time::Instant::now();
let next_version = self
@@ -111,8 +156,9 @@ impl MockInternalIndexerDBService {
let mut internal_indexer_db_service =
InternalIndexerDBService::new(db_reader, node_config, db);
let db_indexer = internal_indexer_db_service.get_db_indexer();
let config_clone = node_config.to_owned();
handle.spawn(async move {
internal_indexer_db_service.run().await;
internal_indexer_db_service.run(&config_clone).await;
});
Self {
indexer_readers: IndexerReaders::new(None, Some(db_indexer)),
Original file line number Diff line number Diff line change
@@ -30,8 +30,9 @@ pub fn bootstrap_internal_indexer_db(
InternalIndexerDBService::new(db_rw.reader, config, internal_indexer_db.unwrap());
let db_indexer = indexer_service.get_db_indexer();
// Spawn task for db indexer
let config_clone = config.to_owned();
runtime.spawn(async move {
indexer_service.run().await;
indexer_service.run(&config_clone).await;
});

Some((runtime, db_indexer))
21 changes: 18 additions & 3 deletions state-sync/state-sync-driver/src/storage_synchronizer.rs
Original file line number Diff line number Diff line change
@@ -15,7 +15,10 @@ use crate::{
};
use aptos_config::config::StateSyncDriverConfig;
use aptos_data_streaming_service::data_notification::NotificationId;
use aptos_db_indexer_schemas::schema::state_keys::StateKeysSchema;
use aptos_db_indexer_schemas::{
metadata::{MetadataKey, MetadataValue},
schema::{indexer_metadata::InternalIndexerMetadataSchema, state_keys::StateKeysSchema},
};
use aptos_event_notifications::EventSubscriptionService;
use aptos_executor_types::{ChunkCommitNotification, ChunkExecutorTrait};
use aptos_infallible::Mutex;
@@ -799,13 +802,21 @@ fn spawn_commit_post_processor<
fn write_kv_to_indexer_db(
internal_indexer_db: &Option<Arc<DB>>,
kvs: &Vec<(StateKey, StateValue)>,
snapshot_version: Version,
write_version: bool,
) -> aptos_storage_interface::Result<()> {
// add state value to internal indexer
if let Some(indexer_db) = internal_indexer_db.as_ref() {
let batch = SchemaBatch::new();
for (state_key, _) in kvs {
batch.put::<StateKeysSchema>(state_key, &())?;
}
if write_version {
batch.put::<InternalIndexerMetadataSchema>(
&MetadataKey::FastSyncVersion,
&MetadataValue::Version(snapshot_version),
)?;
}
indexer_db.write_schemas(batch)?;
}
Ok(())
@@ -861,8 +872,12 @@ fn spawn_state_snapshot_receiver<
let all_states_synced = states_with_proof.is_last_chunk();
let last_committed_state_index = states_with_proof.last_index;
let num_state_values = states_with_proof.raw_values.len();
let indexer_results: Result<(), AptosDbError> =
write_kv_to_indexer_db(&internal_indexer_db, &states_with_proof.raw_values);
let indexer_results: Result<(), AptosDbError> = write_kv_to_indexer_db(
&internal_indexer_db,
&states_with_proof.raw_values,
version,
all_states_synced,
);

let result = state_snapshot_receiver.add_chunk(
states_with_proof.raw_values,
22 changes: 22 additions & 0 deletions storage/indexer/src/db_indexer.rs
Original file line number Diff line number Diff line change
@@ -111,6 +111,13 @@ impl DBIndexer {
}
}

pub fn get_main_db_lowest_viable_version(&self) -> Result<Version> {
self.main_db_reader
.get_first_txn_version()
.transpose()
.expect("main db lowest viable version doesn't exist")
}

pub fn ensure_cover_ledger_version(&self, ledger_version: Version) -> Result<()> {
let indexer_latest_version = self.get_persisted_version()?;
ensure!(
@@ -458,4 +465,19 @@ impl DBIndexer {
ledger_version,
)
}

pub fn get_fast_sync_version(&self) -> Option<Version> {
// read the latest key from the db
match self
.db
.get::<InternalIndexerMetadataSchema>(&MetadataKey::FastSyncVersion)
{
Ok(Some(metavalue)) => Some(metavalue.expect_version()),
Ok(None) => None,
Err(e) => panic!(
"Failed to get fast sync version from internal indexer db: {}",
e
),
}
}
}
7 changes: 7 additions & 0 deletions storage/indexer/src/indexer_reader.rs
Original file line number Diff line number Diff line change
@@ -47,6 +47,13 @@ impl IndexerReader for IndexerReaders {
anyhow::bail!("Table info reader is not available")
}

fn get_latest_internal_indexer_ledger_version(&self) -> anyhow::Result<Version> {
if let Some(db_indexer) = &self.db_indexer_reader {
return Ok(db_indexer.get_persisted_version()?);
}
anyhow::bail!("DB indexer reader is not available")
}

fn get_events(
&self,
event_key: &EventKey,
3 changes: 2 additions & 1 deletion storage/indexer_schemas/src/metadata.rs
Original file line number Diff line number Diff line change
@@ -18,8 +18,9 @@ impl MetadataValue {
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize, Hash, PartialOrd, Ord)]
#[cfg_attr(any(test, feature = "fuzzing"), derive(proptest_derive::Arbitrary))]
pub enum MetadataKey {
LatestVersion,
FastSyncVersion,
}
Loading

0 comments on commit cfc622e

Please sign in to comment.