diff --git a/shared/src/shared_builder.rs b/shared/src/shared_builder.rs index 985add3ba0..9a92d77cf6 100644 --- a/shared/src/shared_builder.rs +++ b/shared/src/shared_builder.rs @@ -416,10 +416,7 @@ fn register_tx_pool_callback(tx_pool_builder: &mut TxPoolServiceBuilder, notify: move |tx_pool: &mut TxPool, entry: &TxEntry, reject: Reject| { let tx_hash = entry.transaction().hash(); // record recent reject - if matches!( - reject, - Reject::Resolve(..) | Reject::RBFRejected(..) | Reject::Invalidated(..) - ) { + if reject.should_recorded() { if let Some(ref mut recent_reject) = tx_pool.recent_reject { if let Err(e) = recent_reject.put(&tx_hash, reject.clone()) { error!("record recent_reject failed {} {} {}", tx_hash, reject, e); diff --git a/sync/src/relayer/mod.rs b/sync/src/relayer/mod.rs index bfcdc28dfa..abfa92bac9 100644 --- a/sync/src/relayer/mod.rs +++ b/sync/src/relayer/mod.rs @@ -752,6 +752,17 @@ impl Relayer { TxVerificationResult::Reject { tx_hash } => { self.shared.state().remove_from_known_txs(&tx_hash); } + TxVerificationResult::UnknownParents { peer, parents } => { + let tx_hashes: Vec<_> = { + let mut tx_filter = self.shared.state().tx_filter(); + tx_filter.remove_expired(); + parents + .into_iter() + .filter(|tx_hash| !tx_filter.contains(tx_hash)) + .collect() + }; + self.shared.state().add_ask_for_txs(peer, tx_hashes); + } } } } diff --git a/test/src/main.rs b/test/src/main.rs index cef9c5a673..e877ffb695 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -431,6 +431,7 @@ fn all_specs() -> Vec> { Box::new(TxPoolOrphanNormal), Box::new(TxPoolOrphanReverse), Box::new(TxPoolOrphanUnordered), + Box::new(TxPoolOrphanPartialInputUnknown), Box::new(TxPoolOrphanDoubleSpend), Box::new(OrphanTxRejected), Box::new(GetRawTxPool), diff --git a/test/src/specs/tx_pool/orphan_tx.rs b/test/src/specs/tx_pool/orphan_tx.rs index 0d94ea4609..7e056380d5 100644 --- a/test/src/specs/tx_pool/orphan_tx.rs +++ b/test/src/specs/tx_pool/orphan_tx.rs @@ -1,10 +1,14 @@ -use crate::util::transaction::relay_tx; +use crate::util::transaction::{relay_tx, send_tx}; use crate::utils::wait_until; use crate::{Net, Node, Spec}; use ckb_jsonrpc_types::Status; use ckb_network::SupportProtocols; -use ckb_types::core::{capacity_bytes, Capacity, TransactionBuilder, TransactionView}; use ckb_types::packed::CellOutputBuilder; +use ckb_types::{ + bytes::Bytes, + core::{capacity_bytes, Capacity, TransactionBuilder, TransactionView}, + packed, +}; use ckb_types::{ packed::{CellInput, OutPoint}, prelude::*, @@ -193,6 +197,30 @@ fn run_replay_tx( }) } +fn run_send_tx( + net: &Net, + node0: &Node, + tx: TransactionView, + orphan_tx_cnt: u64, + pending_cnt: u64, +) -> bool { + send_tx(net, node0, tx, ALWAYS_SUCCESS_SCRIPT_CYCLE); + + wait_until(5, || { + let tx_pool_info = node0.get_tip_tx_pool_info(); + tx_pool_info.orphan.value() == orphan_tx_cnt && tx_pool_info.pending.value() == pending_cnt + }) +} + +fn should_receive_get_relay_transactions(net: &Net, node0: &Node, assert_message: &str) { + let ret = net.should_receive(node0, |data: &Bytes| { + packed::RelayMessage::from_slice(data) + .map(|message| message.to_enum().item_name() == packed::GetRelayTransactions::NAME) + .unwrap_or(false) + }); + assert!(ret, "{}", assert_message); +} + pub struct TxPoolOrphanNormal; impl Spec for TxPoolOrphanNormal { fn run(&self, nodes: &mut Vec) { @@ -236,23 +264,20 @@ impl Spec for TxPoolOrphanReverse { run_replay_tx(&net, node0, final_tx, 1, 0), "expect final_tx is in orphan pool" ); + should_receive_get_relay_transactions(&net, node0, "node should ask for tx11 tx12 tx13"); + + assert!(run_send_tx(&net, node0, tx13, 2, 0), "tx13 in orphan pool"); + should_receive_get_relay_transactions(&net, node0, "node should ask for tx1"); assert!( - run_replay_tx(&net, node0, tx13, 2, 0), - "tx13 in orphan pool" - ); - assert!( - run_replay_tx(&net, node0, tx12, 3, 0), + run_send_tx(&net, node0, tx12, 3, 0), "tx12 is in orphan pool" ); - assert!(run_replay_tx(&net, node0, tx11, 4, 0), "tx11 is in orphan"); - - assert!(run_replay_tx(&net, node0, tx1, 5, 0), "tx1 is in orphan"); + assert!(run_send_tx(&net, node0, tx11, 4, 0), "tx11 is in orphan"); + assert!(run_send_tx(&net, node0, tx1, 5, 0), "tx1 is in orphan"); - assert!( - run_replay_tx(&net, node0, parent, 0, 6), - "all is in pending" - ); + should_receive_get_relay_transactions(&net, node0, "node should ask for parent"); + assert!(run_send_tx(&net, node0, parent, 0, 6), "all is in pending"); } } @@ -267,13 +292,14 @@ impl Spec for TxPoolOrphanUnordered { "expect final_tx is in orphan pool" ); - assert!( - run_replay_tx(&net, node0, tx11, 2, 0), - "tx11 in orphan pool" - ); + should_receive_get_relay_transactions(&net, node0, "node should ask for tx11 tx12 tx13"); + + assert!(run_send_tx(&net, node0, tx11, 2, 0), "tx11 in orphan pool"); + should_receive_get_relay_transactions(&net, node0, "node should ask for tx1"); + let tx12_clone = tx12.clone(); assert!( - run_replay_tx(&net, node0, tx12, 3, 0), + run_send_tx(&net, node0, tx12, 3, 0), "tx12 is in orphan pool" ); @@ -292,12 +318,47 @@ impl Spec for TxPoolOrphanUnordered { "parent is sent, should be in pending without change orphan pool" ); assert!( - run_replay_tx(&net, node0, tx1, 1, 4), + run_send_tx(&net, node0, tx1, 1, 4), "tx1 is sent, orphan pool only contains final_tx" ); assert!( - run_replay_tx(&net, node0, tx13, 0, 6), + run_send_tx(&net, node0, tx13, 0, 6), + "tx13 is sent, orphan pool is empty" + ); + } +} + +pub struct TxPoolOrphanPartialInputUnknown; +impl Spec for TxPoolOrphanPartialInputUnknown { + fn run(&self, nodes: &mut Vec) { + let node0 = &nodes[0]; + let (net, (parent, tx1, tx11, tx12, tx13, final_tx)) = build_tx_chain(node0); + + assert!( + run_replay_tx(&net, node0, parent, 0, 1), + "parent sended expect nothing in orphan pool" + ); + assert!( + run_replay_tx(&net, node0, tx1, 0, 2), + "tx1 is sent expect nothing in orphan pool" + ); + assert!( + run_replay_tx(&net, node0, tx11, 0, 3), + "tx11 is sent expect nothing in orphan pool" + ); + assert!( + run_replay_tx(&net, node0, tx12, 0, 4), + "tx12 is sent expect nothing in orphan pool" + ); + assert!( + run_replay_tx(&net, node0, final_tx, 1, 4), + "expect final_tx is in orphan pool" + ); + + should_receive_get_relay_transactions(&net, node0, "node should ask for tx13"); + assert!( + run_send_tx(&net, node0, tx13, 0, 6), "tx13 is sent, orphan pool is empty" ); } diff --git a/test/src/util/transaction.rs b/test/src/util/transaction.rs index d699e61da0..35e640cd64 100644 --- a/test/src/util/transaction.rs +++ b/test/src/util/transaction.rs @@ -47,6 +47,26 @@ pub fn always_success_transactions_with_rand_data( .build() } +pub fn send_tx(net: &Net, node: &Node, tx: TransactionView, cycles: u64) { + let relay_tx = packed::RelayTransaction::new_builder() + .cycles(cycles.pack()) + .transaction(tx.data()) + .build(); + + let tx_msg = packed::RelayMessage::new_builder() + .set( + packed::RelayTransactions::new_builder() + .transactions( + packed::RelayTransactionVec::new_builder() + .set(vec![relay_tx]) + .build(), + ) + .build(), + ) + .build(); + net.send(node, SupportProtocols::RelayV3, tx_msg.as_bytes()); +} + pub fn relay_tx(net: &Net, node: &Node, tx: TransactionView, cycles: u64) { let tx_hashes_msg = packed::RelayMessage::new_builder() .set( diff --git a/tx-pool/src/component/orphan.rs b/tx-pool/src/component/orphan.rs index 7686c0c1f1..d73132bc81 100644 --- a/tx-pool/src/component/orphan.rs +++ b/tx-pool/src/component/orphan.rs @@ -151,6 +151,7 @@ impl OrphanPool { .insert(tx.proposal_short_id()); } + // DoS prevention: do not allow OrphanPool to grow unbounded self.limit_size() } diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index a8e39f7239..5a5961530c 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -18,7 +18,6 @@ use ckb_logger::{debug, error, info, log_enabled_target, trace_target}; use ckb_network::PeerIndex; use ckb_snapshot::Snapshot; use ckb_store::data_loader_wrapper::AsDataLoader; -use ckb_store::ChainStore; use ckb_types::core::error::OutPointError; use ckb_types::{ core::{cell::ResolvedTransaction, BlockView, Capacity, Cycle, HeaderView, TransactionView}, @@ -459,7 +458,10 @@ impl TxPoolService { match remote { Some((declared_cycle, peer)) => match ret { Ok(_) => { - debug!("after_process remote send_result_to_relayer {}", tx_hash); + debug!( + "after_process remote send_result_to_relayer {} {}", + tx_hash, peer + ); self.send_result_to_relayer(TxVerificationResult::Ok { original_peer: Some(peer), with_vm_2023, @@ -468,8 +470,15 @@ impl TxPoolService { self.process_orphan_tx(&tx).await; } Err(reject) => { - debug!("after_process {} remote reject: {} ", tx_hash, reject); - if is_missing_input(reject) && all_inputs_is_unknown(snapshot, &tx) { + debug!( + "after_process {} {} remote reject: {} ", + tx_hash, peer, reject + ); + if is_missing_input(reject) { + self.send_result_to_relayer(TxVerificationResult::UnknownParents { + peer, + parents: tx.unique_parents(), + }); self.add_orphan(tx, peer, declared_cycle).await; } else { if reject.is_malformed_tx() { @@ -480,13 +489,7 @@ impl TxPoolService { tx_hash: tx_hash.clone(), }); } - - if matches!( - reject, - Reject::Resolve(..) - | Reject::Verification(..) - | Reject::RBFRejected(..) - ) { + if reject.should_recorded() { self.put_recent_reject(&tx_hash, reject).await; } } @@ -514,12 +517,7 @@ impl TxPoolService { } Err(reject) => { debug!("after_process {} reject: {} ", tx_hash, reject); - if matches!( - reject, - Reject::Resolve(..) - | Reject::Verification(..) - | Reject::RBFRejected(..) - ) { + if reject.should_recorded() { self.put_recent_reject(&tx_hash, reject).await; } } @@ -628,12 +626,7 @@ impl TxPoolService { tx_hash: orphan.tx.hash(), }); } - if matches!( - reject, - Reject::Resolve(..) - | Reject::Verification(..) - | Reject::RBFRejected(..) - ) { + if reject.should_recorded() { self.put_recent_reject(&orphan.tx.hash(), &reject).await; } } @@ -1259,8 +1252,3 @@ fn _update_tx_pool_for_reorg( // Remove transactions from the pool until its size <= size_limit. let _ = tx_pool.limit_size(callbacks, None); } - -pub fn all_inputs_is_unknown(snapshot: &Snapshot, tx: &TransactionView) -> bool { - !tx.input_pts_iter() - .any(|pt| snapshot.transaction_exists(&pt.tx_hash())) -} diff --git a/tx-pool/src/service.rs b/tx-pool/src/service.rs index e684508127..7d46926b1c 100644 --- a/tx-pool/src/service.rs +++ b/tx-pool/src/service.rs @@ -295,7 +295,8 @@ impl TxPoolController { send_message!(self, GetTransactionWithStatus, hash) } - /// Return txs for network + /// Mainly used for compact block reconstruction and block proposal pre-broadcasting + /// Orphan/conflicted/etc transactions that are returned for compact block reconstruction. pub fn fetch_txs( &self, short_ids: HashSet, @@ -304,6 +305,7 @@ impl TxPoolController { } /// Return txs with cycles + /// Mainly for relay transactions pub fn fetch_txs_with_cycles( &self, short_ids: HashSet, @@ -677,6 +679,13 @@ pub enum TxVerificationResult { /// transaction hash tx_hash: Byte32, }, + /// tx parent is unknown + UnknownParents { + /// original peer + peer: PeerIndex, + /// parents hashes + parents: HashSet, + }, /// tx is rejected Reject { /// transaction hash @@ -852,11 +861,13 @@ async fn process(mut service: TxPoolService, message: Message) { arguments: short_ids, }) => { let tx_pool = service.tx_pool.read().await; + let orphan = service.orphan.read().await; let txs = short_ids .into_iter() .filter_map(|short_id| { tx_pool .get_tx_from_pool_or_store(&short_id) + .or_else(|| orphan.get(&short_id).map(|entry| &entry.tx).cloned()) .map(|tx| (short_id, tx)) }) .collect(); diff --git a/util/types/src/core/tx_pool.rs b/util/types/src/core/tx_pool.rs index 1ca23a061f..01723d18bf 100644 --- a/util/types/src/core/tx_pool.rs +++ b/util/types/src/core/tx_pool.rs @@ -95,6 +95,11 @@ impl Reject { } } + /// Returns true if the reject should be recorded. + pub fn should_recorded(&self) -> bool { + !matches!(self, Reject::Duplicated(..)) + } + /// Returns true if tx can be resubmitted, allowing relay /// * Declared wrong cycles should allow relay with the correct cycles /// * Reject but is not malformed and the fee rate reached the threshold, diff --git a/util/types/src/core/views.rs b/util/types/src/core/views.rs index 6e7a473087..326751b3eb 100644 --- a/util/types/src/core/views.rs +++ b/util/types/src/core/views.rs @@ -422,6 +422,13 @@ impl TransactionView { pub fn proposal_short_id(&self) -> packed::ProposalShortId { packed::ProposalShortId::from_tx_hash(&self.hash()) } + + /// return deduplicate parent tx_hashes + pub fn unique_parents(&self) -> HashSet { + self.input_pts_iter() + .map(|outpoint| outpoint.tx_hash()) + .collect() + } } impl ExtraHashView {