Skip to content

Commit

Permalink
Avoid broadcasting mempool rejected or expired transactions to peers (#…
Browse files Browse the repository at this point in the history
…2858)

* do not advertise rejected transactions

* do not broadcast transaction that are expired

* change dummy var name

* simplify code, performance

* clippy

* add some test coverage

* clippy

Co-authored-by: teor <[email protected]>
  • Loading branch information
oxarbitrage and teor2345 authored Oct 13, 2021
1 parent e36ec34 commit 8120e8a
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 16 deletions.
47 changes: 32 additions & 15 deletions zebrad/src/components/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,32 +245,36 @@ impl Service<Request> for Mempool {
}
}

// Collect inserted transaction ids.
let mut send_to_peers_ids = HashSet::<_>::new();

// Clean up completed download tasks and add to mempool if successful.
// Also, send succesful transactions to peers.
let mut inserted_txids = HashSet::new();
while let Poll::Ready(Some(r)) = tx_downloads.as_mut().poll_next(cx) {
match r {
Ok(tx) => {
// Storage handles conflicting transactions or a full mempool internally,
// so just ignore the storage result here
let _ = storage.insert(tx.clone());
// Save transaction ids that we will send to peers
inserted_txids.insert(tx.id);
if let Ok(inserted_id) = storage.insert(tx.clone()) {
// Save transaction ids that we will send to peers
send_to_peers_ids.insert(inserted_id);
}
}
Err((txid, e)) => {
reject_if_needed(storage, txid, e);
// TODO: should we also log the result?
}
};
}
// Send any newly inserted transactions to peers
if !inserted_txids.is_empty() {
let _ = self.transaction_sender.send(inserted_txids)?;
}

// Remove expired transactions from the mempool.
if let Some(tip_height) = self.latest_chain_tip.best_tip_height() {
remove_expired_transactions(storage, tip_height);
let expired_transactions = remove_expired_transactions(storage, tip_height);
// Remove transactions that are expired from the peers list
send_to_peers_ids =
remove_expired_from_peer_list(&send_to_peers_ids, &expired_transactions);
}

// Send transactions that were not rejected nor expired to peers
if !send_to_peers_ids.is_empty() {
let _ = self.transaction_sender.send(send_to_peers_ids)?;
}
}
ActiveState::Disabled => {
Expand Down Expand Up @@ -347,7 +351,7 @@ impl Service<Request> for Mempool {
fn remove_expired_transactions(
storage: &mut storage::Storage,
tip_height: zebra_chain::block::Height,
) {
) -> HashSet<UnminedTxId> {
let mut txid_set = HashSet::new();
// we need a separate set, since reject() takes the original unmined ID,
// then extracts the mined ID out of it
Expand All @@ -366,9 +370,22 @@ fn remove_expired_transactions(
storage.remove_same_effects(&txid_set);

// also reject it
for id in unmined_id_set {
storage.reject(id, SameEffectsChainRejectionError::Expired.into());
for id in unmined_id_set.iter() {
storage.reject(*id, SameEffectsChainRejectionError::Expired.into());
}

unmined_id_set
}

/// Remove expired transaction ids from a given list of inserted ones.
fn remove_expired_from_peer_list(
send_to_peers_ids: &HashSet<UnminedTxId>,
expired_transactions: &HashSet<UnminedTxId>,
) -> HashSet<UnminedTxId> {
send_to_peers_ids
.difference(expired_transactions)
.copied()
.collect()
}

/// Add a transaction that failed download and verification to the rejected list
Expand Down
64 changes: 63 additions & 1 deletion zebrad/src/components/mempool/storage/tests/vectors.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
use std::iter;

use crate::components::mempool::{remove_expired_from_peer_list, remove_expired_transactions};

use super::{super::*, unmined_transactions_in_blocks};

use zebra_chain::parameters::Network;
use zebra_chain::{
block::{Block, Height},
parameters::Network,
serialization::ZcashDeserializeInto,
transaction::UnminedTxId,
};

use color_eyre::eyre::Result;

Expand Down Expand Up @@ -129,3 +136,58 @@ fn mempool_storage_basic_for_network(network: Network) -> Result<()> {

Ok(())
}

#[test]
fn mempool_expired_basic() -> Result<()> {
zebra_test::init();

mempool_expired_basic_for_network(Network::Mainnet)?;
mempool_expired_basic_for_network(Network::Testnet)?;

Ok(())
}

fn mempool_expired_basic_for_network(network: Network) -> Result<()> {
// Create an empty storage
let mut storage: Storage = Default::default();

let block: Block = match network {
Network::Mainnet => {
zebra_test::vectors::BLOCK_MAINNET_982681_BYTES.zcash_deserialize_into()?
}
Network::Testnet => {
zebra_test::vectors::BLOCK_TESTNET_925483_BYTES.zcash_deserialize_into()?
}
};

// Get a test transaction
let tx = &*(block.transactions[1]).clone();

// Change the expiration height of the test transaction
let mut tx = tx.clone();
*tx.expiry_height_mut() = zebra_chain::block::Height(1);

let tx_id = tx.unmined_id();

// Insert the transaction into the mempool
storage.insert(UnminedTx::from(tx))?;

assert_eq!(storage.transaction_count(), 1);

// Get everything available in mempool now
let everything_in_mempool: HashSet<UnminedTxId> = storage.tx_ids().collect();
assert_eq!(everything_in_mempool.len(), 1);
assert!(everything_in_mempool.contains(&tx_id));

// remove_expired_transactions() will return what was removed
let expired = remove_expired_transactions(&mut storage, Height(1));
assert!(expired.contains(&tx_id));
let everything_in_mempool: HashSet<UnminedTxId> = storage.tx_ids().collect();
assert_eq!(everything_in_mempool.len(), 0);

// No transaction will be sent to peers
let send_to_peers = remove_expired_from_peer_list(&everything_in_mempool, &expired);
assert_eq!(send_to_peers.len(), 0);

Ok(())
}

0 comments on commit 8120e8a

Please sign in to comment.