Skip to content

Commit

Permalink
[API Client] Fix wait_for_transaction to wait for expiry and return b…
Browse files Browse the repository at this point in the history
…etter messages
  • Loading branch information
igor-aptos committed Sep 22, 2022
1 parent 50bbf4d commit 230ab00
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 55 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/aptos-rest-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ url = "2.2.2"

aptos-api-types = { path = "../../api/types" }
aptos-crypto = { path = "../aptos-crypto" }
aptos-infallible = { path = "../aptos-infallible" }
aptos-logger = { path = "../aptos-logger" }
aptos-types = { path = "../../types" }

Expand Down
184 changes: 129 additions & 55 deletions crates/aptos-rest-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use aptos_api_types::{
TransactionOnChainData, TransactionsBatchSubmissionResult, UserTransaction, VersionedEvent,
};
use aptos_crypto::HashValue;
use aptos_logger::{info, sample, sample::SampleRate, sample::Sampling, warn};
use aptos_types::{
account_address::AccountAddress,
account_config::{AccountResource, CoinStoreResource, NewBlockEvent, CORE_CODE_ADDRESS},
Expand Down Expand Up @@ -499,82 +500,148 @@ impl Client {
.await
}

pub async fn wait_for_transaction_by_hash(
async fn wait_for_transaction_by_hash_inner<F, Fut, T>(
&self,
hash: HashValue,
expiration_timestamp_secs: u64,
) -> AptosResult<Response<Transaction>> {
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
fetch: F,
) -> AptosResult<Response<T>>
where
F: Fn(HashValue) -> Fut,
Fut: Future<Output = AptosResult<WaitForTransactionResult<T>>>,
{
const DEFAULT_EXTRA_TIMEOUT: u64 = 60;
const DEFAULT_DELAY: Duration = Duration::from_millis(500);

let start = std::time::Instant::now();
while start.elapsed() < DEFAULT_TIMEOUT {
let resp = self.get_transaction_by_hash_inner(hash).await?;
if resp.status() != StatusCode::NOT_FOUND {
let txn_resp: Response<Transaction> = self.json(resp).await?;
let (transaction, state) = txn_resp.into_parts();

if !transaction.is_pending() {
if !transaction.success() {
return Err(anyhow!(
"transaction execution failed: {}",
transaction.vm_status()
))?;
loop {
match fetch(hash).await? {
WaitForTransactionResult::Success(result) => {
return Ok(result);
}
WaitForTransactionResult::FailedExecution(vm_status) => {
return Err(anyhow!(
"Transaction committed on chain, but failed execution: {}",
vm_status
))?;
}
WaitForTransactionResult::Pending(state) => {
if expiration_timestamp_secs <= state.timestamp_usecs / 1_000_000 {
return Err(anyhow!("Transaction expired, it is guaranteed it will not be committed on chain.").into());
}
return Ok(Response::new(transaction, state));
}
if expiration_timestamp_secs <= state.timestamp_usecs / 1_000_000 {
return Err(anyhow!("transaction expired").into());
WaitForTransactionResult::NotFound(error) => {
if let RestError::Api(aptos_error_response) = error {
if let Some(state) = aptos_error_response.state {
if expiration_timestamp_secs <= state.timestamp_usecs / 1_000_000 {
return Err(anyhow!("Transaction expired, it is guaranteed it will not be committed on chain.").into());
}
}
} else {
return Err(error);
}
sample!(
SampleRate::Duration(Duration::from_secs(30)),
warn!(
"Cannot yet find transaction in mempool on {:?}, continuing to wait.",
self.path_prefix_string(),
)
);
}
}

if aptos_infallible::duration_since_epoch().as_secs()
> expiration_timestamp_secs + DEFAULT_EXTRA_TIMEOUT
{
return Err(anyhow!(
"Ledger on client {} is beyond {}s behind current time, timeouting waiting for the transaction. Transaction might still succeed.", self.path_prefix_string(), DEFAULT_EXTRA_TIMEOUT).into());
}

if start.elapsed().as_secs() > 30 {
sample!(
SampleRate::Duration(Duration::from_secs(30)),
warn!("Continuing to wait for transaction {}.", hash)
);
}

tokio::time::sleep(DEFAULT_DELAY).await;
}
}

Err(anyhow!("timeout").into())
pub async fn wait_for_transaction_by_hash(
&self,
hash: HashValue,
expiration_timestamp_secs: u64,
) -> AptosResult<Response<Transaction>> {
self.wait_for_transaction_by_hash_inner(
hash,
expiration_timestamp_secs,
|hash| async move {
let resp = self.get_transaction_by_hash_inner(hash).await?;
if resp.status() != StatusCode::NOT_FOUND {
let txn_resp: Response<Transaction> = self.json(resp).await?;
let (transaction, state) = txn_resp.into_parts();

if !transaction.is_pending() {
if !transaction.success() {
Ok(WaitForTransactionResult::FailedExecution(
transaction.vm_status(),
))
} else {
Ok(WaitForTransactionResult::Success(Response::new(
transaction,
state,
)))
}
} else {
Ok(WaitForTransactionResult::Pending(state))
}
} else {
let error_response = parse_error(resp).await;
Ok(WaitForTransactionResult::NotFound(error_response))
}
},
)
.await
}

pub async fn wait_for_transaction_by_hash_bcs(
&self,
hash: HashValue,
expiration_timestamp_secs: u64,
) -> AptosResult<Response<TransactionOnChainData>> {
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);
const DEFAULT_DELAY: Duration = Duration::from_millis(500);

let start = std::time::Instant::now();
while start.elapsed() < DEFAULT_TIMEOUT {
let resp = self.get_transaction_by_hash_bcs_inner(hash).await?;

// If it's not found, keep waiting for it
if resp.status() != StatusCode::NOT_FOUND {
let resp = self.check_and_parse_bcs_response(resp).await?;
let resp = resp.and_then(|bytes| bcs::from_bytes(&bytes))?;
let (maybe_pending_txn, state) = resp.into_parts();

// If we have a committed transaction, determine if it failed or not
if let TransactionData::OnChain(txn) = maybe_pending_txn {
let status = txn.info.status();

// The user can handle the error
return match status {
ExecutionStatus::Success => Ok(Response::new(txn, state)),
_ => Err(anyhow!("Transaction failed").into()),
};
}

// If it's expired lets give up
if Duration::from_secs(expiration_timestamp_secs)
<= Duration::from_micros(state.timestamp_usecs)
{
return Err(anyhow!("Transaction expired").into());
self.wait_for_transaction_by_hash_inner(
hash,
expiration_timestamp_secs,
|hash| async move {
let resp = self.get_transaction_by_hash_bcs_inner(hash).await?;
if resp.status() != StatusCode::NOT_FOUND {
let resp = self.check_and_parse_bcs_response(resp).await?;
let resp = resp.and_then(|bytes| bcs::from_bytes(&bytes))?;
let (maybe_pending_txn, state) = resp.into_parts();

// If we have a committed transaction, determine if it failed or not
if let TransactionData::OnChain(txn) = maybe_pending_txn {
let status = txn.info.status();

if status.is_success() {
Ok(WaitForTransactionResult::Success(Response::new(txn, state)))
} else {
Ok(WaitForTransactionResult::FailedExecution(format!(
"{:?}",
status
)))
}
} else {
Ok(WaitForTransactionResult::Pending(state))
}
} else {
let error_response = parse_error(resp).await;
Ok(WaitForTransactionResult::NotFound(error_response))
}
}

tokio::time::sleep(DEFAULT_DELAY).await;
}

Err(anyhow!("Timed out waiting for transaction").into())
},
)
.await
}

pub async fn wait_for_version(&self, version: u64) -> Result<State> {
Expand Down Expand Up @@ -1185,7 +1252,7 @@ impl Client {
break;
}

aptos_logger::info!(
info!(
"Failed to call API, retrying in {}ms: {:?}",
backoff.as_millis(),
result.as_ref().err().unwrap()
Expand Down Expand Up @@ -1258,3 +1325,10 @@ pub struct GasEstimationParams {
pub estimated_gas_used: u64,
pub estimated_gas_price: u64,
}

enum WaitForTransactionResult<T> {
NotFound(RestError),
FailedExecution(String),
Pending(State),
Success(Response<T>),
}

0 comments on commit 230ab00

Please sign in to comment.