Skip to content

Commit

Permalink
[sharded] use internal indexer ledger version in API context
Browse files Browse the repository at this point in the history
  • Loading branch information
areshand committed Jul 5, 2024
1 parent 00f5930 commit ad2a40a
Show file tree
Hide file tree
Showing 13 changed files with 206 additions and 52 deletions.
79 changes: 68 additions & 11 deletions api/src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use crate::{
failpoint::fail_point_poem,
page::determine_limit,
response::{
account_not_found, resource_not_found, struct_field_not_found, BadRequestError,
BasicErrorWith404, BasicResponse, BasicResponseStatus, BasicResultWith404, InternalError,
account_not_found, resource_not_found, struct_field_not_found, version_not_found,
version_pruned, BadRequestError, BasicErrorWith404, BasicResponse, BasicResponseStatus,
BasicResultWith404, InternalError, NotFoundError,
},
ApiTags,
};
Expand All @@ -22,6 +23,7 @@ use aptos_types::{
account_config::{AccountResource, ObjectGroupResource},
event::{EventHandle, EventKey},
state_store::state_key::StateKey,
transaction::Version,
};
use move_core_types::{
identifier::Identifier, language_storage::StructTag, move_resource::MoveStructType,
Expand Down Expand Up @@ -66,7 +68,7 @@ impl AccountsApi {

let context = self.context.clone();
api_spawn_blocking(move || {
let account = Account::new(context, address.0, ledger_version.0, None, None)?;
let account = Account::new(context, address.0, ledger_version.0, None, None, true)?;
account.account(&accept_type)
})
.await
Expand Down Expand Up @@ -118,7 +120,19 @@ impl AccountsApi {
ledger_version.0,
start.0.map(StateKey::from),
limit.0,
false,
)?;
if let Some(version) = ledger_version.0 {
if account.ledger_version < Version::from(version) {
return Err(BasicErrorWith404::not_found_with_code_no_info(
format!(
"Request version {} larger than latest indexer version {}",
version, account.ledger_version
),
AptosErrorCode::VersionNotFound,
));
}
}
account.resources(&accept_type)
})
.await
Expand Down Expand Up @@ -170,6 +184,7 @@ impl AccountsApi {
ledger_version.0,
start.0.map(StateKey::from),
limit.0,
false,
)?;
account.modules(&accept_type)
})
Expand All @@ -193,25 +208,67 @@ pub struct Account {
}

