Skip to content

Commit

Permalink
refactor(tx-pool): remove from nested collection
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangsoledad committed Jan 4, 2022
1 parent 3557180 commit f382e8a
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 58 deletions.
41 changes: 20 additions & 21 deletions tx-pool/src/component/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use ckb_types::{
prelude::*,
};
use ckb_util::{LinkedHashMap, LinkedHashMapEntries};
use std::collections::{HashMap, HashSet};
use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque};

type ConflictEntry = (TxEntry, Reject);

Expand Down Expand Up @@ -190,8 +190,6 @@ impl PendingQueue {
}

pub(crate) fn get_descendants(&self, entry: &TxEntry) -> Vec<ProposalShortId> {
use std::collections::VecDeque;

let mut entries: VecDeque<&TxEntry> = VecDeque::new();
entries.push_back(entry);

Expand Down Expand Up @@ -219,28 +217,29 @@ impl PendingQueue {
let outputs = entry.transaction().output_pts();

for i in inputs {
let mut empty = false;

if let Some(ids) = self.inputs.get_mut(&i) {
ids.remove(&tx_short_id);
empty = ids.is_empty();
}

if empty {
self.inputs.remove(&i);
if let Entry::Occupied(mut occupied) = self.inputs.entry(i) {
let empty = {
let ids = occupied.get_mut();
ids.remove(&tx_short_id);
ids.is_empty()
};
if empty {
occupied.remove();
}
}
}

// remove dep
for d in entry.related_dep_out_points() {
let mut empty = false;
if let Some(x) = self.deps.get_mut(d) {
x.remove(&tx_short_id);
empty = x.is_empty();
}

if empty {
self.deps.remove(d);
for d in entry.related_dep_out_points().cloned() {
if let Entry::Occupied(mut occupied) = self.deps.entry(d) {
let empty = {
let ids = occupied.get_mut();
ids.remove(&tx_short_id);
ids.is_empty()
};
if empty {
occupied.remove();
}
}
}

Expand Down
26 changes: 13 additions & 13 deletions tx-pool/src/component/proposed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use ckb_types::{
packed::{Byte32, CellOutput, OutPoint, ProposalShortId},
prelude::*,
};
use std::collections::{HashMap, HashSet};
use std::collections::{hash_map::Entry, HashMap, HashSet};
use std::iter;

type ConflictEntry = (TxEntry, Reject);
Expand Down Expand Up @@ -78,16 +78,16 @@ impl Edges {
self.deps.entry(out_point).or_default().insert(txid);
}

pub(crate) fn delete_txid_by_dep(&mut self, out_point: &OutPoint, txid: &ProposalShortId) {
let mut empty = false;

if let Some(x) = self.deps.get_mut(out_point) {
x.remove(txid);
empty = x.is_empty();
}

if empty {
self.deps.remove(out_point);
pub(crate) fn delete_txid_by_dep(&mut self, out_point: OutPoint, txid: &ProposalShortId) {
if let Entry::Occupied(mut occupied) = self.deps.entry(out_point) {
let empty = {
let ids = occupied.get_mut();
ids.remove(txid);
ids.is_empty()
};
if empty {
occupied.remove();
}
}
}

Expand Down Expand Up @@ -194,7 +194,7 @@ impl ProposedPool {
}
}

for d in entry.related_dep_out_points() {
for d in entry.related_dep_out_points().cloned() {
self.edges.delete_txid_by_dep(d, id);
}

Expand Down Expand Up @@ -228,7 +228,7 @@ impl ProposedPool {
self.edges.remove_input(&i);
}

for d in entry.related_dep_out_points() {
for d in entry.related_dep_out_points().cloned() {
self.edges.delete_txid_by_dep(d, &id);
}

Expand Down
8 changes: 4 additions & 4 deletions tx-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ impl TxPool {

pub(crate) fn gap_rtx(
&mut self,
cache_entry: Option<CacheEntry>,
cache_entry: CacheEntry,
size: usize,
rtx: ResolvedTransaction,
) -> Result<CacheEntry, Reject> {
Expand All @@ -471,7 +471,7 @@ impl TxPool {
self.check_rtx_from_pending_and_proposed(&rtx, resolve_opts)?;

let max_cycles = snapshot.consensus().max_block_cycles();
let verified = verify_rtx(snapshot, &rtx, &tx_env, &cache_entry, max_cycles)?;
let verified = verify_rtx(snapshot, &rtx, &tx_env, &Some(cache_entry), max_cycles)?;

let entry = TxEntry::new(rtx, verified.cycles, verified.fee, size);
let tx_hash = entry.transaction().hash();
Expand All @@ -484,7 +484,7 @@ impl TxPool {

pub(crate) fn proposed_rtx(
&mut self,
cache_entry: Option<CacheEntry>,
cache_entry: CacheEntry,
size: usize,
rtx: ResolvedTransaction,
) -> Result<CacheEntry, Reject> {
Expand All @@ -501,7 +501,7 @@ impl TxPool {
self.check_rtx_from_proposed(&rtx, resolve_opts)?;

let max_cycles = snapshot.consensus().max_block_cycles();
let verified = verify_rtx(snapshot, &rtx, &tx_env, &cache_entry, max_cycles)?;
let verified = verify_rtx(snapshot, &rtx, &tx_env, &Some(cache_entry), max_cycles)?;

let entry = TxEntry::new(rtx, verified.cycles, verified.fee, size);
let tx_hash = entry.transaction().hash();
Expand Down
33 changes: 13 additions & 20 deletions tx-pool/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1383,27 +1383,24 @@ fn _update_tx_pool_for_reorg(
) {
tx_pool.snapshot = Arc::clone(&snapshot);

// NOTE: `remove_expired` will try to re-put the given expired/detached proposals into
// NOTE: `remove_by_detached_proposal` will try to re-put the given expired/detached proposals into
// pending-pool if they can be found within txpool. As for a transaction
// which is both expired and committed at the one time(commit at its end of commit-window),
// we should treat it as a committed and not re-put into pending-pool. So we should ensure
// that involves `remove_committed_txs` before `remove_expired`.
tx_pool.remove_committed_txs(attached.iter(), callbacks, detached_headers);
tx_pool.remove_by_detached_proposal(detached_proposal_id.iter());

let mut entries = Vec::new();
let mut gaps = Vec::new();

// mine mode:
// pending ---> gap ----> proposed
// try move gap to proposed
if mine_mode {
let mut entries = Vec::new();
let mut gaps = Vec::new();

tx_pool.gap.remove_entries_by_filter(|id, tx_entry| {
if snapshot.proposals().contains_proposed(id) {
entries.push((
Some(CacheEntry::completed(tx_entry.cycles, tx_entry.fee)),
tx_entry.clone(),
));
entries.push(tx_entry.clone());
true
} else {
false
Expand All @@ -1412,36 +1409,32 @@ fn _update_tx_pool_for_reorg(

tx_pool.pending.remove_entries_by_filter(|id, tx_entry| {
if snapshot.proposals().contains_proposed(id) {
entries.push((
Some(CacheEntry::completed(tx_entry.cycles, tx_entry.fee)),
tx_entry.clone(),
));
entries.push(tx_entry.clone());
true
} else if snapshot.proposals().contains_gap(id) {
gaps.push((
Some(CacheEntry::completed(tx_entry.cycles, tx_entry.fee)),
tx_entry.clone(),
));
gaps.push(tx_entry.clone());
true
} else {
false
}
});

for (cycles, entry) in entries {
for entry in entries {
let cached = CacheEntry::completed(entry.cycles, entry.fee);
let tx_hash = entry.transaction().hash();
if let Err(e) = tx_pool.proposed_rtx(cycles, entry.size, entry.rtx.clone()) {
if let Err(e) = tx_pool.proposed_rtx(cached, entry.size, entry.rtx.clone()) {
debug!("Failed to add proposed tx {}, reason: {}", tx_hash, e);
callbacks.call_reject(tx_pool, &entry, e.clone());
} else {
callbacks.call_proposed(tx_pool, &entry, false);
}
}

for (cycles, entry) in gaps {
for entry in gaps {
debug!("tx proposed, add to gap {}", entry.transaction().hash());
let tx_hash = entry.transaction().hash();
if let Err(e) = tx_pool.gap_rtx(cycles, entry.size, entry.rtx.clone()) {
let cached = CacheEntry::completed(entry.cycles, entry.fee);
if let Err(e) = tx_pool.gap_rtx(cached, entry.size, entry.rtx.clone()) {
debug!("Failed to add tx to gap {}, reason: {}", tx_hash, e);
callbacks.call_reject(tx_pool, &entry, e.clone());
}
Expand Down

0 comments on commit f382e8a

Please sign in to comment.