diff --git a/sync/src/orphan_block_pool.rs b/sync/src/orphan_block_pool.rs index 521b21b121..5190ac83b5 100644 --- a/sync/src/orphan_block_pool.rs +++ b/sync/src/orphan_block_pool.rs @@ -1,6 +1,6 @@ use ckb_types::{core, packed}; use ckb_util::shrink_to_fit; -use dashmap::DashMap; +use dashmap::{DashMap, DashSet}; use std::collections::{HashMap, VecDeque}; pub type ParentHash = packed::Byte32; @@ -10,8 +10,14 @@ const SHRINK_THRESHOLD: usize = 100; // orphan_block_pool and block_status_map, but `LruCache` would prune old items implicitly. #[derive(Default)] pub struct OrphanBlockPool { + // Group the blocks in the pool by the parent hash. blocks: DashMap<ParentHash, HashMap<packed::Byte32, core::BlockView>>, + // The map tells the parent hash when given the hash of a block in the pool. + // + // The block is in the orphan pool if and only if the block hash exists as a key in this map. parents: DashMap<packed::Byte32, ParentHash>, + // Leaders are blocks not in the orphan pool but having at least a child in the pool. + leaders: DashSet<ParentHash>, } impl OrphanBlockPool { @@ -19,6 +25,7 @@ impl OrphanBlockPool { OrphanBlockPool { blocks: DashMap::with_capacity(capacity), parents: DashMap::new(), + leaders: DashSet::new(), } } @@ -30,12 +37,27 @@ impl OrphanBlockPool { .entry(parent_hash.clone()) .or_insert_with(HashMap::default) .insert(hash.clone(), block); + // Out-of-order insertion needs to be deduplicated + self.leaders.remove(&hash); + // It is a possible optimization to make the judgment in advance, + // because the parent of the block must not be equal to its own hash, + // so we can judge first, which may reduce one arc clone + if !self.parents.contains_key(&parent_hash) { + // Block referenced by `parent_hash` is not in the pool, + // and it has at least one child, the new inserted block, so add it to leaders. 
+ self.leaders.insert(parent_hash.clone()); + } self.parents.insert(hash, parent_hash); } - pub fn remove_blocks_by_parent(&self, hash: &packed::Byte32) -> Vec<core::BlockView> { + pub fn remove_blocks_by_parent(&self, parent_hash: &ParentHash) -> Vec<core::BlockView> { + // try remove leaders first + if self.leaders.remove(parent_hash).is_none() { + return Vec::new(); + } + let mut queue: VecDeque<packed::Byte32> = VecDeque::new(); - queue.push_back(hash.to_owned()); + queue.push_back(parent_hash.to_owned()); let mut removed: Vec<core::BlockView> = Vec::new(); while let Some(parent_hash) = queue.pop_front() { @@ -51,6 +73,7 @@ impl OrphanBlockPool { shrink_to_fit!(self.blocks, SHRINK_THRESHOLD); shrink_to_fit!(self.parents, SHRINK_THRESHOLD); + shrink_to_fit!(self.leaders, SHRINK_THRESHOLD); removed } @@ -65,6 +88,14 @@ impl OrphanBlockPool { pub fn len(&self) -> usize { self.parents.len() } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn clone_leaders(&self) -> Vec<ParentHash> { + self.leaders.iter().map(|r| r.clone()).collect::<Vec<_>>() + } } #[cfg(test)] @@ -102,9 +133,9 @@ mod tests { } let orphan = pool.remove_blocks_by_parent(&consensus.genesis_block().hash()); - let orphan: HashSet<core::BlockView> = orphan.into_iter().collect(); - let block: HashSet<core::BlockView> = blocks.into_iter().collect(); - assert_eq!(orphan, block) + let orphan_set: HashSet<core::BlockView> = orphan.into_iter().collect(); + let blocks_set: HashSet<core::BlockView> = blocks.into_iter().collect(); + assert_eq!(orphan_set, blocks_set) } #[test] @@ -133,4 +164,59 @@ mod tests { thread1.join().unwrap(); } + + #[test] + fn test_leaders() { + let consensus = ConsensusBuilder::default().build(); + let block_number = 20; + let mut blocks = Vec::new(); + let mut parent = consensus.genesis_block().header(); + let pool = OrphanBlockPool::with_capacity(20); + for i in 0..block_number - 1 { + let new_block = gen_block(&parent); + blocks.push(new_block.clone()); + parent = new_block.header(); + if i % 5 != 0 { + pool.insert(new_block.clone()); + } + } + + assert_eq!(pool.len(), 15); + assert_eq!(pool.leaders.len(), 4); + + 
pool.insert(blocks[5].clone()); + assert_eq!(pool.len(), 16); + assert_eq!(pool.leaders.len(), 3); + + pool.insert(blocks[10].clone()); + assert_eq!(pool.len(), 17); + assert_eq!(pool.leaders.len(), 2); + + // index 0 isn't in the orphan pool, so do nothing + let orphan = pool.remove_blocks_by_parent(&consensus.genesis_block().hash()); + assert!(orphan.is_empty()); + assert_eq!(pool.len(), 17); + assert_eq!(pool.leaders.len(), 2); + + pool.insert(blocks[0].clone()); + assert_eq!(pool.len(), 18); + assert_eq!(pool.leaders.len(), 2); + + let orphan = pool.remove_blocks_by_parent(&consensus.genesis_block().hash()); + assert_eq!(pool.len(), 3); + assert_eq!(pool.leaders.len(), 1); + + pool.insert(blocks[15].clone()); + assert_eq!(pool.len(), 4); + assert_eq!(pool.leaders.len(), 1); + + let orphan_1 = pool.remove_blocks_by_parent(&blocks[14].hash()); + + let orphan_set: HashSet<core::BlockView> = + orphan.into_iter().chain(orphan_1.into_iter()).collect(); + let blocks_set: HashSet<core::BlockView> = blocks.into_iter().collect(); + assert_eq!(orphan_set, blocks_set); + assert_eq!(pool.len(), 0); + assert_eq!(pool.leaders.len(), 0); + } } diff --git a/sync/src/relayer/mod.rs b/sync/src/relayer/mod.rs index 230f30e481..0298288bc3 100644 --- a/sync/src/relayer/mod.rs +++ b/sync/src/relayer/mod.rs @@ -836,12 +836,13 @@ impl CKBProtocolHandler for Relayer { } ASK_FOR_TXS_TOKEN => self.ask_for_txs(nc.as_ref()), TX_HASHES_TOKEN => self.send_bulk_of_tx_hashes(nc.as_ref()), - SEARCH_ORPHAN_POOL_TOKEN => tokio::task::block_in_place(|| { - self.shared.try_search_orphan_pool( - &self.chain, - &self.shared.active_chain().tip_header().hash(), - ) - }), + SEARCH_ORPHAN_POOL_TOKEN => { + if !self.shared.state().orphan_pool().is_empty() { + tokio::task::block_in_place(|| { + self.shared.try_search_orphan_pool(&self.chain); + }) + } + } _ => unreachable!(), } trace_target!( diff --git a/sync/src/types/mod.rs b/sync/src/types/mod.rs index 2cb2c6f5b0..94e63ee6b1 100644 --- a/sync/src/types/mod.rs +++ 
b/sync/src/types/mod.rs @@ -1275,7 +1275,7 @@ impl SyncShared { block: Arc<core::BlockView>, ) -> Result<bool, CKBError> { // Insert the given block into orphan_block_pool if its parent is not found - if !self.is_parent_stored(&block) { + if !self.is_stored(&block.parent_hash()) { debug!( "insert new orphan block {} {}", block.header().number(), @@ -1294,38 +1294,44 @@ impl SyncShared { // The above block has been accepted. Attempt to accept its descendant blocks in orphan pool. // The returned blocks of `remove_blocks_by_parent` are in topology order by parents - self.try_search_orphan_pool(chain, &block.as_ref().hash()); + self.try_search_orphan_pool(chain); ret } - /// Try search orphan pool with current tip header hash - pub fn try_search_orphan_pool(&self, chain: &ChainController, parent_hash: &Byte32) { - let descendants = self.state.remove_orphan_by_parent(parent_hash); - debug!( - "try accepting {} descendant orphan blocks", - descendants.len() - ); - - for block in descendants { - // If we can not find the block's parent in database, that means it was failed to accept - // its parent, so we treat it as an invalid block as well. 
- if !self.is_parent_stored(&block) { - debug!( - "parent-unknown orphan block, block: {}, {}, parent: {}", - block.header().number(), - block.header().hash(), - block.header().parent_hash(), - ); - continue; + /// Try to find blocks from the orphan block pool that may no longer be orphan + pub fn try_search_orphan_pool(&self, chain: &ChainController) { + for hash in self.state.orphan_pool().clone_leaders() { + if self.state.orphan_pool().is_empty() { + break; } - - let block = Arc::new(block); - if let Err(err) = self.accept_block(chain, Arc::clone(&block)) { + if self.is_stored(&hash) { + let descendants = self.state.remove_orphan_by_parent(&hash); debug!( - "accept descendant orphan block {} error {:?}", - block.header().hash(), - err + "try accepting {} descendant orphan blocks by exist parents hash", + descendants.len() ); + for block in descendants { + // If we can not find the block's parent in database, that means it was failed to accept + // its parent, so we treat it as an invalid block as well. 
+ if !self.is_stored(&block.parent_hash()) { + debug!( + "parent-unknown orphan block, block: {}, {}, parent: {}", + block.header().number(), + block.header().hash(), + block.header().parent_hash(), + ); + continue; + } + + let block = Arc::new(block); + if let Err(err) = self.accept_block(chain, Arc::clone(&block)) { + debug!( + "accept descendant orphan block {} error {:?}", + block.header().hash(), + err + ); + } + } } } } @@ -1438,11 +1444,10 @@ impl SyncShared { } } - /// Check whether block's parent has been inserted to chain store - pub fn is_parent_stored(&self, block: &core::BlockView) -> bool { - self.store() - .get_block_header(&block.data().header().raw().parent_hash()) - .is_some() + /// Check whether block has been inserted to chain store + pub fn is_stored(&self, hash: &packed::Byte32) -> bool { + let status = self.active_chain().get_block_status(&hash); + status.contains(BlockStatus::BLOCK_STORED) } /// Get epoch ext by block hash diff --git a/test/src/main.rs b/test/src/main.rs index f1db127b8c..0f3540a7ca 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -491,6 +491,10 @@ fn all_specs() -> Vec<Box<dyn Spec>> { Box::new(CellBeingCellDepAndSpentInSameBlockTestGetBlockTemplate), Box::new(CellBeingCellDepAndSpentInSameBlockTestGetBlockTemplateMultiple), Box::new(HeaderSyncCycle), + Box::new(InboundSync), + Box::new(OutboundSync), + Box::new(InboundMinedDuringSync), + Box::new(OutboundMinedDuringSync), // Test hard fork features Box::new(CheckCellDeps), Box::new(CheckAbsoluteEpochSince), @@ -499,10 +503,6 @@ Box::new(CheckVmVersion), Box::new(CheckVmBExtension), Box::new(ImmatureHeaderDeps), - Box::new(InboundSync), - Box::new(OutboundSync), - Box::new(InboundMinedDuringSync), - Box::new(OutboundMinedDuringSync), ]; specs.shuffle(&mut thread_rng()); specs