impl Account {
/// Creates a new account struct and determines the current ledger info, and determines the
/// ledger version to query
pub fn new(
context: Arc<Context>,
address: Address,
requested_ledger_version: Option<U64>,
start: Option<StateKey>,
limit: Option<u16>,
no_internal_indexer_required: bool,
) -> Result<Self, BasicErrorWith404> {
// Use the latest ledger version, or the requested associated version
let (latest_ledger_info, requested_ledger_version) = context
.get_latest_ledger_info_and_verify_lookup_version(
requested_ledger_version.map(|inner| inner.0),
)?;
let sharding_enabled = context
.node_config
.storage
.rocksdb_configs
.enable_storage_sharding;
let internal_indexer_enabled = context
.node_config
.indexer_db_config
.is_internal_indexer_db_enabled();

let (latest_ledger_info, requested_version) =
if sharding_enabled && !no_internal_indexer_required {
if !internal_indexer_enabled {
return Err(BasicErrorWith404::internal_with_code_no_info(
"Internal indexer is not enabled",
AptosErrorCode::InternalError,
));
}
let (latest_internal_indexer_ledger_info, latest_internal_indexer_ledger_version) =
context.get_latest_internal_indexer_ledger_version_and_info()?;
if let Some(version) = requested_ledger_version {
let request_ledger_version = Version::from(version);
if latest_internal_indexer_ledger_version < request_ledger_version {
return Err(version_not_found(
request_ledger_version,
&latest_internal_indexer_ledger_info,
));
} else if request_ledger_version
< latest_internal_indexer_ledger_info.oldest_ledger_version.0
{
return Err(version_pruned(
request_ledger_version,
&latest_internal_indexer_ledger_info,
));
}
(latest_internal_indexer_ledger_info, request_ledger_version)
} else {
(
latest_internal_indexer_ledger_info,
latest_internal_indexer_ledger_version,
)
}
} else {
// Use the latest ledger version, or the requested associated version
context.get_latest_ledger_info_and_verify_lookup_version(
requested_ledger_version.map(|inner| inner.0),
)?
};

Ok(Self {
context,
address,
ledger_version: requested_ledger_version,
ledger_version: requested_version,
start,
limit,
latest_ledger_info,
Expand Down
20 changes: 20 additions & 0 deletions api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,26 @@ impl Context {
Ok((latest_ledger_info, requested_ledger_version))
}

pub fn get_latest_internal_indexer_ledger_version_and_info<E: StdApiError>(
&self,
) -> Result<(LedgerInfo, Version), E> {
if self.indexer_reader.is_none() {
return Err(E::internal_with_code_no_info(
"Indexer reader doesn't exist",
AptosErrorCode::InternalError,
));
}
let latest_version = self
.indexer_reader
.as_ref()
.unwrap()
.get_latest_internal_indexer_ledger_version()
.map_err(|err| E::internal_with_code_no_info(err, AptosErrorCode::InternalError))?;
let mut latest_ledger_info = self.get_latest_ledger_info()?;
latest_ledger_info.ledger_version = aptos_api_types::U64(latest_version);
Ok((latest_ledger_info, latest_version))
}

pub fn get_latest_ledger_info_with_signatures(&self) -> Result<LedgerInfoWithSignatures> {
Ok(self.db.get_latest_ledger_info()?)
}
Expand Down
4 changes: 2 additions & 2 deletions api/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl EventsApi {
// Ensure that account exists
let api = self.clone();
api_spawn_blocking(move || {
let account = Account::new(api.context.clone(), address.0, None, None, None)?;
let account = Account::new(api.context.clone(), address.0, None, None, None, false)?;
account.verify_account_or_object_resource()?;
api.list(
account.latest_ledger_info,
Expand Down Expand Up @@ -144,7 +144,7 @@ impl EventsApi {

let api = self.clone();
api_spawn_blocking(move || {
let account = Account::new(api.context.clone(), address.0, None, None, None)?;
let account = Account::new(api.context.clone(), address.0, None, None, None, false)?;
let key = account.find_event_key(event_handle.0, field_name.0.into())?;
api.list(account.latest_ledger_info, accept_type, page, key)
})
Expand Down
3 changes: 2 additions & 1 deletion api/src/tests/accounts_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,11 @@ async fn test_get_account_resources_by_ledger_version() {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_get_account_resources_by_too_large_ledger_version() {
let mut context = new_test_context(current_function_name!());
let account = context.root_account().await;
let resp = context
.expect_status_code(404)
.get(&account_resources_with_ledger_version(
&context.root_account().await.address().to_hex_literal(),
&account.address().to_hex_literal(),
1000000000000000000,
))
.await;
Expand Down
2 changes: 1 addition & 1 deletion api/src/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ impl TransactionsApi {
address: Address,
) -> BasicResultWith404<Vec<Transaction>> {
// Verify the account exists
let account = Account::new(self.context.clone(), address, None, None, None)?;
let account = Account::new(self.context.clone(), address, None, None, None, false)?;
account.get_account_resource()?;

let latest_ledger_info = account.latest_ledger_info;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use aptos_db_indexer::{
use aptos_indexer_grpc_utils::counters::{log_grpc_step, IndexerGrpcStep};
use aptos_schemadb::DB;
use aptos_storage_interface::DbReader;
use aptos_types::indexer::indexer_db_reader::IndexerReader;
use aptos_types::{indexer::indexer_db_reader::IndexerReader, transaction::Version};
use std::sync::Arc;
use tokio::runtime::Handle;

Expand Down Expand Up @@ -59,8 +59,53 @@ impl InternalIndexerDBService {
Arc::clone(&self.db_indexer)
}

pub async fn run(&mut self) {
let mut start_version = self.db_indexer.get_persisted_version().unwrap_or(0);
pub fn get_start_version(&self, node_config: &NodeConfig) -> Version {
let indexer_version = self
.db_indexer
.get_persisted_version()
.unwrap_or_else(|err| {
panic!(
"Failed to get persisted version from internal indexer db: {}",
err
)
});
let fast_sync_enabled = node_config
.state_sync
.state_sync_driver
.bootstrapping_mode
.is_fast_sync();
let mut db_min_version = self
.db_indexer
.get_main_db_lowest_viable_version()
.unwrap_or_else(|err| panic!("Failed to get main db min viable version: {}", err));

// Wait till fast sync is done
while fast_sync_enabled && db_min_version == 0 {
db_min_version = self
.db_indexer
.get_main_db_lowest_viable_version()
.unwrap_or_else(|err| panic!("Failed to get main db min viable version: {}", err));
std::thread::sleep(std::time::Duration::from_millis(100));
}
let fast_sync_version_opt: Option<u64> = self.db_indexer.get_fast_sync_version();
if indexer_version >= db_min_version {
indexer_version
} else if node_config.indexer_db_config.enable_statekeys() {
// if fast sync fill to the min viable version it is Ok, otherwise panic
if let Some(fast_sync_version) = fast_sync_version_opt {
if fast_sync_version >= db_min_version {
return db_min_version;
}
}
panic!("Internal indexer version is lower than main db min viable version. Internal state keys indexer cannot catch up");
} else {
db_min_version
}
}

pub async fn run(&mut self, node_config: &NodeConfig) {
let mut start_version = self.get_start_version(node_config);

loop {
let start_time: std::time::Instant = std::time::Instant::now();
let next_version = self
Expand Down Expand Up @@ -111,8 +156,9 @@ impl MockInternalIndexerDBService {
let mut internal_indexer_db_service =
InternalIndexerDBService::new(db_reader, node_config, db);
let db_indexer = internal_indexer_db_service.get_db_indexer();
let config_clone = node_config.to_owned();
handle.spawn(async move {
internal_indexer_db_service.run().await;
internal_indexer_db_service.run(&config_clone).await;
});
Self {
indexer_readers: IndexerReaders::new(None, Some(db_indexer)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ pub fn bootstrap_internal_indexer_db(
InternalIndexerDBService::new(db_rw.reader, config, internal_indexer_db.unwrap());
let db_indexer = indexer_service.get_db_indexer();
// Spawn task for db indexer
let config_clone = config.to_owned();
runtime.spawn(async move {
indexer_service.run().await;
indexer_service.run(&config_clone).await;
});

Some((runtime, db_indexer))
Expand Down
21 changes: 18 additions & 3 deletions state-sync/state-sync-driver/src/storage_synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use crate::{
};
use aptos_config::config::StateSyncDriverConfig;
use aptos_data_streaming_service::data_notification::NotificationId;
use aptos_db_indexer_schemas::schema::state_keys::StateKeysSchema;
use aptos_db_indexer_schemas::{
metadata::{MetadataKey, MetadataValue},
schema::{indexer_metadata::InternalIndexerMetadataSchema, state_keys::StateKeysSchema},
};
use aptos_event_notifications::EventSubscriptionService;
use aptos_executor_types::{ChunkCommitNotification, ChunkExecutorTrait};
use aptos_infallible::Mutex;
Expand Down Expand Up @@ -799,13 +802,21 @@ fn spawn_commit_post_processor<
fn write_kv_to_indexer_db(
internal_indexer_db: &Option<Arc<DB>>,
kvs: &Vec<(StateKey, StateValue)>,
snapshot_version: Version,
write_version: bool,
) -> aptos_storage_interface::Result<()> {
// add state value to internal indexer
if let Some(indexer_db) = internal_indexer_db.as_ref() {
let batch = SchemaBatch::new();
for (state_key, _) in kvs {
batch.put::<StateKeysSchema>(state_key, &())?;
}
if write_version {
batch.put::<InternalIndexerMetadataSchema>(
&MetadataKey::FastSyncVersion,
&MetadataValue::Version(snapshot_version),
)?;
}
indexer_db.write_schemas(batch)?;
}
Ok(())
Expand Down Expand Up @@ -861,8 +872,12 @@ fn spawn_state_snapshot_receiver<
let all_states_synced = states_with_proof.is_last_chunk();
let last_committed_state_index = states_with_proof.last_index;
let num_state_values = states_with_proof.raw_values.len();
let indexer_results: Result<(), AptosDbError> =
write_kv_to_indexer_db(&internal_indexer_db, &states_with_proof.raw_values);
let indexer_results: Result<(), AptosDbError> = write_kv_to_indexer_db(
&internal_indexer_db,
&states_with_proof.raw_values,
version,
all_states_synced,
);

let result = state_snapshot_receiver.add_chunk(
states_with_proof.raw_values,
Expand Down
22 changes: 22 additions & 0 deletions storage/indexer/src/db_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ impl DBIndexer {
}
}

pub fn get_main_db_lowest_viable_version(&self) -> Result<Version> {
self.main_db_reader
.get_first_txn_version()
.transpose()
.expect("main db lowest viable version doesn't exist")
}

pub fn ensure_cover_ledger_version(&self, ledger_version: Version) -> Result<()> {
let indexer_latest_version = self.get_persisted_version()?;
ensure!(
Expand Down Expand Up @@ -458,4 +465,19 @@ impl DBIndexer {
ledger_version,
)
}

pub fn get_fast_sync_version(&self) -> Option<Version> {
// read the latest key from the db
match self
.db
.get::<InternalIndexerMetadataSchema>(&MetadataKey::FastSyncVersion)
{
Ok(Some(metavalue)) => Some(metavalue.expect_version()),
Ok(None) => None,
Err(e) => panic!(
"Failed to get fast sync version from internal indexer db: {}",
e
),
}
}
}
7 changes: 7 additions & 0 deletions storage/indexer/src/indexer_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ impl IndexerReader for IndexerReaders {
anyhow::bail!("Table info reader is not available")
}

fn get_latest_internal_indexer_ledger_version(&self) -> anyhow::Result<Version> {
if let Some(db_indexer) = &self.db_indexer_reader {
return Ok(db_indexer.get_persisted_version()?);
}
anyhow::bail!("DB indexer reader is not available")
}

fn get_events(
&self,
event_key: &EventKey,
Expand Down
3 changes: 2 additions & 1 deletion storage/indexer_schemas/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@ impl MetadataValue {
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize, Hash, PartialOrd, Ord)]
#[cfg_attr(any(test, feature = "fuzzing"), derive(proptest_derive::Arbitrary))]
pub enum MetadataKey {
LatestVersion,
FastSyncVersion,
}
Loading

0 comments on commit ad2a40a

Please sign in to comment.