From 791478260c9cd82db30816d8034570e0c42ced88 Mon Sep 17 00:00:00 2001 From: Bo Wu Date: Fri, 20 Sep 2024 20:59:14 -0700 Subject: [PATCH] use storage ledger info when getting transaction by hash save progress --- ...by_hash_with_delayed_internal_indexer.json | 179 ++++++++++++++++++ api/src/context.rs | 18 ++ api/src/tests/accounts_test.rs | 3 + api/src/tests/mod.rs | 18 +- api/src/tests/transactions_test.rs | 21 ++ api/src/transactions.rs | 96 ++++++---- api/test-context/src/test_context.rs | 32 +++- api/types/src/transaction.rs | 22 ++- .../src/internal_indexer_db_service.rs | 22 ++- storage/indexer/src/db_indexer.rs | 2 +- 10 files changed, 364 insertions(+), 49 deletions(-) create mode 100644 api/goldens/aptos_api__tests__transactions_test__test_get_transaction_by_hash_with_delayed_internal_indexer.json diff --git a/api/goldens/aptos_api__tests__transactions_test__test_get_transaction_by_hash_with_delayed_internal_indexer.json b/api/goldens/aptos_api__tests__transactions_test__test_get_transaction_by_hash_with_delayed_internal_indexer.json new file mode 100644 index 0000000000000..e5625afa64977 --- /dev/null +++ b/api/goldens/aptos_api__tests__transactions_test__test_get_transaction_by_hash_with_delayed_internal_indexer.json @@ -0,0 +1,179 @@ +{ + "version": "2", + "hash": "", + "state_change_hash": "", + "event_root_hash": "", + "state_checkpoint_hash": null, + "gas_used": "9", + "success": true, + "vm_status": "Executed successfully", + "accumulator_root_hash": "", + "changes": [ + { + "address": "0xa550c18", + "state_key_hash": "", + "data": { + "type": "0x1::account::Account", + "data": { + "authentication_key": "0xcef8ffd1ab9017e96132df8a56b22de39a8155e1c3fc32affbbf93eb624b532a", + "coin_register_events": { + "counter": "1", + "guid": { + "id": { + "addr": "0xa550c18", + "creation_num": "0" + } + } + }, + "guid_creation_num": "4", + "key_rotation_events": { + "counter": "0", + "guid": { + "id": { + "addr": "0xa550c18", + "creation_num": "1" + } + } + }, + "rotation_capability_offer": { + "for": { + "vec": [] + } + }, + "sequence_number": "1", + "signer_capability_offer": { + "for": { + "vec": [] + } + } + } + }, + "type": "write_resource" + }, + { + "address": "0x34bf7e2d17674feb234371a7ea58efd715f0e56ba20ebf13789480d9d643afaf", + "state_key_hash": "", + "data": { + "type": "0x1::coin::CoinStore<0x1::aptos_coin::AptosCoin>", + "data": { + "coin": { + "value": "0" + }, + "deposit_events": { + "counter": "0", + "guid": { + "id": { + "addr": "0x34bf7e2d17674feb234371a7ea58efd715f0e56ba20ebf13789480d9d643afaf", + "creation_num": "2" + } + } + }, + "frozen": false, + "withdraw_events": { + "counter": "0", + "guid": { + "id": { + "addr": "0x34bf7e2d17674feb234371a7ea58efd715f0e56ba20ebf13789480d9d643afaf", + "creation_num": "3" + } + } + } + } + }, + "type": "write_resource" + }, + { + "address": "0x34bf7e2d17674feb234371a7ea58efd715f0e56ba20ebf13789480d9d643afaf", + "state_key_hash": "", + "data": { + "type": "0x1::account::Account", + "data": { + "authentication_key": "0x34bf7e2d17674feb234371a7ea58efd715f0e56ba20ebf13789480d9d643afaf", + "coin_register_events": { + "counter": "1", + "guid": { + "id": { + "addr": "0x34bf7e2d17674feb234371a7ea58efd715f0e56ba20ebf13789480d9d643afaf", + "creation_num": "0" + } + } + }, + "guid_creation_num": "4", + "key_rotation_events": { + "counter": "0", + "guid": { + "id": { + "addr": "0x34bf7e2d17674feb234371a7ea58efd715f0e56ba20ebf13789480d9d643afaf", + "creation_num": "1" + } + } + }, + "rotation_capability_offer": { + "for": { + "vec": [] + } + }, + "sequence_number": "0", + "signer_capability_offer": { + "for": { + "vec": [] + } + } + } + }, + "type": "write_resource" + } + ], + "sender": "0xa550c18", + "sequence_number": "0", + "max_gas_amount": "100000000", + "gas_unit_price": "0", + "expiration_timestamp_secs": "18446744073709551615", + "payload": { + "function": "0x1::aptos_account::create_account", + "type_arguments": [], + "arguments": [ + "0x34bf7e2d17674feb234371a7ea58efd715f0e56ba20ebf13789480d9d643afaf" + ], + "type": "entry_function_payload" + }, + "signature": { + "public_key": "0x14418f867a0bd6d42abb2daa50cd68a5a869ce208282481f57504f630510d0d3", + "signature": "0x95915d42cd822b6195581e9be3c164b70afeb9228ebb68c2e3f14240e3f43a164caabae8096163c6a341fc3830b36618b4619b7d5f2edcd603690e91a62fdb05", + "type": "ed25519_signature" + }, + "events": [ + { + "guid": { + "creation_number": "0", + "account_address": "0x34bf7e2d17674feb234371a7ea58efd715f0e56ba20ebf13789480d9d643afaf" + }, + "sequence_number": "0", + "type": "0x1::account::CoinRegisterEvent", + "data": { + "type_info": { + "account_address": "0x1", + "module_name": "0x6170746f735f636f696e", + "struct_name": "0x4170746f73436f696e" + } + } + }, + { + "guid": { + "creation_number": "0", + "account_address": "0x0" + }, + "sequence_number": "0", + "type": "0x1::transaction_fee::FeeStatement", + "data": { + "execution_gas_units": "5", + "io_gas_units": "4", + "storage_fee_octas": "0", + "storage_fee_refund_octas": "0", + "total_charge_gas_units": "9" + } + } + ], + "timestamp": "500000", + "type": "user_transaction" +} diff --git a/api/src/context.rs b/api/src/context.rs index 3ba77fe8923eb..2de3374a5c00a 100644 --- a/api/src/context.rs +++ b/api/src/context.rs @@ -267,6 +267,20 @@ impl Context { self.get_latest_storage_ledger_info() } + pub fn get_latest_internal_and_storage_ledger_info( + &self, + ) -> Result<(Option, LedgerInfo), E> { + if let Some(indexer_reader) = self.indexer_reader.as_ref() { + if indexer_reader.is_internal_indexer_enabled() { + return Ok(( + Some(self.get_latest_internal_indexer_ledger_info()?), + self.get_latest_storage_ledger_info()?, + )); + } + } + Ok((None, self.get_latest_storage_ledger_info()?)) + } + pub fn get_latest_ledger_info_and_verify_lookup_version( &self, requested_ledger_version: Option, @@ -954,6 +968,10 @@ impl Context { } } + pub fn get_indexer_reader(&self) -> Option<&Arc> { + self.indexer_reader.as_ref() + } + fn next_bucket(&self, gas_unit_price: u64) -> u64 { match self .node_config diff --git a/api/src/tests/accounts_test.rs b/api/src/tests/accounts_test.rs index a5199cf61c452..b5bac410dc0c9 100644 --- a/api/src/tests/accounts_test.rs +++ b/api/src/tests/accounts_test.rs @@ -144,6 +144,9 @@ async fn test_account_resources_by_ledger_version_with_context(mut context: Test async fn test_get_account_resources_by_ledger_version() { let context = new_test_context(current_function_name!()); test_account_resources_by_ledger_version_with_context(context).await; +} +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_get_account_resources_by_ledger_version_with_shard_context() { let shard_context = new_test_context_with_db_sharding_and_internal_indexer(current_function_name!()); test_account_resources_by_ledger_version_with_context(shard_context).await; diff --git a/api/src/tests/mod.rs b/api/src/tests/mod.rs index e7978f66a126e..5e7573d892029 100644 --- a/api/src/tests/mod.rs +++ b/api/src/tests/mod.rs @@ -21,7 +21,7 @@ mod transactions_test; mod view_function; mod webauthn_secp256r1_ecdsa; -use aptos_api_test_context::{new_test_context as super_new_test_context, TestContext}; +use aptos_api_test_context::{new_test_context_inner as super_new_test_context, TestContext}; use aptos_config::config::{internal_indexer_db_config::InternalIndexerDBConfig, NodeConfig}; fn new_test_context(test_name: String) -> TestContext { @@ -29,12 +29,22 @@ fn new_test_context(test_name: String) -> TestContext { } fn new_test_context_with_config(test_name: String, node_config: NodeConfig) -> TestContext { - super_new_test_context(test_name, node_config, false) + super_new_test_context(test_name, node_config, false, None) } fn new_test_context_with_db_sharding_and_internal_indexer(test_name: String) -> TestContext { let mut node_config = NodeConfig::default(); node_config.storage.rocksdb_configs.enable_storage_sharding = true; - node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, 10_000); - super_new_test_context(test_name, node_config, true) + node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, 10); + super_new_test_context(test_name, node_config, true, None) +} + +fn new_test_context_with_sharding_and_delayed_internal_indexer( + test_name: String, + end_version: Option, +) -> TestContext { + let mut node_config = NodeConfig::default(); + node_config.storage.rocksdb_configs.enable_storage_sharding = true; + node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, 1); + super_new_test_context(test_name, node_config, true, end_version) } diff --git a/api/src/tests/transactions_test.rs b/api/src/tests/transactions_test.rs index 292c5318f8e6d..2b3050f730718 100644 --- a/api/src/tests/transactions_test.rs +++ b/api/src/tests/transactions_test.rs @@ -5,6 +5,7 @@ use super::new_test_context; use crate::tests::{ new_test_context_with_config, new_test_context_with_db_sharding_and_internal_indexer, + new_test_context_with_sharding_and_delayed_internal_indexer, }; use aptos_api_test_context::{assert_json, current_function_name, pretty, TestContext}; use aptos_config::config::{GasEstimationStaticOverride, NodeConfig}; @@ -491,6 +492,26 @@ async fn test_get_transaction_by_hash() { assert_json(resp, txns[0].clone()); } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_get_transaction_by_hash_with_delayed_internal_indexer() { + let mut context = new_test_context_with_sharding_and_delayed_internal_indexer( + current_function_name!(), + Some(1), + ); + let account = context.gen_account(); + let txn = context.create_user_account(&account).await; + let committed_hash = txn.committed_hash().to_hex_literal(); + context.commit_block(&vec![txn.clone()]).await; + let _ = context + .get_indexer_reader() + .unwrap() + .wait_for_internal_indexer(1); + let resp = context + .get(&format!("/transactions/by_hash/{}", committed_hash)) + .await; + context.check_golden_output(resp); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_get_transaction_by_hash_not_found() { let mut context = new_test_context(current_function_name!()); diff --git a/api/src/transactions.rs b/api/src/transactions.rs index 1e1214361961b..56495de0ad2c6 100644 --- a/api/src/transactions.rs +++ b/api/src/transactions.rs @@ -793,32 +793,38 @@ impl TransactionsApi { let context = self.context.clone(); let accept_type = accept_type.clone(); - let ledger_info = api_spawn_blocking(move || context.get_latest_ledger_info()).await?; - + let (internal_ledger_info_opt, storage_ledger_info) = + api_spawn_blocking(move || context.get_latest_internal_and_storage_ledger_info()) + .await?; + let storage_version = storage_ledger_info.ledger_version.into(); + let internal_ledger_version = internal_ledger_info_opt + .as_ref() + .map(|info| info.ledger_version.into()); + let latest_ledger_info = internal_ledger_info_opt.unwrap_or(storage_ledger_info); let txn_data = self - .get_by_hash(hash.into(), &ledger_info) + .get_by_hash(hash.into(), storage_version, internal_ledger_version) .await .context(format!("Failed to get transaction by hash {}", hash)) .map_err(|err| { BasicErrorWith404::internal_with_code( err, AptosErrorCode::InternalError, - &ledger_info, + &latest_ledger_info, ) })? .context(format!("Failed to find transaction with hash: {}", hash)) - .map_err(|_| transaction_not_found_by_hash(hash, &ledger_info))?; + .map_err(|_| transaction_not_found_by_hash(hash, &latest_ledger_info))?; - if let TransactionData::Pending(_) = txn_data { - if (start_time.elapsed().as_millis() as u64) < wait_by_hash_timeout_ms { - tokio::time::sleep(Duration::from_millis(wait_by_hash_poll_interval_ms)).await; - continue; - } + if matches!(txn_data, TransactionData::Pending(_)) + && (start_time.elapsed().as_millis() as u64) < wait_by_hash_timeout_ms + { + tokio::time::sleep(Duration::from_millis(wait_by_hash_poll_interval_ms)).await; + continue; } let api = self.clone(); return api_spawn_blocking(move || { - api.get_transaction_inner(&accept_type, txn_data, &ledger_info) + api.get_transaction_inner(&accept_type, txn_data, &latest_ledger_info) }) .await; } @@ -832,25 +838,34 @@ impl TransactionsApi { let context = self.context.clone(); let accept_type = accept_type.clone(); - let ledger_info = api_spawn_blocking(move || context.get_latest_ledger_info()).await?; + let (internal_ledger_info_opt, storage_ledger_info) = + api_spawn_blocking(move || context.get_latest_internal_and_storage_ledger_info()) + .await?; + let storage_version = storage_ledger_info.ledger_version.into(); + let internal_indexer_version = internal_ledger_info_opt + .as_ref() + .map(|info| info.ledger_version.into()); + let latest_ledger_info = internal_ledger_info_opt.unwrap_or(storage_ledger_info); let txn_data = self - .get_by_hash(hash.into(), &ledger_info) + .get_by_hash(hash.into(), storage_version, internal_indexer_version) .await .context(format!("Failed to get transaction by hash {}", hash)) .map_err(|err| { BasicErrorWith404::internal_with_code( err, AptosErrorCode::InternalError, - &ledger_info, + &latest_ledger_info, ) })? .context(format!("Failed to find transaction with hash: {}", hash)) - .map_err(|_| transaction_not_found_by_hash(hash, &ledger_info))?; + .map_err(|_| transaction_not_found_by_hash(hash, &latest_ledger_info))?; let api = self.clone(); - api_spawn_blocking(move || api.get_transaction_inner(&accept_type, txn_data, &ledger_info)) - .await + api_spawn_blocking(move || { + api.get_transaction_inner(&accept_type, txn_data, &latest_ledger_info) + }) + .await } fn get_transaction_by_version_inner( @@ -946,9 +961,11 @@ impl TransactionsApi { return Ok(GetByVersionResponse::VersionTooOld); } Ok(GetByVersionResponse::Found( - self.context - .get_transaction_by_version(version, ledger_info.version())? - .into(), + TransactionData::from_transaction_onchain_data( + self.context + .get_transaction_by_version(version, ledger_info.version())?, + ledger_info.version(), + )?, )) } @@ -959,23 +976,30 @@ impl TransactionsApi { async fn get_by_hash( &self, hash: aptos_crypto::HashValue, - ledger_info: &LedgerInfo, + storage_ledger_version: u64, + internal_ledger_version: Option, ) -> anyhow::Result> { - let context = self.context.clone(); - let version = ledger_info.version(); - let from_db = - tokio::task::spawn_blocking(move || context.get_transaction_by_hash(hash, version)) - .await - .context("Failed to join task to read transaction by hash")? - .context("Failed to read transaction by hash from DB")?; - Ok(match from_db { - None => self - .context - .get_pending_transaction_by_hash(hash) - .await? - .map(|t| t.into()), - _ => from_db.map(|t| t.into()), - }) + Ok( + match self.context.get_pending_transaction_by_hash(hash).await? { + None => { + let context_clone = self.context.clone(); + tokio::task::spawn_blocking(move || { + context_clone.get_transaction_by_hash(hash, storage_ledger_version) + }) + .await + .context("Failed to join task to read transaction by hash")? + .context("Failed to read transaction by hash from DB")? + .map(|t| { + TransactionData::from_transaction_onchain_data( + t, + internal_ledger_version.unwrap_or(storage_ledger_version), + ) + }) + .transpose()? + }, + Some(t) => Some(t.into()), + }, + ) } /// List all transactions for an account diff --git a/api/test-context/src/test_context.rs b/api/test-context/src/test_context.rs index 45452d7311d15..5f159cfa3d365 100644 --- a/api/test-context/src/test_context.rs +++ b/api/test-context/src/test_context.rs @@ -40,6 +40,7 @@ use aptos_types::{ block_info::BlockInfo, block_metadata::BlockMetadata, chain_id::ChainId, + indexer::indexer_db_reader::IndexerReader, ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, transaction::{ signature_verified_transaction::into_signature_verified_block, Transaction, @@ -95,9 +96,18 @@ impl ApiSpecificConfig { } pub fn new_test_context( + test_name: String, + node_config: NodeConfig, + use_db_with_indexer: bool, +) -> TestContext { + new_test_context_inner(test_name, node_config, use_db_with_indexer, None) +} + +pub fn new_test_context_inner( test_name: String, mut node_config: NodeConfig, use_db_with_indexer: bool, + end_version: Option, ) -> TestContext { // Speculative logging uses a global variable and when many instances use it together, they // panic, so we disable this to run tests. @@ -157,8 +167,12 @@ pub fn new_test_context( node_config .storage .set_data_dir(tmp_dir.path().to_path_buf()); - let mock_indexer_service = - MockInternalIndexerDBService::new_for_test(db_rw.reader.clone(), &node_config, recver); + let mock_indexer_service = MockInternalIndexerDBService::new_for_test( + db_rw.reader.clone(), + &node_config, + recver, + end_version, + ); let context = Context::new( ChainId::test(), @@ -428,6 +442,10 @@ impl TestContext { .await; } + pub fn get_indexer_reader(&self) -> Option<&Arc> { + self.context.get_indexer_reader() + } + pub async fn create_multisig_account( &mut self, account: &mut LocalAccount, @@ -565,6 +583,16 @@ impl TestContext { self.context.get_latest_ledger_info::().unwrap() } + pub fn get_latest_storage_ledger_info(&self) -> aptos_api_types::LedgerInfo { + self.context + .get_latest_storage_ledger_info::() + .unwrap() + } + + pub fn get_indexer_readers(&self) -> Option<&Arc> { + self.context.get_indexer_reader() + } + pub fn get_transactions(&self, start: u64, limit: u16) -> Vec { self.context .get_transactions(start, limit, self.get_latest_ledger_info().version()) diff --git a/api/types/src/transaction.rs b/api/types/src/transaction.rs index 4af7dae0b843e..c312d0ff8eae2 100755 --- a/api/types/src/transaction.rs +++ b/api/types/src/transaction.rs @@ -7,7 +7,7 @@ use crate::{ MoveModuleBytecode, MoveModuleId, MoveResource, MoveScriptBytecode, MoveStructTag, MoveType, MoveValue, VerifyInput, VerifyInputWithRecursion, U64, }; -use anyhow::{bail, Context as AnyhowContext}; +use anyhow::{bail, Context as AnyhowContext, Result}; use aptos_crypto::{ ed25519::{self, Ed25519PublicKey, ED25519_PUBLIC_KEY_LENGTH, ED25519_SIGNATURE_LENGTH}, multi_ed25519::{self, MultiEd25519PublicKey, BITMAP_NUM_OF_BYTES, MAX_NUM_OF_KEYS}, @@ -70,9 +70,21 @@ pub enum TransactionData { Pending(Box), } -impl From for TransactionData { - fn from(txn: TransactionOnChainData) -> Self { - Self::OnChain(txn) +impl TransactionData { + pub fn from_transaction_onchain_data( + txn: TransactionOnChainData, + latest_ledger_version: u64, + ) -> Result { + if txn.version > latest_ledger_version { + match txn.transaction { + aptos_types::transaction::Transaction::UserTransaction(txn) => { + Ok(Self::Pending(Box::new(txn))) + }, + _ => bail!("convert non-user onchain transaction to pending shouldn't exist"), + } + } else { + Ok(Self::OnChain(txn)) + } } } @@ -383,7 +395,7 @@ pub struct TransactionInfo { pub epoch: Option, } -/// A transaction waiting in mempool +/// A transaction waiting in mempool or not committed to internal indexer yet #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Object)] pub struct PendingTransaction { pub hash: HashValue, diff --git a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs index 2feaf9c3b5702..5e36b2ca7bb3d 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs @@ -174,6 +174,25 @@ impl InternalIndexerDBService { start_version = next_version; } } + + // For internal testing + pub async fn run_with_end_version( + &mut self, + node_config: &NodeConfig, + end_version: Option, + ) -> Result<()> { + let mut start_version = self.get_start_version(node_config).await?; + while start_version <= end_version.unwrap_or(std::u64::MAX) { + let next_version = self.db_indexer.process_a_batch(start_version)?; + if next_version == start_version { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + continue; + } + start_version = next_version; + } + + Ok(()) + } } pub struct MockInternalIndexerDBService { @@ -186,6 +205,7 @@ impl MockInternalIndexerDBService { db_reader: Arc, node_config: &NodeConfig, update_receiver: WatchReceiver, + end_version: Option, ) -> Self { if !node_config .indexer_db_config @@ -205,7 +225,7 @@ impl MockInternalIndexerDBService { let config_clone = node_config.to_owned(); handle.spawn(async move { internal_indexer_db_service - .run(&config_clone) + .run_with_end_version(&config_clone, end_version) .await .unwrap(); }); diff --git a/storage/indexer/src/db_indexer.rs b/storage/indexer/src/db_indexer.rs index 63351a633a15d..ad0cbf56053fc 100644 --- a/storage/indexer/src/db_indexer.rs +++ b/storage/indexer/src/db_indexer.rs @@ -408,7 +408,7 @@ impl DBIndexer { version += 1; Ok::<(), AptosDbError>(()) })?; - // Assert we have processes all the readable transaction. + assert!(version > 0, "batch number should be greater than 0"); assert_eq!(num_transactions, version - start_version); if self.indexer_db.transaction_enabled() {