Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

Commit

Permalink
feature: PendingTransaction returns Option (#327)
Browse files Browse the repository at this point in the history
* feature: PendingTransaction returns Option

* chore: expand safety reasoning in pending tx expect

* chore: clippy lints

* bug: check readiness of future before taking receipt option
  • Loading branch information
prestwich authored Jul 6, 2021
1 parent dd98a59 commit d722c1a
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 32 deletions.
3 changes: 2 additions & 1 deletion ethers-contract/src/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ impl<M: Middleware> Deployer<M> {
let receipt = pending_tx
.confirmations(self.confs)
.await
.map_err(|_| ContractError::ContractNotDeployed)?;
.map_err(|_| ContractError::ContractNotDeployed)?
.ok_or(ContractError::ContractNotDeployed)?;
let address = receipt
.contract_address
.ok_or(ContractError::ContractNotDeployed)?;
Expand Down
2 changes: 1 addition & 1 deletion ethers-contract/tests/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ mod eth_tests {
let gas_estimate = contract_call.estimate_gas().await.unwrap();
let pending_tx = contract_call.send().await.unwrap();
let tx = client.get_transaction(*pending_tx).await.unwrap().unwrap();
let tx_receipt = pending_tx.await.unwrap();
let tx_receipt = pending_tx.await.unwrap().unwrap();
assert_eq!(last_sender.clone().call().await.unwrap(), client2.address());
assert_eq!(get_value.clone().call().await.unwrap(), "hi");
assert_eq!(tx.input, calldata);
Expand Down
3 changes: 2 additions & 1 deletion ethers-middleware/src/transformer/ds_proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ impl DsProxy {
.send()
.await?
.await
.map_err(ContractError::ProviderError)?;
.map_err(ContractError::ProviderError)?
.ok_or(ContractError::ContractNotDeployed)?;

// decode the event log to get the address of the deployed contract.
if tx_receipt.status == Some(U64::from(1u64)) {
Expand Down
119 changes: 96 additions & 23 deletions ethers-providers/src/pending_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
stream::{interval, DEFAULT_POLL_INTERVAL},
JsonRpcClient, PinBoxFut, Provider, ProviderError,
};
use ethers_core::types::{TransactionReceipt, TxHash, U64};
use ethers_core::types::{Transaction, TransactionReceipt, TxHash, U64};
use futures_core::stream::Stream;
use futures_util::stream::StreamExt;
use pin_project::pin_project;
Expand Down Expand Up @@ -33,12 +33,12 @@ pub struct PendingTransaction<'a, P> {
impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> {
/// Creates a new pending transaction poller from a hash and a provider
pub fn new(tx_hash: TxHash, provider: &'a Provider<P>) -> Self {
let fut = Box::pin(provider.get_transaction_receipt(tx_hash));
let fut = Box::pin(provider.get_transaction(tx_hash));
Self {
tx_hash,
confirmations: 1,
provider,
state: PendingTxState::GettingReceipt(fut),
state: PendingTxState::GettingTx(fut),
interval: Box::new(interval(DEFAULT_POLL_INTERVAL)),
}
}
Expand All @@ -57,13 +57,68 @@ impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> {
}
}

macro_rules! rewake_with_new_state {
($ctx:ident, $this:ident, $new_state:expr) => {
*$this.state = $new_state;
$ctx.waker().wake_by_ref();
return Poll::Pending;
};
}

macro_rules! rewake_with_new_state_if {
($condition:expr, $ctx:ident, $this:ident, $new_state:expr) => {
if $condition {
rewake_with_new_state!($ctx, $this, $new_state);
}
};
}

impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
type Output = Result<TransactionReceipt, ProviderError>;
type Output = Result<Option<TransactionReceipt>, ProviderError>;

fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
let this = self.project();

match this.state {
PendingTxState::PausedGettingTx => {
// Wait the polling period so that we do not spam the chain when no
// new block has been mined
let _ready = futures_util::ready!(this.interval.poll_next_unpin(ctx));
let fut = Box::pin(this.provider.get_transaction(*this.tx_hash));
*this.state = PendingTxState::GettingTx(fut);
ctx.waker().wake_by_ref();
}
PendingTxState::GettingTx(fut) => {
let tx_res = futures_util::ready!(fut.as_mut().poll(ctx));
// If the provider errors, just try again after the interval.
// nbd.
rewake_with_new_state_if!(
tx_res.is_err(),
ctx,
this,
PendingTxState::PausedGettingTx
);

let tx_opt = tx_res.unwrap();
// If the tx is no longer in the mempool, return Ok(None)
if tx_opt.is_none() {
*this.state = PendingTxState::Completed;
return Poll::Ready(Ok(None));
}

// If it hasn't confirmed yet, poll again later
let tx = tx_opt.unwrap();
rewake_with_new_state_if!(
tx.block_number.is_none(),
ctx,
this,
PendingTxState::PausedGettingTx
);

// Start polling for the receipt now
let fut = Box::pin(this.provider.get_transaction_receipt(*this.tx_hash));
*this.state = PendingTxState::GettingReceipt(fut);
}
PendingTxState::PausedGettingReceipt => {
// Wait the polling period so that we do not spam the chain when no
// new block has been mined
Expand All @@ -73,25 +128,31 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
ctx.waker().wake_by_ref();
}
PendingTxState::GettingReceipt(fut) => {
if let Ok(Some(receipt)) = futures_util::ready!(fut.as_mut().poll(ctx)) {
*this.state = PendingTxState::CheckingReceipt(Box::new(receipt))
if let Ok(receipt) = futures_util::ready!(fut.as_mut().poll(ctx)) {
*this.state = PendingTxState::CheckingReceipt(receipt)
} else {
*this.state = PendingTxState::PausedGettingReceipt
}
ctx.waker().wake_by_ref();
}
PendingTxState::CheckingReceipt(receipt) => {
rewake_with_new_state_if!(
receipt.is_none(),
ctx,
this,
PendingTxState::PausedGettingReceipt
);

// If we requested more than 1 confirmation, we need to compare the receipt's
// block number and the current block
if *this.confirmations > 1 {
let fut = Box::pin(this.provider.get_block_number());
*this.state =
PendingTxState::GettingBlockNumber(fut, Box::new(*receipt.clone()));
*this.state = PendingTxState::GettingBlockNumber(fut, receipt.take());

// Schedule the waker to poll again
ctx.waker().wake_by_ref();
} else {
let receipt = *receipt.clone();
let receipt = receipt.take();
*this.state = PendingTxState::Completed;
return Poll::Ready(Ok(receipt));
}
Expand All @@ -104,26 +165,30 @@ impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
// we need to re-instantiate the get_block_number future so that
// we poll again
let fut = Box::pin(this.provider.get_block_number());
*this.state = PendingTxState::GettingBlockNumber(fut, receipt.clone());
*this.state = PendingTxState::GettingBlockNumber(fut, receipt.take());
ctx.waker().wake_by_ref();
}
PendingTxState::GettingBlockNumber(fut, receipt) => {
let current_block = futures_util::ready!(fut.as_mut().poll(ctx))?;

// This is safe so long as we only enter the `GettingBlock`
// loop from `CheckingReceipt`, which contains an explicit
// `is_none` check
let receipt = receipt.take().expect("GettingBlockNumber without receipt");

// Wait for the interval
let inclusion_block = receipt
.block_number
.expect("Receipt did not have a block number. This should never happen");

let current_block = futures_util::ready!(fut.as_mut().poll(ctx))?;

// if the transaction has at least K confirmations, return the receipt
// (subtract 1 since the tx already has 1 conf when it's mined)
if current_block > inclusion_block + *this.confirmations - 1 {
let receipt = *receipt.clone();
let receipt = Some(receipt);
*this.state = PendingTxState::Completed;
return Poll::Ready(Ok(receipt));
} else {
tracing::trace!(tx_hash = ?this.tx_hash, "confirmations {}/{}", current_block - inclusion_block + 1, this.confirmations);
*this.state = PendingTxState::PausedGettingBlockNumber(receipt.clone());
*this.state = PendingTxState::PausedGettingBlockNumber(Some(receipt));
ctx.waker().wake_by_ref();
}
}
Expand Down Expand Up @@ -171,21 +236,27 @@ impl<'a, P> Deref for PendingTransaction<'a, P> {
// We box the TransactionReceipts to keep the enum small.
enum PendingTxState<'a> {
/// Waiting for interval to elapse before calling API again
PausedGettingReceipt,
PausedGettingTx,

/// Polling the blockchain for the receipt
GettingReceipt(PinBoxFut<'a, Option<TransactionReceipt>>),
/// Polling The blockchain to see if the Tx has confirmed or dropped
GettingTx(PinBoxFut<'a, Option<Transaction>>),

/// Waiting for interval to elapse before calling API again
PausedGettingBlockNumber(Box<TransactionReceipt>),
PausedGettingReceipt,

/// Polling the blockchain for the current block number
GettingBlockNumber(PinBoxFut<'a, U64>, Box<TransactionReceipt>),
/// Polling the blockchain for the receipt
GettingReceipt(PinBoxFut<'a, Option<TransactionReceipt>>),

/// If the pending tx required only 1 conf, it will return early. Otherwise it will
/// proceed to the next state which will poll the block number until there have been
/// enough confirmations
CheckingReceipt(Box<TransactionReceipt>),
CheckingReceipt(Option<TransactionReceipt>),

/// Waiting for interval to elapse before calling API again
PausedGettingBlockNumber(Option<TransactionReceipt>),

/// Polling the blockchain for the current block number
GettingBlockNumber(PinBoxFut<'a, U64>, Option<TransactionReceipt>),

/// Future has completed and should panic if polled again
Completed,
Expand All @@ -194,8 +265,10 @@ enum PendingTxState<'a> {
impl<'a> fmt::Debug for PendingTxState<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = match self {
PendingTxState::GettingReceipt(_) => "GettingReceipt",
PendingTxState::PausedGettingTx => "PausedGettingTx",
PendingTxState::GettingTx(_) => "GettingTx",
PendingTxState::PausedGettingReceipt => "PausedGettingReceipt",
PendingTxState::GettingReceipt(_) => "GettingReceipt",
PendingTxState::GettingBlockNumber(_, _) => "GettingBlockNumber",
PendingTxState::PausedGettingBlockNumber(_) => "PausedGettingBlockNumber",
PendingTxState::CheckingReceipt(_) => "CheckingReceipt",
Expand Down
2 changes: 1 addition & 1 deletion ethers-providers/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -964,7 +964,7 @@ mod tests {
.is_none());

let hash = *pending_tx;
let receipt = pending_tx.await.unwrap();
let receipt = pending_tx.await.unwrap().unwrap();
assert_eq!(receipt.transaction_hash, hash);
}

Expand Down
7 changes: 5 additions & 2 deletions ethers-providers/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ mod tests {
.unwrap()
.await
.unwrap()
.unwrap()
}),
)
.fuse();
Expand Down Expand Up @@ -361,7 +362,7 @@ mod tests {

let stream = TransactionStream::new(
&provider,
stream::iter(txs.iter().map(|tx| tx.transaction_hash)),
stream::iter(txs.iter().cloned().map(|tx| tx.unwrap().transaction_hash)),
10,
);
let res = stream
Expand All @@ -374,7 +375,9 @@ mod tests {
assert_eq!(res.len(), txs.len());
assert_eq!(
res.into_iter().map(|tx| tx.hash).collect::<HashSet<_>>(),
txs.into_iter().map(|tx| tx.transaction_hash).collect()
txs.into_iter()
.map(|tx| tx.unwrap().transaction_hash)
.collect()
);
}
}
2 changes: 1 addition & 1 deletion ethers-providers/tests/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ mod eth_tests {
let tx = TransactionRequest::new().to(who).from(who);
let pending_tx = provider.send_transaction(tx, None).await.unwrap();
let tx_hash = *pending_tx;
let receipt = pending_tx.confirmations(3).await.unwrap();
let receipt = pending_tx.confirmations(3).await.unwrap().unwrap();
// got the correct receipt
assert_eq!(receipt.transaction_hash, tx_hash);
}
Expand Down
6 changes: 5 additions & 1 deletion ethers/examples/ens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ async fn main() -> Result<()> {
let tx = TransactionRequest::new().to("vitalik.eth").value(100_000);

// send it!
let receipt = client.send_transaction(tx, None).await?.await?;
let receipt = client
.send_transaction(tx, None)
.await?
.await?
.ok_or_else(|| anyhow::format_err!("tx dropped from mempool"))?;
let tx = client.get_transaction(receipt.transaction_hash).await?;

println!("{}", serde_json::to_string(&tx)?);
Expand Down
4 changes: 3 additions & 1 deletion ethers/examples/local_signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ async fn main() -> Result<()> {
let pending_tx = client.send_transaction(tx, None).await?;

// get the mined tx
let receipt = pending_tx.await?;
let receipt = pending_tx
.await?
.ok_or_else(|| anyhow::format_err!("tx dropped from mempool"))?;
let tx = client.get_transaction(receipt.transaction_hash).await?;

println!("Sent tx: {}\n", serde_json::to_string(&tx)?);
Expand Down

0 comments on commit d722c1a

Please sign in to comment.