Skip to content

Commit

Permalink
fix(mempool): Re-verify transactions that were verified at a differen…
Browse files Browse the repository at this point in the history
…t tip height (#6154)

* checks tip height before mempool insertions

* adds unit test for reverifying txs

* Adds TODO

* Adds correctness note

* dedup best_tip_height() calls

* Update zebrad/src/components/mempool.rs

Co-authored-by: teor <[email protected]>

* uses Option for expected tip height

* removes misplaced dummy_call()

* calls wait_for_chain_tip without a timeout where it doesn't matter and skips instead of panicking where it doesn't

* Update zebrad/src/components/mempool/tests/vector.rs

* removes whitespace for rustfmt

---------

Co-authored-by: teor <[email protected]>
  • Loading branch information
arya2 and teor2345 authored Feb 16, 2023
1 parent fddd361 commit f253213
Show file tree
Hide file tree
Showing 3 changed files with 233 additions and 24 deletions.
40 changes: 28 additions & 12 deletions zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,20 +372,36 @@ impl Service<Request> for Mempool {
// Collect inserted transaction ids.
let mut send_to_peers_ids = HashSet::<_>::new();

let best_tip_height = self.latest_chain_tip.best_tip_height();

// Clean up completed download tasks and add to mempool if successful.
while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) {
match r {
Ok(tx) => {
let insert_result = storage.insert(tx.clone());

tracing::trace!(
?insert_result,
"got Ok(_) transaction verify, tried to store",
);

if let Ok(inserted_id) = insert_result {
// Save transaction ids that we will send to peers
send_to_peers_ids.insert(inserted_id);
Ok((tx, expected_tip_height)) => {
// # Correctness:
//
// It's okay to use tip height here instead of the tip hash since
// chain_tip_change.last_tip_change() returns a `TipAction::Reset` when
// the best chain changes (which is the only way to stay at the same height), and the
// mempool re-verifies all pending tx_downloads when there's a `TipAction::Reset`.
if best_tip_height == expected_tip_height {
let insert_result = storage.insert(tx.clone());

tracing::trace!(
?insert_result,
"got Ok(_) transaction verify, tried to store",
);

if let Ok(inserted_id) = insert_result {
// Save transaction ids that we will send to peers
send_to_peers_ids.insert(inserted_id);
}
} else {
tracing::trace!("chain grew during tx verification, retrying ..",);

// We don't care if re-queueing the transaction request fails.
let _result =
tx_downloads.download_if_needed_and_verify(tx.transaction.into());
}
}
Err((txid, error)) => {
Expand Down Expand Up @@ -416,7 +432,7 @@ impl Service<Request> for Mempool {
//
// Lock times never expire, because block times are strictly increasing.
// So we don't need to check them here.
if let Some(tip_height) = self.latest_chain_tip.best_tip_height() {
if let Some(tip_height) = best_tip_height {
let expired_transactions = storage.remove_expired_transactions(tip_height);
// Remove transactions that are expired from the peers list
send_to_peers_ids =
Expand Down
28 changes: 17 additions & 11 deletions zebrad/src/components/mempool/downloads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,12 @@ where
/// A list of pending transaction download and verify tasks.
#[pin]
pending: FuturesUnordered<
JoinHandle<Result<VerifiedUnminedTx, (TransactionDownloadVerifyError, UnminedTxId)>>,
JoinHandle<
Result<
(VerifiedUnminedTx, Option<Height>),
(TransactionDownloadVerifyError, UnminedTxId),
>,
>,
>,

/// A list of channels that can be used to cancel pending transaction download and
Expand All @@ -165,7 +170,8 @@ where
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
{
type Item = Result<VerifiedUnminedTx, (UnminedTxId, TransactionDownloadVerifyError)>;
type Item =
Result<(VerifiedUnminedTx, Option<Height>), (UnminedTxId, TransactionDownloadVerifyError)>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
Expand All @@ -180,9 +186,9 @@ where
// TODO: this would be cleaner with poll_map (#2693)
if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
match join_result.expect("transaction download and verify tasks must not panic") {
Ok(tx) => {
Ok((tx, tip_height)) => {
this.cancel_handles.remove(&tx.transaction.id);
Poll::Ready(Some(Ok(tx)))
Poll::Ready(Some(Ok((tx, tip_height))))
}
Err((e, hash)) => {
this.cancel_handles.remove(&hash);
Expand Down Expand Up @@ -282,12 +288,12 @@ where

trace!(?txid, "transaction is not in best chain");

let next_height = match state.oneshot(zs::Request::Tip).await {
Ok(zs::Response::Tip(None)) => Ok(Height(0)),
let (tip_height, next_height) = match state.oneshot(zs::Request::Tip).await {
Ok(zs::Response::Tip(None)) => Ok((None, Height(0))),
Ok(zs::Response::Tip(Some((height, _hash)))) => {
let next_height =
(height + 1).expect("valid heights are far below the maximum");
Ok(next_height)
Ok((Some(height), next_height))
}
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(TransactionDownloadVerifyError::StateError(e)),
Expand Down Expand Up @@ -341,8 +347,8 @@ where
height: next_height,
})
.map_ok(|rsp| {
rsp.into_mempool_transaction()
.expect("unexpected non-mempool response to mempool request")
(rsp.into_mempool_transaction()
.expect("unexpected non-mempool response to mempool request"), tip_height)
})
.await;

Expand All @@ -351,13 +357,13 @@ where

result.map_err(|e| TransactionDownloadVerifyError::Invalid(e.into()))
}
.map_ok(|tx| {
.map_ok(|(tx, tip_height)| {
metrics::counter!(
"mempool.verified.transactions.total",
1,
"version" => format!("{}", tx.transaction.transaction.version()),
);
tx
(tx, tip_height)
})
// Tack the hash onto the error so we can remove the cancel handle
// on failure as well as on success.
Expand Down
189 changes: 188 additions & 1 deletion zebrad/src/components/mempool/tests/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use tokio::time::{self, timeout};
use tower::{ServiceBuilder, ServiceExt};

use zebra_chain::{
block::Block, fmt::humantime_seconds, parameters::Network, serialization::ZcashDeserializeInto,
amount::Amount, block::Block, fmt::humantime_seconds, parameters::Network,
serialization::ZcashDeserializeInto, transaction::VerifiedUnminedTx,
};
use zebra_consensus::transaction as tx;
use zebra_state::{Config as StateConfig, CHAIN_TIP_UPDATE_WAIT_LIMIT};
Expand Down Expand Up @@ -794,6 +795,192 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> {
Ok(())
}

/// Check that transactions are re-verified if the tip changes
/// during verification.
#[tokio::test(flavor = "multi_thread")]
async fn mempool_reverifies_after_tip_change() -> Result<(), Report> {
let network = Network::Mainnet;

let genesis_block: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
.zcash_deserialize_into()
.unwrap();
let block1: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_1_BYTES
.zcash_deserialize_into()
.unwrap();
let block2: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES
.zcash_deserialize_into()
.unwrap();
let block3: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_3_BYTES
.zcash_deserialize_into()
.unwrap();

let (
mut mempool,
mut peer_set,
mut state_service,
mut chain_tip_change,
mut tx_verifier,
mut recent_syncs,
) = setup(network, u64::MAX).await;

// Enable the mempool
mempool.enable(&mut recent_syncs).await;
assert!(mempool.is_enabled());

// Push the genesis block to the state
state_service
.ready()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
genesis_block.clone().into(),
))
.await
.unwrap();

// Wait for the chain tip update without a timeout
// (skipping the chain tip change here may cause the test to
// pass without reverifying transactions for `TipAction::Grow`)
chain_tip_change
.wait_for_tip_change()
.await
.expect("unexpected chain tip update failure");

// Queue transaction from block 3 for download
let tx = block3.transactions[0].clone();
let txid = block3.transactions[0].unmined_id();
let response = mempool
.ready()
.await
.unwrap()
.call(Request::Queue(vec![txid.into()]))
.await
.unwrap();
let queued_responses = match response {
Response::Queued(queue_responses) => queue_responses,
_ => unreachable!("will never happen in this test"),
};
assert_eq!(queued_responses.len(), 1);
assert!(queued_responses[0].is_ok());
assert_eq!(mempool.tx_downloads().in_flight(), 1);

// Verify the transaction

peer_set
.expect_request_that(|req| matches!(req, zn::Request::TransactionsById(_)))
.map(|responder| {
responder.respond(zn::Response::Transactions(vec![
zn::InventoryResponse::Available(tx.clone().into()),
]));
})
.await;

tx_verifier
.expect_request_that(|_| true)
.map(|responder| {
let transaction = responder
.request()
.clone()
.mempool_transaction()
.expect("unexpected non-mempool request");

// Set a dummy fee and sigops.
responder.respond(transaction::Response::from(VerifiedUnminedTx::new(
transaction,
Amount::zero(),
0,
)));
})
.await;

// Push block 1 to the state. This is considered a network upgrade,
// and must cancel all pending transaction downloads with a `TipAction::Reset`.
state_service
.ready()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
block1.clone().into(),
))
.await
.unwrap();

// Wait for the chain tip update without a timeout
// (skipping the chain tip change here will fail the test)
chain_tip_change
.wait_for_tip_change()
.await
.expect("unexpected chain tip update failure");

// Query the mempool to make it poll chain_tip_change and try reverifying its state for the `TipAction::Reset`
mempool.dummy_call().await;

// Check that there is still an in-flight tx_download and that
// no transactions were inserted in the mempool.
assert_eq!(mempool.tx_downloads().in_flight(), 1);
assert_eq!(mempool.storage().transaction_count(), 0);

// Verify the transaction again

peer_set
.expect_request_that(|req| matches!(req, zn::Request::TransactionsById(_)))
.map(|responder| {
responder.respond(zn::Response::Transactions(vec![
zn::InventoryResponse::Available(tx.into()),
]));
})
.await;

// Verify the transaction now that the mempool has already checked chain_tip_change
tx_verifier
.expect_request_that(|_| true)
.map(|responder| {
let transaction = responder
.request()
.clone()
.mempool_transaction()
.expect("unexpected non-mempool request");

// Set a dummy fee and sigops.
responder.respond(transaction::Response::from(VerifiedUnminedTx::new(
transaction,
Amount::zero(),
0,
)));
})
.await;

// Push block 2 to the state. This will increase the tip height past the expected
// tip height that the the tx was verified at.
state_service
.ready()
.await
.unwrap()
.call(zebra_state::Request::CommitFinalizedBlock(
block2.clone().into(),
))
.await
.unwrap();

// Wait for the chain tip update without a timeout
// (skipping the chain tip change here will fail the test)
chain_tip_change
.wait_for_tip_change()
.await
.expect("unexpected chain tip update failure");

// Query the mempool to make it poll tx_downloads.pending and try reverifying transactions
// because the tip height has changed.
mempool.dummy_call().await;

// Check that there is still an in-flight tx_download and that
// no transactions were inserted in the mempool.
assert_eq!(mempool.tx_downloads().in_flight(), 1);
assert_eq!(mempool.storage().transaction_count(), 0);

Ok(())
}

/// Create a new [`Mempool`] instance using mocked services.
async fn setup(
network: Network,
Expand Down

0 comments on commit f253213

Please sign in to comment.