Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify the record scope of tx-pool reject record and fix rule for orphan tx. #4511

Merged
merged 4 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions shared/src/shared_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,10 +416,7 @@ fn register_tx_pool_callback(tx_pool_builder: &mut TxPoolServiceBuilder, notify:
move |tx_pool: &mut TxPool, entry: &TxEntry, reject: Reject| {
let tx_hash = entry.transaction().hash();
// record recent reject
if matches!(
reject,
Reject::Resolve(..) | Reject::RBFRejected(..) | Reject::Invalidated(..)
) {
if reject.should_recorded() {
if let Some(ref mut recent_reject) = tx_pool.recent_reject {
if let Err(e) = recent_reject.put(&tx_hash, reject.clone()) {
error!("record recent_reject failed {} {} {}", tx_hash, reject, e);
Expand Down
11 changes: 11 additions & 0 deletions sync/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,17 @@ impl Relayer {
TxVerificationResult::Reject { tx_hash } => {
self.shared.state().remove_from_known_txs(&tx_hash);
}
TxVerificationResult::UnknownParents { peer, parents } => {
let tx_hashes: Vec<_> = {
let mut tx_filter = self.shared.state().tx_filter();
tx_filter.remove_expired();
parents
.into_iter()
.filter(|tx_hash| !tx_filter.contains(tx_hash))
.collect()
};
self.shared.state().add_ask_for_txs(peer, tx_hashes);
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(TxPoolOrphanNormal),
Box::new(TxPoolOrphanReverse),
Box::new(TxPoolOrphanUnordered),
Box::new(TxPoolOrphanPartialInputUnknown),
Box::new(TxPoolOrphanDoubleSpend),
Box::new(OrphanTxRejected),
Box::new(GetRawTxPool),
Expand Down
103 changes: 82 additions & 21 deletions test/src/specs/tx_pool/orphan_tx.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use crate::util::transaction::relay_tx;
use crate::util::transaction::{relay_tx, send_tx};
use crate::utils::wait_until;
use crate::{Net, Node, Spec};
use ckb_jsonrpc_types::Status;
use ckb_network::SupportProtocols;
use ckb_types::core::{capacity_bytes, Capacity, TransactionBuilder, TransactionView};
use ckb_types::packed::CellOutputBuilder;
use ckb_types::{
bytes::Bytes,
core::{capacity_bytes, Capacity, TransactionBuilder, TransactionView},
packed,
};
use ckb_types::{
packed::{CellInput, OutPoint},
prelude::*,
Expand Down Expand Up @@ -193,6 +197,30 @@ fn run_replay_tx(
})
}

fn run_send_tx(
net: &Net,
node0: &Node,
tx: TransactionView,
orphan_tx_cnt: u64,
pending_cnt: u64,
) -> bool {
send_tx(net, node0, tx, ALWAYS_SUCCESS_SCRIPT_CYCLE);

wait_until(5, || {
let tx_pool_info = node0.get_tip_tx_pool_info();
tx_pool_info.orphan.value() == orphan_tx_cnt && tx_pool_info.pending.value() == pending_cnt
})
}

fn should_receive_get_relay_transactions(net: &Net, node0: &Node, assert_message: &str) {
let ret = net.should_receive(node0, |data: &Bytes| {
packed::RelayMessage::from_slice(data)
.map(|message| message.to_enum().item_name() == packed::GetRelayTransactions::NAME)
.unwrap_or(false)
});
assert!(ret, "{}", assert_message);
}

pub struct TxPoolOrphanNormal;
impl Spec for TxPoolOrphanNormal {
fn run(&self, nodes: &mut Vec<Node>) {
Expand Down Expand Up @@ -236,23 +264,20 @@ impl Spec for TxPoolOrphanReverse {
run_replay_tx(&net, node0, final_tx, 1, 0),
"expect final_tx is in orphan pool"
);
should_receive_get_relay_transactions(&net, node0, "node should ask for tx11 tx12 tx13");

assert!(run_send_tx(&net, node0, tx13, 2, 0), "tx13 in orphan pool");
should_receive_get_relay_transactions(&net, node0, "node should ask for tx1");

assert!(
run_replay_tx(&net, node0, tx13, 2, 0),
"tx13 in orphan pool"
);
assert!(
run_replay_tx(&net, node0, tx12, 3, 0),
run_send_tx(&net, node0, tx12, 3, 0),
"tx12 is in orphan pool"
);
assert!(run_replay_tx(&net, node0, tx11, 4, 0), "tx11 is in orphan");

assert!(run_replay_tx(&net, node0, tx1, 5, 0), "tx1 is in orphan");
assert!(run_send_tx(&net, node0, tx11, 4, 0), "tx11 is in orphan");
assert!(run_send_tx(&net, node0, tx1, 5, 0), "tx1 is in orphan");

assert!(
run_replay_tx(&net, node0, parent, 0, 6),
"all is in pending"
);
should_receive_get_relay_transactions(&net, node0, "node should ask for parent");
assert!(run_send_tx(&net, node0, parent, 0, 6), "all is in pending");
}
}

Expand All @@ -267,13 +292,14 @@ impl Spec for TxPoolOrphanUnordered {
"expect final_tx is in orphan pool"
);

assert!(
run_replay_tx(&net, node0, tx11, 2, 0),
"tx11 in orphan pool"
);
should_receive_get_relay_transactions(&net, node0, "node should ask for tx11 tx12 tx13");

assert!(run_send_tx(&net, node0, tx11, 2, 0), "tx11 in orphan pool");
should_receive_get_relay_transactions(&net, node0, "node should ask for tx1");

let tx12_clone = tx12.clone();
assert!(
run_replay_tx(&net, node0, tx12, 3, 0),
run_send_tx(&net, node0, tx12, 3, 0),
"tx12 is in orphan pool"
);

Expand All @@ -292,12 +318,47 @@ impl Spec for TxPoolOrphanUnordered {
"parent is sent, should be in pending without change orphan pool"
);
assert!(
run_replay_tx(&net, node0, tx1, 1, 4),
run_send_tx(&net, node0, tx1, 1, 4),
"tx1 is sent, orphan pool only contains final_tx"
);

assert!(
run_replay_tx(&net, node0, tx13, 0, 6),
run_send_tx(&net, node0, tx13, 0, 6),
"tx13 is sent, orphan pool is empty"
);
}
}

pub struct TxPoolOrphanPartialInputUnknown;
impl Spec for TxPoolOrphanPartialInputUnknown {
fn run(&self, nodes: &mut Vec<Node>) {
let node0 = &nodes[0];
let (net, (parent, tx1, tx11, tx12, tx13, final_tx)) = build_tx_chain(node0);

assert!(
run_replay_tx(&net, node0, parent, 0, 1),
"parent sended expect nothing in orphan pool"
);
assert!(
run_replay_tx(&net, node0, tx1, 0, 2),
"tx1 is sent expect nothing in orphan pool"
);
assert!(
run_replay_tx(&net, node0, tx11, 0, 3),
"tx11 is sent expect nothing in orphan pool"
);
assert!(
run_replay_tx(&net, node0, tx12, 0, 4),
"tx12 is sent expect nothing in orphan pool"
);
assert!(
run_replay_tx(&net, node0, final_tx, 1, 4),
"expect final_tx is in orphan pool"
);

should_receive_get_relay_transactions(&net, node0, "node should ask for tx13");
assert!(
run_send_tx(&net, node0, tx13, 0, 6),
"tx13 is sent, orphan pool is empty"
);
}
Expand Down
20 changes: 20 additions & 0 deletions test/src/util/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,26 @@ pub fn always_success_transactions_with_rand_data(
.build()
}

pub fn send_tx(net: &Net, node: &Node, tx: TransactionView, cycles: u64) {
let relay_tx = packed::RelayTransaction::new_builder()
.cycles(cycles.pack())
.transaction(tx.data())
.build();

let tx_msg = packed::RelayMessage::new_builder()
.set(
packed::RelayTransactions::new_builder()
.transactions(
packed::RelayTransactionVec::new_builder()
.set(vec![relay_tx])
.build(),
)
.build(),
)
.build();
net.send(node, SupportProtocols::RelayV3, tx_msg.as_bytes());
}

pub fn relay_tx(net: &Net, node: &Node, tx: TransactionView, cycles: u64) {
let tx_hashes_msg = packed::RelayMessage::new_builder()
.set(
Expand Down
1 change: 1 addition & 0 deletions tx-pool/src/component/orphan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ impl OrphanPool {
.insert(tx.proposal_short_id());
}

// DoS prevention: do not allow OrphanPool to grow unbounded
self.limit_size()
}

Expand Down
44 changes: 16 additions & 28 deletions tx-pool/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use ckb_logger::{debug, error, info, log_enabled_target, trace_target};
use ckb_network::PeerIndex;
use ckb_snapshot::Snapshot;
use ckb_store::data_loader_wrapper::AsDataLoader;
use ckb_store::ChainStore;
use ckb_types::core::error::OutPointError;
use ckb_types::{
core::{cell::ResolvedTransaction, BlockView, Capacity, Cycle, HeaderView, TransactionView},
Expand Down Expand Up @@ -459,7 +458,10 @@ impl TxPoolService {
match remote {
Some((declared_cycle, peer)) => match ret {
Ok(_) => {
debug!("after_process remote send_result_to_relayer {}", tx_hash);
debug!(
"after_process remote send_result_to_relayer {} {}",
tx_hash, peer
);
self.send_result_to_relayer(TxVerificationResult::Ok {
original_peer: Some(peer),
with_vm_2023,
Expand All @@ -468,8 +470,15 @@ impl TxPoolService {
self.process_orphan_tx(&tx).await;
}
Err(reject) => {
debug!("after_process {} remote reject: {} ", tx_hash, reject);
if is_missing_input(reject) && all_inputs_is_unknown(snapshot, &tx) {
debug!(
"after_process {} {} remote reject: {} ",
tx_hash, peer, reject
);
if is_missing_input(reject) {
self.send_result_to_relayer(TxVerificationResult::UnknownParents {
peer,
parents: tx.unique_parents(),
});
self.add_orphan(tx, peer, declared_cycle).await;
} else {
if reject.is_malformed_tx() {
Expand All @@ -480,13 +489,7 @@ impl TxPoolService {
tx_hash: tx_hash.clone(),
});
}

if matches!(
reject,
Reject::Resolve(..)
| Reject::Verification(..)
| Reject::RBFRejected(..)
) {
if reject.should_recorded() {
self.put_recent_reject(&tx_hash, reject).await;
}
}
Expand Down Expand Up @@ -514,12 +517,7 @@ impl TxPoolService {
}
Err(reject) => {
debug!("after_process {} reject: {} ", tx_hash, reject);
if matches!(
reject,
Reject::Resolve(..)
| Reject::Verification(..)
| Reject::RBFRejected(..)
) {
if reject.should_recorded() {
self.put_recent_reject(&tx_hash, reject).await;
}
}
Expand Down Expand Up @@ -628,12 +626,7 @@ impl TxPoolService {
tx_hash: orphan.tx.hash(),
});
}
if matches!(
reject,
Reject::Resolve(..)
| Reject::Verification(..)
| Reject::RBFRejected(..)
) {
if reject.should_recorded() {
self.put_recent_reject(&orphan.tx.hash(), &reject).await;
}
}
Expand Down Expand Up @@ -1259,8 +1252,3 @@ fn _update_tx_pool_for_reorg(
// Remove transactions from the pool until its size <= size_limit.
let _ = tx_pool.limit_size(callbacks, None);
}

pub fn all_inputs_is_unknown(snapshot: &Snapshot, tx: &TransactionView) -> bool {
!tx.input_pts_iter()
.any(|pt| snapshot.transaction_exists(&pt.tx_hash()))
}
13 changes: 12 additions & 1 deletion tx-pool/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ impl TxPoolController {
send_message!(self, GetTransactionWithStatus, hash)
}

/// Return txs for network
/// Mainly used for compact block reconstruction and block proposal pre-broadcasting
/// Orphan/conflicted/etc transactions that are returned for compact block reconstruction.
pub fn fetch_txs(
&self,
short_ids: HashSet<ProposalShortId>,
Expand All @@ -304,6 +305,7 @@ impl TxPoolController {
}

/// Return txs with cycles
/// Mainly for relay transactions
pub fn fetch_txs_with_cycles(
&self,
short_ids: HashSet<ProposalShortId>,
Expand Down Expand Up @@ -677,6 +679,13 @@ pub enum TxVerificationResult {
/// transaction hash
tx_hash: Byte32,
},
/// tx parent is unknown
UnknownParents {
/// original peer
peer: PeerIndex,
/// parents hashes
parents: HashSet<Byte32>,
},
/// tx is rejected
Reject {
/// transaction hash
Expand Down Expand Up @@ -852,11 +861,13 @@ async fn process(mut service: TxPoolService, message: Message) {
arguments: short_ids,
}) => {
let tx_pool = service.tx_pool.read().await;
let orphan = service.orphan.read().await;
let txs = short_ids
.into_iter()
.filter_map(|short_id| {
tx_pool
.get_tx_from_pool_or_store(&short_id)
.or_else(|| orphan.get(&short_id).map(|entry| &entry.tx).cloned())
.map(|tx| (short_id, tx))
})
.collect();
Expand Down
5 changes: 5 additions & 0 deletions util/types/src/core/tx_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ impl Reject {
}
}

/// Returns true if the reject should be recorded.
pub fn should_recorded(&self) -> bool {
!matches!(self, Reject::Duplicated(..))
}

/// Returns true if tx can be resubmitted, allowing relay
/// * Declared wrong cycles should allow relay with the correct cycles
/// * Reject but is not malformed and the fee rate reached the threshold,
Expand Down
7 changes: 7 additions & 0 deletions util/types/src/core/views.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,13 @@ impl TransactionView {
pub fn proposal_short_id(&self) -> packed::ProposalShortId {
packed::ProposalShortId::from_tx_hash(&self.hash())
}

/// return deduplicate parent tx_hashes
pub fn unique_parents(&self) -> HashSet<packed::Byte32> {
self.input_pts_iter()
.map(|outpoint| outpoint.tx_hash())
.collect()
}
}

impl ExtraHashView {
Expand Down
Loading