From 230ab00d579fba0a6f8f24b8bcf0abc82f7aa331 Mon Sep 17 00:00:00 2001 From: Igor Date: Thu, 22 Sep 2022 10:01:59 -0700 Subject: [PATCH] [API Client] Fix wait_for_transaction to wait for expiry and return better messages --- Cargo.lock | 1 + crates/aptos-rest-client/Cargo.toml | 1 + crates/aptos-rest-client/src/lib.rs | 184 +++++++++++++++++++--------- 3 files changed, 131 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0ec49a46f1dae..0ba4ed1911b54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1024,6 +1024,7 @@ dependencies = [ "anyhow", "aptos-api-types", "aptos-crypto", + "aptos-infallible", "aptos-logger", "aptos-types", "bcs", diff --git a/crates/aptos-rest-client/Cargo.toml b/crates/aptos-rest-client/Cargo.toml index 6dd7f0a0b32bf..1b7c2b519f492 100644 --- a/crates/aptos-rest-client/Cargo.toml +++ b/crates/aptos-rest-client/Cargo.toml @@ -28,6 +28,7 @@ url = "2.2.2" aptos-api-types = { path = "../../api/types" } aptos-crypto = { path = "../aptos-crypto" } +aptos-infallible = { path = "../aptos-infallible" } aptos-logger = { path = "../aptos-logger" } aptos-types = { path = "../../types" } diff --git a/crates/aptos-rest-client/src/lib.rs b/crates/aptos-rest-client/src/lib.rs index 2462099be5d0c..cb48b821854b7 100644 --- a/crates/aptos-rest-client/src/lib.rs +++ b/crates/aptos-rest-client/src/lib.rs @@ -27,6 +27,7 @@ use aptos_api_types::{ TransactionOnChainData, TransactionsBatchSubmissionResult, UserTransaction, VersionedEvent, }; use aptos_crypto::HashValue; +use aptos_logger::{info, sample, sample::SampleRate, sample::Sampling, warn}; use aptos_types::{ account_address::AccountAddress, account_config::{AccountResource, CoinStoreResource, NewBlockEvent, CORE_CODE_ADDRESS}, @@ -499,39 +500,109 @@ impl Client { .await } - pub async fn wait_for_transaction_by_hash( + async fn wait_for_transaction_by_hash_inner( &self, hash: HashValue, expiration_timestamp_secs: u64, - ) -> AptosResult> { - const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60); + fetch: F, + ) -> AptosResult> + where + F: Fn(HashValue) -> Fut, + Fut: Future>>, + { + const DEFAULT_EXTRA_TIMEOUT: u64 = 60; const DEFAULT_DELAY: Duration = Duration::from_millis(500); let start = std::time::Instant::now(); - while start.elapsed() < DEFAULT_TIMEOUT { - let resp = self.get_transaction_by_hash_inner(hash).await?; - if resp.status() != StatusCode::NOT_FOUND { - let txn_resp: Response = self.json(resp).await?; - let (transaction, state) = txn_resp.into_parts(); - - if !transaction.is_pending() { - if !transaction.success() { - return Err(anyhow!( - "transaction execution failed: {}", - transaction.vm_status() - ))?; + loop { + match fetch(hash).await? { + WaitForTransactionResult::Success(result) => { + return Ok(result); + } + WaitForTransactionResult::FailedExecution(vm_status) => { + return Err(anyhow!( + "Transaction committed on chain, but failed execution: {}", + vm_status + ))?; + } + WaitForTransactionResult::Pending(state) => { + if expiration_timestamp_secs <= state.timestamp_usecs / 1_000_000 { + return Err(anyhow!("Transaction expired, it is guaranteed it will not be committed on chain.").into()); } - return Ok(Response::new(transaction, state)); } - if expiration_timestamp_secs <= state.timestamp_usecs / 1_000_000 { - return Err(anyhow!("transaction expired").into()); + WaitForTransactionResult::NotFound(error) => { + if let RestError::Api(aptos_error_response) = error { + if let Some(state) = aptos_error_response.state { + if expiration_timestamp_secs <= state.timestamp_usecs / 1_000_000 { + return Err(anyhow!("Transaction expired, it is guaranteed it will not be committed on chain.").into()); + } + } + } else { + return Err(error); + } + sample!( + SampleRate::Duration(Duration::from_secs(30)), + warn!( + "Cannot yet find transaction in mempool on {:?}, continuing to wait.", + self.path_prefix_string(), + ) + ); } } + if aptos_infallible::duration_since_epoch().as_secs() + > expiration_timestamp_secs + DEFAULT_EXTRA_TIMEOUT + { + return Err(anyhow!( + "Ledger on client {} is beyond {}s behind current time, timeouting waiting for the transaction. Transaction might still succeed.", self.path_prefix_string(), DEFAULT_EXTRA_TIMEOUT).into()); + } + + if start.elapsed().as_secs() > 30 { + sample!( + SampleRate::Duration(Duration::from_secs(30)), + warn!("Continuing to wait for transaction {}.", hash) + ); + } + tokio::time::sleep(DEFAULT_DELAY).await; } + } - Err(anyhow!("timeout").into()) + pub async fn wait_for_transaction_by_hash( + &self, + hash: HashValue, + expiration_timestamp_secs: u64, + ) -> AptosResult> { + self.wait_for_transaction_by_hash_inner( + hash, + expiration_timestamp_secs, + |hash| async move { + let resp = self.get_transaction_by_hash_inner(hash).await?; + if resp.status() != StatusCode::NOT_FOUND { + let txn_resp: Response = self.json(resp).await?; + let (transaction, state) = txn_resp.into_parts(); + + if !transaction.is_pending() { + if !transaction.success() { + Ok(WaitForTransactionResult::FailedExecution( + transaction.vm_status(), + )) + } else { + Ok(WaitForTransactionResult::Success(Response::new( + transaction, + state, + ))) + } + } else { + Ok(WaitForTransactionResult::Pending(state)) + } + } else { + let error_response = parse_error(resp).await; + Ok(WaitForTransactionResult::NotFound(error_response)) + } + }, + ) + .await } pub async fn wait_for_transaction_by_hash_bcs( @@ -539,42 +610,38 @@ impl Client { hash: HashValue, expiration_timestamp_secs: u64, ) -> AptosResult> { - const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60); - const DEFAULT_DELAY: Duration = Duration::from_millis(500); - - let start = std::time::Instant::now(); - while start.elapsed() < DEFAULT_TIMEOUT { - let resp = self.get_transaction_by_hash_bcs_inner(hash).await?; - - // If it's not found, keep waiting for it - if resp.status() != StatusCode::NOT_FOUND { - let resp = self.check_and_parse_bcs_response(resp).await?; - let resp = resp.and_then(|bytes| bcs::from_bytes(&bytes))?; - let (maybe_pending_txn, state) = resp.into_parts(); - - // If we have a committed transaction, determine if it failed or not - if let TransactionData::OnChain(txn) = maybe_pending_txn { - let status = txn.info.status(); - - // The user can handle the error - return match status { - ExecutionStatus::Success => Ok(Response::new(txn, state)), - _ => Err(anyhow!("Transaction failed").into()), - }; - } - - // If it's expired lets give up - if Duration::from_secs(expiration_timestamp_secs) - <= Duration::from_micros(state.timestamp_usecs) - { - return Err(anyhow!("Transaction expired").into()); + self.wait_for_transaction_by_hash_inner( + hash, + expiration_timestamp_secs, + |hash| async move { + let resp = self.get_transaction_by_hash_bcs_inner(hash).await?; + if resp.status() != StatusCode::NOT_FOUND { + let resp = self.check_and_parse_bcs_response(resp).await?; + let resp = resp.and_then(|bytes| bcs::from_bytes(&bytes))?; + let (maybe_pending_txn, state) = resp.into_parts(); + + // If we have a committed transaction, determine if it failed or not + if let TransactionData::OnChain(txn) = maybe_pending_txn { + let status = txn.info.status(); + + if status.is_success() { + Ok(WaitForTransactionResult::Success(Response::new(txn, state))) + } else { + Ok(WaitForTransactionResult::FailedExecution(format!( + "{:?}", + status + ))) + } + } else { + Ok(WaitForTransactionResult::Pending(state)) + } + } else { + let error_response = parse_error(resp).await; + Ok(WaitForTransactionResult::NotFound(error_response)) } - } - - tokio::time::sleep(DEFAULT_DELAY).await; - } - - Err(anyhow!("Timed out waiting for transaction").into()) + }, + ) + .await } pub async fn wait_for_version(&self, version: u64) -> Result { @@ -1185,7 +1252,7 @@ impl Client { break; } - aptos_logger::info!( + info!( "Failed to call API, retrying in {}ms: {:?}", backoff.as_millis(), result.as_ref().err().unwrap() @@ -1258,3 +1325,10 @@ pub struct GasEstimationParams { pub estimated_gas_used: u64, pub estimated_gas_price: u64, } + +enum WaitForTransactionResult { + NotFound(RestError), + FailedExecution(String), + Pending(State), + Success(Response), +}