Skip to content

Commit

Permalink
Fix progress.
Browse files Browse the repository at this point in the history
  • Loading branch information
grao1991 committed Jul 25, 2024
1 parent 2e83bfa commit 26a2863
Show file tree
Hide file tree
Showing 16 changed files with 164 additions and 90 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 13 additions & 13 deletions api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,20 +309,20 @@ impl Context {
pub fn get_latest_internal_indexer_ledger_version_and_main_db_info<E: StdApiError>(
&self,
) -> Result<(LedgerInfo, Version), E> {
if self.indexer_reader.is_none() {
return Err(E::internal_with_code_no_info(
"Indexer reader doesn't exist",
AptosErrorCode::InternalError,
));
if let Some(indexer_reader) = self.indexer_reader.as_ref() {
if let Some(latest_version) = indexer_reader
.get_latest_internal_indexer_ledger_version()
.map_err(|err| E::internal_with_code_no_info(err, AptosErrorCode::InternalError))?
{
let latest_ledger_info = self.get_latest_ledger_info()?;
return Ok((latest_ledger_info, latest_version));
}
}
let latest_version = self
.indexer_reader
.as_ref()
.unwrap()
.get_latest_internal_indexer_ledger_version()
.map_err(|err| E::internal_with_code_no_info(err, AptosErrorCode::InternalError))?;
let latest_ledger_info = self.get_latest_ledger_info()?;
Ok((latest_ledger_info, latest_version))

Err(E::internal_with_code_no_info(
"Indexer reader doesn't exist, or doesn't have data.",
AptosErrorCode::InternalError,
))
}

pub fn get_latest_ledger_info_with_signatures(&self) -> Result<LedgerInfoWithSignatures> {
Expand Down
6 changes: 5 additions & 1 deletion api/src/tests/accounts_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use aptos_api_test_context::{current_function_name, find_value, TestContext};
use aptos_api_types::{MoveModuleBytecode, MoveResource, MoveStructTag, StateKeyWrapper};
use aptos_cached_packages::aptos_stdlib;
use serde_json::json;
use std::str::FromStr;
use std::{str::FromStr, time::Duration};

/* TODO: reactivate once cause of failure for `"8"` vs `8` in the JSON output is known.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand Down Expand Up @@ -114,6 +114,8 @@ async fn test_account_resources_by_ledger_version_with_context(mut context: Test
let txn = context.create_user_account(&account).await;
context.commit_block(&vec![txn.clone()]).await;

tokio::time::sleep(Duration::from_millis(200)).await;

let ledger_version_1_resources = context
.get(&account_resources(
&context.root_account().await.address().to_hex_literal(),
Expand Down Expand Up @@ -181,6 +183,8 @@ async fn test_get_account_modules_by_ledger_version_with_context(mut context: Te
root_account.sign_with_transaction_builder(context.transaction_factory().payload(payload));
context.commit_block(&vec![txn.clone()]).await;

tokio::time::sleep(Duration::from_millis(200)).await;

let modules = context
.get(&account_modules(
&context.root_account().await.address().to_hex_literal(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,49 +77,70 @@ impl InternalIndexerDBService {
}

pub async fn get_start_version(&self, node_config: &NodeConfig) -> Result<Version> {
let indexer_version = self.db_indexer.indexer_db.get_persisted_version()?;
let fast_sync_enabled = node_config
.state_sync
.state_sync_driver
.bootstrapping_mode
.is_fast_sync();
let mut db_min_version = self.db_indexer.get_main_db_lowest_viable_version()?;
let mut main_db_synced_version = self.db_indexer.main_db_reader.get_synced_version()?;

// Wait till fast sync is done
while fast_sync_enabled && main_db_synced_version == 0 {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
main_db_synced_version = self.db_indexer.main_db_reader.get_synced_version()?;
}
let fast_sync_version_opt = self

let start_version = self
.db_indexer
.indexer_db
.get_restore_progress(db_min_version)?;
.get_persisted_version()?
.map_or(0, |v| v + 1);

if node_config.indexer_db_config.enable_statekeys() {
let state_start_version = self
.db_indexer
.indexer_db
.get_state_version()?
.map_or(0, |v| v + 1);
if start_version != state_start_version {
panic!("Cannot start state indexer because the progress doesn't match.");
}
}

if node_config.indexer_db_config.enable_statekeys()
&& fast_sync_enabled
&& fast_sync_version_opt.is_none()
{
panic!("Internal indexer db don't have state keys restored. Please run state sync with state keys enabled.");
if node_config.indexer_db_config.enable_transaction() {
let transaction_start_version = self
.db_indexer
.indexer_db
.get_transaction_version()?
.map_or(0, |v| v + 1);
if start_version != transaction_start_version {
panic!("Cannot start transaction indexer because the progress doesn't match.");
}
}

if indexer_version >= db_min_version {
Ok(indexer_version)
} else {
Ok(db_min_version)
if node_config.indexer_db_config.enable_event() {
let event_start_version = self
.db_indexer
.indexer_db
.get_event_version()?
.map_or(0, |v| v + 1);
if start_version != event_start_version {
panic!("Cannot start event indexer because the progress doesn't match.");
}
}

Ok(start_version)
}

pub async fn run(&mut self, node_config: &NodeConfig) -> Result<()> {
let mut start_version = self.get_start_version(node_config).await?;

loop {
let start_time: std::time::Instant = std::time::Instant::now();
let next_version = self.db_indexer.process_a_batch(Some(start_version))?;
let next_version = self.db_indexer.process_a_batch(start_version)?;

if next_version == start_version {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
continue;
}
log_grpc_step(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,10 @@ impl Filterable<Transaction> for APIFilter {
#[cfg(test)]
mod test {
use super::*;
use crate::{
filters::{
event::EventFilterBuilder, move_module::MoveStructTagFilterBuilder,
/*user_transaction::EntryFunctionFilter,*/ TransactionRootFilterBuilder,
UserTransactionFilterBuilder, /*UserTransactionPayloadFilterBuilder,*/
},
/*test_lib::load_graffio_fixture,*/
use crate::filters::{
event::EventFilterBuilder, move_module::MoveStructTagFilterBuilder,
/*user_transaction::EntryFunctionFilter,*/ TransactionRootFilterBuilder,
UserTransactionFilterBuilder, /*UserTransactionPayloadFilterBuilder,*/
};

// Disabled for now while we investigate an issue with lz4 in aptos-core:
Expand Down
11 changes: 6 additions & 5 deletions execution/executor/tests/internal_indexer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,18 @@ fn test_db_indexer_data() {

let db_indexer = DBIndexer::new(internal_indexer_db.clone(), aptos_db.clone());
// assert the data matches the expected data
let mut version = internal_indexer_db.get_persisted_version().unwrap();
assert_eq!(version, 0);
while version < total_version {
version = db_indexer.process_a_batch(Some(version)).unwrap();
let version = internal_indexer_db.get_persisted_version().unwrap();
assert_eq!(version, None);
let mut start_version = version.map_or(0, |v| v + 1);
while start_version < total_version {
start_version = db_indexer.process_a_batch(start_version).unwrap();
}
// wait for the commit to finish
thread::sleep(Duration::from_millis(100));
// indexer has process all the transactions
assert_eq!(
internal_indexer_db.get_persisted_version().unwrap(),
total_version
Some(total_version)
);

let txn_iter = internal_indexer_db
Expand Down
1 change: 0 additions & 1 deletion state-sync/state-sync-driver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ aptos-consensus-notifications = { workspace = true }
aptos-crypto = { workspace = true }
aptos-data-client = { workspace = true }
aptos-data-streaming-service = { workspace = true }
aptos-db-indexer = { workspace = true }
aptos-event-notifications = { workspace = true }
aptos-executor-types = { workspace = true }
aptos-infallible = { workspace = true }
Expand Down
17 changes: 3 additions & 14 deletions storage/aptosdb/src/state_restore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub trait StateValueWriter<K, V>: Send + Sync {
progress: StateSnapshotProgress,
) -> Result<()>;

fn write_usage(&self, version: Version, usage: StateStorageUsage) -> Result<()>;
fn kv_finish(&self, version: Version, usage: StateStorageUsage) -> Result<()>;

fn get_progress(&self, version: Version) -> Result<Option<StateSnapshotProgress>>;
}
Expand Down Expand Up @@ -127,7 +127,7 @@ impl<K: Key + CryptoHash + Eq + Hash, V: Value> StateValueRestore<K, V> {

pub fn finish(self) -> Result<()> {
let progress = self.db.get_progress(self.version)?;
self.db.write_usage(
self.db.kv_finish(
self.version,
progress.map_or(StateStorageUsage::zero(), |p| p.usage),
)
Expand Down Expand Up @@ -277,17 +277,6 @@ impl<K: Key + CryptoHash + Hash + Eq, V: Value> StateSnapshotReceiver<K, V>
}

fn finish_box(self: Box<Self>) -> Result<()> {
match self.restore_mode {
StateSnapshotRestoreMode::KvOnly => self.kv_restore.lock().take().unwrap().finish()?,
StateSnapshotRestoreMode::TreeOnly => {
self.tree_restore.lock().take().unwrap().finish_impl()?
},
StateSnapshotRestoreMode::Default => {
// for tree only mode, we also need to write the usage to DB
self.kv_restore.lock().take().unwrap().finish()?;
self.tree_restore.lock().take().unwrap().finish_impl()?
},
}
Ok(())
self.finish()
}
}
2 changes: 1 addition & 1 deletion storage/aptosdb/src/state_restore/restore_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ where
Ok(())
}

fn write_usage(&self, version: Version, usage: StateStorageUsage) -> Result<()> {
fn kv_finish(&self, version: Version, usage: StateStorageUsage) -> Result<()> {
self.usage_store.write().insert(version, usage);
Ok(())
}
Expand Down
41 changes: 38 additions & 3 deletions storage/aptosdb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ use aptos_crypto::{
HashValue,
};
use aptos_db_indexer::db_indexer::InternalIndexerDB;
use aptos_db_indexer_schemas::metadata::StateSnapshotProgress;
use aptos_db_indexer_schemas::{
metadata::{MetadataKey, MetadataValue, StateSnapshotProgress},
schema::indexer_metadata::InternalIndexerMetadataSchema,
};
use aptos_executor::components::in_memory_state_calculator_v2::InMemoryStateCalculatorV2;
use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER;
use aptos_infallible::Mutex;
Expand Down Expand Up @@ -1189,8 +1192,40 @@ impl StateValueWriter<StateKey, StateValue> for StateStore {
.commit(version, batch, sharded_schema_batch)
}

fn write_usage(&self, version: Version, usage: StateStorageUsage) -> Result<()> {
self.ledger_db.metadata_db().put_usage(version, usage)
fn kv_finish(&self, version: Version, usage: StateStorageUsage) -> Result<()> {
self.ledger_db.metadata_db().put_usage(version, usage)?;
if let Some(internal_indexer_db) = self.internal_indexer_db.as_ref() {
if version > 0 {
let batch = SchemaBatch::new();
batch.put::<InternalIndexerMetadataSchema>(
&MetadataKey::LatestVersion,
&MetadataValue::Version(version - 1),
)?;
if internal_indexer_db.statekeys_enabled() {
batch.put::<InternalIndexerMetadataSchema>(
&MetadataKey::StateVersion,
&MetadataValue::Version(version - 1),
)?;
}
if internal_indexer_db.transaction_enabled() {
batch.put::<InternalIndexerMetadataSchema>(
&MetadataKey::TransactionVersion,
&MetadataValue::Version(version - 1),
)?;
}
if internal_indexer_db.event_enabled() {
batch.put::<InternalIndexerMetadataSchema>(
&MetadataKey::EventVersion,
&MetadataValue::Version(version - 1),
)?;
}
internal_indexer_db
.get_inner_db_ref()
.write_schemas(batch)?;
}
}

Ok(())
}

fn get_progress(&self, version: Version) -> Result<Option<StateSnapshotProgress>> {
Expand Down
2 changes: 1 addition & 1 deletion storage/backup/backup-cli/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl StateValueWriter<StateKey, StateValue> for MockStore {
Ok(())
}

fn write_usage(&self, _version: Version, _usage: StateStorageUsage) -> Result<()> {
fn kv_finish(&self, _version: Version, _usage: StateStorageUsage) -> Result<()> {
Ok(())
}

Expand Down
Loading

0 comments on commit 26a2863

Please sign in to comment.