Skip to content

Commit

Permalink
fix: try to traverse all known unknown parent hashes
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Aug 7, 2021
1 parent 3537b8e commit 4fc08b0
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 48 deletions.
98 changes: 92 additions & 6 deletions sync/src/orphan_block_pool.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use ckb_types::{core, packed};
use ckb_util::shrink_to_fit;
use dashmap::DashMap;
use dashmap::{DashMap, DashSet};
use std::collections::{HashMap, VecDeque};

pub type ParentHash = packed::Byte32;
Expand All @@ -10,15 +10,22 @@ const SHRINK_THRESHOLD: usize = 100;
// orphan_block_pool and block_status_map, but `LruCache` would prune old items implicitly.
#[derive(Default)]
pub struct OrphanBlockPool {
// Groups the blocks in the pool by their parent hash.
blocks: DashMap<ParentHash, HashMap<packed::Byte32, core::BlockView>>,
// Maps the hash of a block in the pool to that block's parent hash.
//
// The block is in the orphan pool if and only if the block hash exists as a key in this map.
parents: DashMap<packed::Byte32, ParentHash>,
// Leaders are blocks not in the orphan pool but having at least a child in the pool.
// They are the entry points for traversing the pool when their block arrives.
leaders: DashSet<ParentHash>,
}

