Skip to content

Commit

Permalink
Fix progress.
Browse files Browse the repository at this point in the history
  • Loading branch information
grao1991 committed Jul 24, 2024
1 parent 2e83bfa commit 441c7e9
Show file tree
Hide file tree
Showing 12 changed files with 158 additions and 86 deletions.
26 changes: 13 additions & 13 deletions api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,20 +309,20 @@ impl Context {
pub fn get_latest_internal_indexer_ledger_version_and_main_db_info<E: StdApiError>(
&self,
) -> Result<(LedgerInfo, Version), E> {
if self.indexer_reader.is_none() {
return Err(E::internal_with_code_no_info(
"Indexer reader doesn't exist",
AptosErrorCode::InternalError,
));
if let Some(indexer_reader) = self.indexer_reader.as_ref() {
if let Some(latest_version) = indexer_reader
.get_latest_internal_indexer_ledger_version()
.map_err(|err| E::internal_with_code_no_info(err, AptosErrorCode::InternalError))?
{
let latest_ledger_info = self.get_latest_ledger_info()?;
return Ok((latest_ledger_info, latest_version));
}
}
let latest_version = self
.indexer_reader
.as_ref()
.unwrap()
.get_latest_internal_indexer_ledger_version()
.map_err(|err| E::internal_with_code_no_info(err, AptosErrorCode::InternalError))?;
let latest_ledger_info = self.get_latest_ledger_info()?;
Ok((latest_ledger_info, latest_version))

Err(E::internal_with_code_no_info(
"Indexer reader doesn't exist, or doesn't have data.",
AptosErrorCode::InternalError,
))
}

pub fn get_latest_ledger_info_with_signatures(&self) -> Result<LedgerInfoWithSignatures> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,49 +77,70 @@ impl InternalIndexerDBService {
}

pub async fn get_start_version(&self, node_config: &NodeConfig) -> Result<Version> {
let indexer_version = self.db_indexer.indexer_db.get_persisted_version()?;
let fast_sync_enabled = node_config
.state_sync
.state_sync_driver
.bootstrapping_mode
.is_fast_sync();
let mut db_min_version = self.db_indexer.get_main_db_lowest_viable_version()?;
let mut main_db_synced_version = self.db_indexer.main_db_reader.get_synced_version()?;

// Wait till fast sync is done
while fast_sync_enabled && main_db_synced_version == 0 {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
main_db_synced_version = self.db_indexer.main_db_reader.get_synced_version()?;
}
let fast_sync_version_opt = self

let start_version = self
.db_indexer
.indexer_db
.get_restore_progress(db_min_version)?;
.get_persisted_version()?
.map_or(0, |v| v + 1);

if node_config.indexer_db_config.enable_statekeys() {
let state_start_version = self
.db_indexer
.indexer_db
.get_state_version()?
.map_or(0, |v| v + 1);
if start_version != state_start_version {
panic!("Cannot start state indexer because the progress doesn't match.");
}
}

if node_config.indexer_db_config.enable_statekeys()
&& fast_sync_enabled
&& fast_sync_version_opt.is_none()
{
panic!("Internal indexer db don't have state keys restored. Please run state sync with state keys enabled.");
if node_config.indexer_db_config.enable_transaction() {
let transaction_start_version = self
.db_indexer
.indexer_db
.get_transaction_version()?
.map_or(0, |v| v + 1);
if start_version != transaction_start_version {
panic!("Cannot start transaction indexer because the progress doesn't match.");
}
}

if indexer_version >= db_min_version {
Ok(indexer_version)
} else {
Ok(db_min_version)
if node_config.indexer_db_config.enable_event() {
let event_start_version = self
.db_indexer
.indexer_db
.get_event_version()?
.map_or(0, |v| v + 1);
if start_version != event_start_version {
panic!("Cannot start event indexer because the progress doesn't match.");
}
}

Ok(start_version)
}

pub async fn run(&mut self, node_config: &NodeConfig) -> Result<()> {
let mut start_version = self.get_start_version(node_config).await?;

loop {
let start_time: std::time::Instant = std::time::Instant::now();
let next_version = self.db_indexer.process_a_batch(Some(start_version))?;
let next_version = self.db_indexer.process_a_batch(start_version)?;

if next_version == start_version {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
continue;
}
log_grpc_step(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,10 @@ impl Filterable<Transaction> for APIFilter {
#[cfg(test)]
mod test {
use super::*;
use crate::{
filters::{
event::EventFilterBuilder, move_module::MoveStructTagFilterBuilder,
/*user_transaction::EntryFunctionFilter,*/ TransactionRootFilterBuilder,
UserTransactionFilterBuilder, /*UserTransactionPayloadFilterBuilder,*/
},
/*test_lib::load_graffio_fixture,*/
use crate::filters::{
event::EventFilterBuilder, move_module::MoveStructTagFilterBuilder,
/*user_transaction::EntryFunctionFilter,*/ TransactionRootFilterBuilder,
UserTransactionFilterBuilder, /*UserTransactionPayloadFilterBuilder,*/
};

// Disabled for now while we investigate an issue with lz4 in aptos-core:
Expand Down
11 changes: 6 additions & 5 deletions execution/executor/tests/internal_indexer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,18 @@ fn test_db_indexer_data() {

let db_indexer = DBIndexer::new(internal_indexer_db.clone(), aptos_db.clone());
// assert the data matches the expected data
let mut version = internal_indexer_db.get_persisted_version().unwrap();
assert_eq!(version, 0);
while version < total_version {
version = db_indexer.process_a_batch(Some(version)).unwrap();
let version = internal_indexer_db.get_persisted_version().unwrap();
assert_eq!(version, None);
let mut start_version = version.map_or(0, |v| v + 1);
while start_version < total_version {
start_version = db_indexer.process_a_batch(start_version).unwrap();
}
// wait for the commit to finish
thread::sleep(Duration::from_millis(100));
// indexer has process all the transactions
assert_eq!(
internal_indexer_db.get_persisted_version().unwrap(),
total_version
Some(total_version)
);

let txn_iter = internal_indexer_db
Expand Down
17 changes: 3 additions & 14 deletions storage/aptosdb/src/state_restore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub trait StateValueWriter<K, V>: Send + Sync {
progress: StateSnapshotProgress,
) -> Result<()>;

fn write_usage(&self, version: Version, usage: StateStorageUsage) -> Result<()>;
fn kv_finish(&self, version: Version, usage: StateStorageUsage) -> Result<()>;

fn get_progress(&self, version: Version) -> Result<Option<StateSnapshotProgress>>;
}
Expand Down Expand Up @@ -127,7 +127,7 @@ impl<K: Key + CryptoHash + Eq + Hash, V: Value> StateValueRestore<K, V> {

pub fn finish(self) -> Result<()> {
let progress = self.db.get_progress(self.version)?;
self.db.write_usage(
self.db.kv_finish(
self.version,
progress.map_or(StateStorageUsage::zero(), |p| p.usage),
)
Expand Down Expand Up @@ -277,17 +277,6 @@ impl<K: Key + CryptoHash + Hash + Eq, V: Value> StateSnapshotReceiver<K, V>
}

fn finish_box(self: Box<Self>) -> Result<()> {
match self.restore_mode {
StateSnapshotRestoreMode::KvOnly => self.kv_restore.lock().take().unwrap().finish()?,
StateSnapshotRestoreMode::TreeOnly => {
self.tree_restore.lock().take().unwrap().finish_impl()?
},
StateSnapshotRestoreMode::Default => {
// for tree only mode, we also need to write the usage to DB
self.kv_restore.lock().take().unwrap().finish()?;
self.tree_restore.lock().take().unwrap().finish_impl()?
},
}
Ok(())
self.finish()
}
}
2 changes: 1 addition & 1 deletion storage/aptosdb/src/state_restore/restore_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ where
Ok(())
}

fn write_usage(&self, version: Version, usage: StateStorageUsage) -> Result<()> {
fn kv_finish(&self, version: Version, usage: StateStorageUsage) -> Result<()> {
self.usage_store.write().insert(version, usage);
Ok(())
}
Expand Down
41 changes: 38 additions & 3 deletions storage/aptosdb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ use aptos_crypto::{
HashValue,
};
use aptos_db_indexer::db_indexer::InternalIndexerDB;
use aptos_db_indexer_schemas::metadata::StateSnapshotProgress;
use aptos_db_indexer_schemas::{
metadata::{MetadataKey, MetadataValue, StateSnapshotProgress},
schema::indexer_metadata::InternalIndexerMetadataSchema,
};
use aptos_executor::components::in_memory_state_calculator_v2::InMemoryStateCalculatorV2;
use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER;
use aptos_infallible::Mutex;
Expand Down Expand Up @@ -1189,8 +1192,40 @@ impl StateValueWriter<StateKey, StateValue> for StateStore {
.commit(version, batch, sharded_schema_batch)
}

fn write_usage(&self, version: Version, usage: StateStorageUsage) -> Result<()> {
self.ledger_db.metadata_db().put_usage(version, usage)
fn kv_finish(&self, version: Version, usage: StateStorageUsage) -> Result<()> {
self.ledger_db.metadata_db().put_usage(version, usage)?;
if let Some(internal_indexer_db) = self.internal_indexer_db.as_ref() {
if version > 0 {
let batch = SchemaBatch::new();
batch.put::<InternalIndexerMetadataSchema>(
&MetadataKey::LatestVersion,
&MetadataValue::Version(version - 1),
)?;
if internal_indexer_db.statekeys_enabled() {
batch.put::<InternalIndexerMetadataSchema>(
&MetadataKey::StateVersion,
&MetadataValue::Version(version - 1),
)?;
}
if internal_indexer_db.transaction_enabled() {
batch.put::<InternalIndexerMetadataSchema>(
&MetadataKey::TransactionVersion,
&MetadataValue::Version(version - 1),
)?;
}
if internal_indexer_db.event_enabled() {
batch.put::<InternalIndexerMetadataSchema>(
&MetadataKey::EventVersion,
&MetadataValue::Version(version - 1),
)?;
}
internal_indexer_db
.get_inner_db_ref()
.write_schemas(batch)?;
}
}

Ok(())
}

fn get_progress(&self, version: Version) -> Result<Option<StateSnapshotProgress>> {
Expand Down
2 changes: 1 addition & 1 deletion storage/backup/backup-cli/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl StateValueWriter<StateKey, StateValue> for MockStore {
Ok(())
}

fn write_usage(&self, _version: Version, _usage: StateStorageUsage) -> Result<()> {
fn kv_finish(&self, _version: Version, _usage: StateStorageUsage) -> Result<()> {
Ok(())
}

Expand Down
74 changes: 50 additions & 24 deletions storage/indexer/src/db_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,20 @@ impl InternalIndexerDB {
Ok(())
}

pub fn get_persisted_version(&self) -> Result<Version> {
// read the latest key from the db
self.db
.get::<InternalIndexerMetadataSchema>(&MetadataKey::LatestVersion)?
.map_or(Ok(0), |metavalue| Ok(metavalue.expect_version()))
pub fn get_persisted_version(&self) -> Result<Option<Version>> {
self.get_version(&MetadataKey::LatestVersion)
}

pub fn get_event_version(&self) -> Result<Option<Version>> {
self.get_version(&MetadataKey::EventVersion)
}

pub fn get_state_version(&self) -> Result<Option<Version>> {
self.get_version(&MetadataKey::StateVersion)
}

pub fn get_transaction_version(&self) -> Result<Option<Version>> {
self.get_version(&MetadataKey::TransactionVersion)
}

pub fn event_enabled(&self) -> bool {
Expand Down Expand Up @@ -136,11 +145,13 @@ impl InternalIndexerDB {

pub fn ensure_cover_ledger_version(&self, ledger_version: Version) -> Result<()> {
let indexer_latest_version = self.get_persisted_version()?;
ensure!(
indexer_latest_version >= ledger_version,
"ledger version too new"
);
Ok(())
if let Some(indexer_latest_version) = indexer_latest_version {
if indexer_latest_version >= ledger_version {
return Ok(());
}
}

bail!("ledger version too new")
}

pub fn get_account_transaction_version_iter(
Expand Down Expand Up @@ -255,6 +266,13 @@ impl InternalIndexerDB {
(event_key, txn_version, seq_num, idx)
})))
}

fn get_version(&self, key: &MetadataKey) -> Result<Option<Version>> {
Ok(self
.db
.get::<InternalIndexerMetadataSchema>(key)?
.map(|v| v.expect_version()))
}
}

pub struct DBIndexer {
Expand Down Expand Up @@ -302,15 +320,6 @@ impl DBIndexer {
.expect("main db lowest viable version doesn't exist")
}

pub fn ensure_cover_ledger_version(&self, ledger_version: Version) -> Result<()> {
let indexer_latest_version = self.indexer_db.get_persisted_version()?;
ensure!(
indexer_latest_version >= ledger_version,
"ledger version too new"
);
Ok(())
}

fn get_main_db_iter(
&self,
start_version: Version,
Expand Down Expand Up @@ -345,15 +354,14 @@ impl DBIndexer {
}
// we want to include the last transaction since the iterator interface will is right exclusive.
let num_of_transaction = min(
(self.indexer_db.config.batch_size + 1) as u64,
self.indexer_db.config.batch_size as u64,
highest_version + 1 - version,
);
Ok(num_of_transaction)
}

pub fn process_a_batch(&self, start_version: Option<Version>) -> Result<Version> {
let mut version = start_version.unwrap_or(0);

pub fn process_a_batch(&self, start_version: Version) -> Result<Version> {
let mut version = start_version;
let num_transactions = self.get_num_of_transactions(version)?;
let mut db_iter = self.get_main_db_iter(version, num_transactions)?;
let batch = SchemaBatch::new();
Expand Down Expand Up @@ -399,7 +407,25 @@ impl DBIndexer {
version += 1;
Ok::<(), AptosDbError>(())
})?;
assert_eq!(num_transactions, version - start_version.unwrap_or(0));
assert_eq!(num_transactions, version - start_version);
if self.indexer_db.transaction_enabled() {
batch.put::<InternalIndexerMetadataSchema>(
&MetadataKey::TransactionVersion,
&MetadataValue::Version(version),
)?;
}
if self.indexer_db.event_enabled() {
batch.put::<InternalIndexerMetadataSchema>(
&MetadataKey::EventVersion,
&MetadataValue::Version(version),
)?;
}
if self.indexer_db.statekeys_enabled() {
batch.put::<InternalIndexerMetadataSchema>(
&MetadataKey::StateVersion,
&MetadataValue::Version(version),
)?;
}
batch.put::<InternalIndexerMetadataSchema>(
&MetadataKey::LatestVersion,
&MetadataValue::Version(version - 1),
Expand Down
Loading

0 comments on commit 441c7e9

Please sign in to comment.