impl OrphanBlockPool {
/// Creates a pool sized for roughly `capacity` orphan blocks.
pub fn with_capacity(capacity: usize) -> Self {
    OrphanBlockPool {
        blocks: DashMap::with_capacity(capacity),
        // `parents` holds exactly one entry per orphan block (it is the
        // pool-membership map), so it benefits from the same capacity hint
        // as `blocks` and avoids rehash-and-grow while filling the pool.
        parents: DashMap::with_capacity(capacity),
        // Leaders are only the roots of the orphan forest, typically far
        // fewer than `capacity`, so the default size is kept.
        leaders: DashSet::new(),
    }
}

Expand All @@ -30,12 +37,27 @@ impl OrphanBlockPool {
.entry(parent_hash.clone())
.or_insert_with(HashMap::default)
.insert(hash.clone(), block);
// Out-of-order insertion needs to be deduplicated
self.leaders.remove(&hash);
// It is a possible optimization to make the judgment in advance,
// because the parent of the block must not be equal to its own hash,
// so we can judge first, which may reduce one arc clone
if !self.parents.contains_key(&parent_hash) {
// Block referenced by `parent_hash` is not in the pool,
// and it has at least one child, the new inserted block, so add it to leaders.
self.leaders.insert(parent_hash.clone());
}
self.parents.insert(hash, parent_hash);
}

pub fn remove_blocks_by_parent(&self, hash: &packed::Byte32) -> Vec<core::BlockView> {
pub fn remove_blocks_by_parent(&self, parent_hash: &ParentHash) -> Vec<core::BlockView> {
// try remove leaders first
if self.leaders.remove(parent_hash).is_none() {
return Vec::new();
}

let mut queue: VecDeque<packed::Byte32> = VecDeque::new();
queue.push_back(hash.to_owned());
queue.push_back(parent_hash.to_owned());

let mut removed: Vec<core::BlockView> = Vec::new();
while let Some(parent_hash) = queue.pop_front() {
Expand All @@ -51,6 +73,7 @@ impl OrphanBlockPool {

shrink_to_fit!(self.blocks, SHRINK_THRESHOLD);
shrink_to_fit!(self.parents, SHRINK_THRESHOLD);
shrink_to_fit!(self.leaders, SHRINK_THRESHOLD);
removed
}

Expand All @@ -65,6 +88,14 @@ impl OrphanBlockPool {
/// Returns the number of blocks currently held in the orphan pool.
/// `parents` has exactly one entry per pooled block, so its length is the pool size.
pub fn len(&self) -> usize {
self.parents.len()
}

/// Returns `true` when the pool holds no orphan blocks.
///
/// Pool membership is tracked by the `parents` map (one entry per pooled
/// block), so the pool is empty exactly when that map is empty.
pub fn is_empty(&self) -> bool {
    self.parents.is_empty()
}

/// Takes an owned snapshot of the current leader hashes.
///
/// Callers iterate this snapshot without holding any shard locks on the
/// underlying concurrent set.
pub fn clone_leaders(&self) -> Vec<ParentHash> {
    let mut snapshot = Vec::with_capacity(self.leaders.len());
    for leader in self.leaders.iter() {
        snapshot.push(leader.clone());
    }
    snapshot
}
}

#[cfg(test)]
Expand Down Expand Up @@ -102,9 +133,9 @@ mod tests {
}

let orphan = pool.remove_blocks_by_parent(&consensus.genesis_block().hash());
let orphan: HashSet<BlockView> = orphan.into_iter().collect();
let block: HashSet<BlockView> = blocks.into_iter().collect();
assert_eq!(orphan, block)
let orphan_set: HashSet<BlockView> = orphan.into_iter().collect();
let blocks_set: HashSet<BlockView> = blocks.into_iter().collect();
assert_eq!(orphan_set, blocks_set)
}

#[test]
Expand Down Expand Up @@ -133,4 +164,59 @@ mod tests {

thread1.join().unwrap();
}

#[test]
fn test_leaders() {
let consensus = ConsensusBuilder::default().build();
let block_number = 20;
let mut blocks = Vec::new();
let mut parent = consensus.genesis_block().header();
let pool = OrphanBlockPool::with_capacity(20);
// Build a 19-block chain from genesis but skip inserting blocks[0],
// blocks[5], blocks[10] and blocks[15] (i % 5 == 0), leaving gaps
// that split the pool into disconnected segments.
for i in 0..block_number - 1 {
let new_block = gen_block(&parent);
blocks.push(new_block.clone());
parent = new_block.header();
if i % 5 != 0 {
pool.insert(new_block.clone());
}
}

// 15 blocks inserted; each of the 4 skipped blocks is a missing parent,
// i.e. a leader.
assert_eq!(pool.len(), 15);
assert_eq!(pool.leaders.len(), 4);

// Filling the gap at blocks[5] joins two segments: blocks[5] stops being
// a leader and its parent blocks[4] is already pooled, so no new leader.
pool.insert(blocks[5].clone());
assert_eq!(pool.len(), 16);
assert_eq!(pool.leaders.len(), 3);

pool.insert(blocks[10].clone());
assert_eq!(pool.len(), 17);
assert_eq!(pool.leaders.len(), 2);

// blocks[0] is not in the orphan pool yet, so genesis is not a leader
// and removing by the genesis hash does nothing
let orphan = pool.remove_blocks_by_parent(&consensus.genesis_block().hash());
assert!(orphan.is_empty());
assert_eq!(pool.len(), 17);
assert_eq!(pool.leaders.len(), 2);

// Inserting blocks[0] keeps the leader count unchanged: blocks[0] stops
// being a leader while its parent (genesis) becomes one.
pool.insert(blocks[0].clone());
assert_eq!(pool.len(), 18);
assert_eq!(pool.leaders.len(), 2);

// Removing by genesis drains blocks[0..=14] (15 blocks), leaving
// blocks[16..=18] with the single leader blocks[15].
let orphan = pool.remove_blocks_by_parent(&consensus.genesis_block().hash());
assert_eq!(pool.len(), 3);
assert_eq!(pool.leaders.len(), 1);

// blocks[15] stops being a leader; its parent blocks[14] was just
// removed from the pool, so it becomes the (only) leader.
pool.insert(blocks[15].clone());
assert_eq!(pool.len(), 4);
assert_eq!(pool.leaders.len(), 1);

let orphan_1 = pool.remove_blocks_by_parent(&blocks[14].hash());

// Together the two removals must yield every generated block exactly once.
let orphan_set: HashSet<BlockView> =
orphan.into_iter().chain(orphan_1.into_iter()).collect();
let blocks_set: HashSet<BlockView> = blocks.into_iter().collect();
assert_eq!(orphan_set, blocks_set);
assert_eq!(pool.len(), 0);
assert_eq!(pool.leaders.len(), 0);
}
}
13 changes: 7 additions & 6 deletions sync/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -836,12 +836,13 @@ impl CKBProtocolHandler for Relayer {
}
ASK_FOR_TXS_TOKEN => self.ask_for_txs(nc.as_ref()),
TX_HASHES_TOKEN => self.send_bulk_of_tx_hashes(nc.as_ref()),
SEARCH_ORPHAN_POOL_TOKEN => tokio::task::block_in_place(|| {
self.shared.try_search_orphan_pool(
&self.chain,
&self.shared.active_chain().tip_header().hash(),
)
}),
SEARCH_ORPHAN_POOL_TOKEN => {
if !self.shared.state().orphan_pool().is_empty() {
tokio::task::block_in_place(|| {
self.shared.try_search_orphan_pool(&self.chain);
})
}
}
_ => unreachable!(),
}
trace_target!(
Expand Down
69 changes: 37 additions & 32 deletions sync/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1275,7 +1275,7 @@ impl SyncShared {
block: Arc<core::BlockView>,
) -> Result<bool, CKBError> {
// Insert the given block into orphan_block_pool if its parent is not found
if !self.is_parent_stored(&block) {
if !self.is_stored(&block.parent_hash()) {
debug!(
"insert new orphan block {} {}",
block.header().number(),
Expand All @@ -1294,38 +1294,44 @@ impl SyncShared {

// The above block has been accepted. Attempt to accept its descendant blocks in orphan pool.
// The returned blocks of `remove_blocks_by_parent` are in topology order by parents
self.try_search_orphan_pool(chain, &block.as_ref().hash());
self.try_search_orphan_pool(chain);
ret
}

/// Try search orphan pool with current tip header hash
pub fn try_search_orphan_pool(&self, chain: &ChainController, parent_hash: &Byte32) {
let descendants = self.state.remove_orphan_by_parent(parent_hash);
debug!(
"try accepting {} descendant orphan blocks",
descendants.len()
);

for block in descendants {
// If we can not find the block's parent in database, that means it was failed to accept
// its parent, so we treat it as an invalid block as well.
if !self.is_parent_stored(&block) {
debug!(
"parent-unknown orphan block, block: {}, {}, parent: {}",
block.header().number(),
block.header().hash(),
block.header().parent_hash(),
);
continue;
/// Try to find blocks from the orphan block pool that may no longer be orphan
pub fn try_search_orphan_pool(&self, chain: &ChainController) {
for hash in self.state.orphan_pool().clone_leaders() {
if self.state.orphan_pool().is_empty() {
break;
}

let block = Arc::new(block);
if let Err(err) = self.accept_block(chain, Arc::clone(&block)) {
if self.is_stored(&hash) {
let descendants = self.state.remove_orphan_by_parent(&hash);
debug!(
"accept descendant orphan block {} error {:?}",
block.header().hash(),
err
"try accepting {} descendant orphan blocks by exist parents hash",
descendants.len()
);
for block in descendants {
// If we can not find the block's parent in database, that means it was failed to accept
// its parent, so we treat it as an invalid block as well.
if !self.is_stored(&block.parent_hash()) {
debug!(
"parent-unknown orphan block, block: {}, {}, parent: {}",
block.header().number(),
block.header().hash(),
block.header().parent_hash(),
);
continue;
}

let block = Arc::new(block);
if let Err(err) = self.accept_block(chain, Arc::clone(&block)) {
debug!(
"accept descendant orphan block {} error {:?}",
block.header().hash(),
err
);
}
}
}
}
}
Expand Down Expand Up @@ -1438,11 +1444,10 @@ impl SyncShared {
}
}

/// Check whether block's parent has been inserted to chain store
pub fn is_parent_stored(&self, block: &core::BlockView) -> bool {
self.store()
.get_block_header(&block.data().header().raw().parent_hash())
.is_some()
/// Check whether block has been inserted to chain store
///
/// Returns `true` when the block status carries the `BLOCK_STORED` flag.
pub fn is_stored(&self, hash: &packed::Byte32) -> bool {
    // `hash` is already `&Byte32`; passing it directly avoids a
    // needless double borrow (`&&Byte32`).
    self.active_chain()
        .get_block_status(hash)
        .contains(BlockStatus::BLOCK_STORED)
}

/// Get epoch ext by block hash
Expand Down
8 changes: 4 additions & 4 deletions test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,10 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(CellBeingCellDepAndSpentInSameBlockTestGetBlockTemplate),
Box::new(CellBeingCellDepAndSpentInSameBlockTestGetBlockTemplateMultiple),
Box::new(HeaderSyncCycle),
Box::new(InboundSync),
Box::new(OutboundSync),
Box::new(InboundMinedDuringSync),
Box::new(OutboundMinedDuringSync),
// Test hard fork features
Box::new(CheckCellDeps),
Box::new(CheckAbsoluteEpochSince),
Expand All @@ -499,10 +503,6 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(CheckVmVersion),
Box::new(CheckVmBExtension),
Box::new(ImmatureHeaderDeps),
Box::new(InboundSync),
Box::new(OutboundSync),
Box::new(InboundMinedDuringSync),
Box::new(OutboundMinedDuringSync),
];
specs.shuffle(&mut thread_rng());
specs
Expand Down

0 comments on commit 4fc08b0

Please sign in to comment.