diff --git a/sync/src/synchronizer/block_fetcher.rs b/sync/src/synchronizer/block_fetcher.rs index 142f755928c..0f9d83c030a 100644 --- a/sync/src/synchronizer/block_fetcher.rs +++ b/sync/src/synchronizer/block_fetcher.rs @@ -156,7 +156,11 @@ impl<'a> BlockFetcher<'a> { // NOTE: NO-IBD Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding // stopping synchronization even when orphan_pool maintains dirty items by bugs. // TODO: If async validation is achieved, then the IBD state judgement here can be removed - } else if inflight.insert(self.peer, hash, header.number()) { + + // On IBD, BLOCK_RECEIVED means this block had been received, so this block doesn't need to fetch + // On NO-IBD, because of the model, this block has to be requested again + // But all of these can do nothing on this branch + } else if inflight.insert(self.peer, (header.number(), hash).into()) { fetch.push(header) } @@ -175,15 +179,11 @@ impl<'a> BlockFetcher<'a> { fetch.sort_by_key(|header| header.number()); let tip = self.active_chain.tip_number(); - let should_mark = fetch.last().map_or(true, |header| { + let should_mark = fetch.last().map_or(false, |header| { header.number().saturating_sub(crate::CHECK_POINT_WINDOW) > tip }); if should_mark { - if let Some(entry) = inflight.trace_number.get_mut(&(tip + 1)) { - if entry.1.is_none() { - entry.1 = Some(faketime::unix_time_as_millis()); - } - } + inflight.mark_slow_block(tip); } Some( diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index 5a41af9fbac..6feca8bb6e8 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -491,9 +491,10 @@ impl Synchronizer { trace!("poll find_blocks_to_fetch select peers"); // fetch use a lot of cpu time, especially in ibd state + // so in ibd, the fetch function use another thread match nc.p2p_control() { Some(raw) if ibd.into() => match self.fetch_channel { - Some(ref send) => send.send(FetchCMD::Fetch(peers)).unwrap(), + Some(ref sender) => sender.send(FetchCMD::Fetch(peers)).unwrap(), None => { let p2p_control = raw.clone(); let sync = self.clone(); diff --git a/sync/src/tests/inflight_blocks.rs b/sync/src/tests/inflight_blocks.rs index 4ed27ea217e..d8131b454ef 100644 --- a/sync/src/tests/inflight_blocks.rs +++ b/sync/src/tests/inflight_blocks.rs @@ -1,4 +1,4 @@ -use crate::types::InflightBlocks; +use crate::types::{BlockNumberAndHash, InflightBlocks}; use crate::BLOCK_DOWNLOAD_TIMEOUT; use ckb_types::prelude::*; use ckb_types::{h256, H256}; @@ -10,24 +10,24 @@ fn inflight_blocks_count() { let mut inflight_blocks = InflightBlocks::default(); // don't allow 2 peer for one block - assert!(inflight_blocks.insert(2.into(), h256!("0x1").pack(), 1)); - assert!(!inflight_blocks.insert(1.into(), h256!("0x1").pack(), 1)); + assert!(inflight_blocks.insert(2.into(), (1, h256!("0x1").pack()).into())); + assert!(!inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into())); // peer 1 inflight - assert!(!inflight_blocks.insert(1.into(), h256!("0x1").pack(), 1)); + assert!(!inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into())); - assert!(inflight_blocks.insert(1.into(), h256!("0x2").pack(), 2)); + assert!(inflight_blocks.insert(1.into(), (2, h256!("0x2").pack()).into())); assert_eq!(inflight_blocks.total_inflight_count(), 2); // 0x1 0x2 assert_eq!(inflight_blocks.peer_inflight_count(1.into()), 1); assert_eq!(inflight_blocks.peer_inflight_count(2.into()), 1); // one block inflight assert_eq!( inflight_blocks.inflight_block_by_peer(1.into()).cloned(), - Some(HashSet::from_iter(vec![h256!("0x2").pack()])) + Some(HashSet::from_iter(vec![(2, h256!("0x2").pack()).into()])) ); // receive block 0x1 - inflight_blocks.remove_by_block(h256!("0x1").pack()); + inflight_blocks.remove_by_block((1, h256!("0x1").pack()).into()); assert_eq!(inflight_blocks.total_inflight_count(), 1); // 0x2 assert_eq!(inflight_blocks.peer_inflight_count(1.into()), 1); @@ -36,7 +36,7 @@ fn inflight_blocks_count() { inflight_blocks .inflight_block_by_peer(1.into()) .map(|set| set.iter().collect()), - Some(vec![&h256!("0x2").pack()]) + Some(vec![&(2, h256!("0x2").pack()).into()]) ); } @@ -44,19 +44,19 @@ fn inflight_blocks_count() { fn inflight_blocks_state() { let mut inflight_blocks = InflightBlocks::default(); - assert!(inflight_blocks.insert(1.into(), h256!("0x1").pack(), 1)); - assert!(!inflight_blocks.insert(2.into(), h256!("0x1").pack(), 1)); - assert!(!inflight_blocks.insert(3.into(), h256!("0x1").pack(), 1)); + assert!(inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into())); + assert!(!inflight_blocks.insert(2.into(), (1, h256!("0x1").pack()).into())); + assert!(!inflight_blocks.insert(3.into(), (1, h256!("0x1").pack()).into())); // peer 1 inflight - assert!(!inflight_blocks.insert(1.into(), h256!("0x1").pack(), 1)); - assert!(inflight_blocks.insert(1.into(), h256!("0x2").pack(), 2)); + assert!(!inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into())); + assert!(inflight_blocks.insert(1.into(), (2, h256!("0x2").pack()).into())); - assert!(inflight_blocks.insert(3.into(), h256!("0x3").pack(), 3)); + assert!(inflight_blocks.insert(3.into(), (3, h256!("0x3").pack()).into())); assert_eq!( inflight_blocks - .inflight_state_by_block(&h256!("0x1").pack()) + .inflight_state_by_block(&(1, h256!("0x1").pack()).into()) .cloned() .map(|state| { state.peer }), Some(1.into()) @@ -64,7 +64,7 @@ fn inflight_blocks_state() { assert_eq!( inflight_blocks - .inflight_state_by_block(&h256!("0x3").pack()) + .inflight_state_by_block(&(3, h256!("0x3").pack()).into()) .map(|state| state.peer), Some(3.into()) ); @@ -75,14 +75,14 @@ fn inflight_blocks_state() { assert_eq!( inflight_blocks - .inflight_state_by_block(&h256!("0x1").pack()) + .inflight_state_by_block(&(1, h256!("0x1").pack()).into()) .map(|state| state.peer), None ); assert_eq!( inflight_blocks - .inflight_state_by_block(&h256!("0x3").pack()) + .inflight_state_by_block(&(3, h256!("0x3").pack()).into()) .map(|state| state.peer), Some(3.into()) ); @@ -95,43 +95,43 @@ fn inflight_blocks_timeout() { faketime::enable(&faketime_file); let mut inflight_blocks = InflightBlocks::default(); - assert!(inflight_blocks.insert(1.into(), h256!("0x1").pack(), 1)); - assert!(inflight_blocks.insert(1.into(), h256!("0x2").pack(), 2)); - assert!(inflight_blocks.insert(2.into(), h256!("0x3").pack(), 3)); - assert!(!inflight_blocks.insert(1.into(), h256!("0x3").pack(), 3)); - assert!(inflight_blocks.insert(1.into(), h256!("0x4").pack(), 4)); - assert!(inflight_blocks.insert(2.into(), h256!("0x5").pack(), 5)); - assert!(!inflight_blocks.insert(2.into(), h256!("0x5").pack(), 5)); + assert!(inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into())); + assert!(inflight_blocks.insert(1.into(), (2, h256!("0x2").pack()).into())); + assert!(inflight_blocks.insert(2.into(), (3, h256!("0x3").pack()).into())); + assert!(!inflight_blocks.insert(1.into(), (3, h256!("0x3").pack()).into())); + assert!(inflight_blocks.insert(1.into(), (4, h256!("0x4").pack()).into())); + assert!(inflight_blocks.insert(2.into(), (5, h256!("0x5").pack()).into())); + assert!(!inflight_blocks.insert(2.into(), (5, h256!("0x5").pack()).into())); faketime::write_millis(&faketime_file, BLOCK_DOWNLOAD_TIMEOUT + 1).expect("write millis"); - assert!(!inflight_blocks.insert(3.into(), h256!("0x3").pack(), 3)); - assert!(!inflight_blocks.insert(3.into(), h256!("0x2").pack(), 2)); - assert!(inflight_blocks.insert(4.into(), h256!("0x6").pack(), 6)); - assert!(inflight_blocks.insert(1.into(), h256!("0x7").pack(), 7)); + assert!(!inflight_blocks.insert(3.into(), (3, h256!("0x3").pack()).into())); + assert!(!inflight_blocks.insert(3.into(), (2, h256!("0x2").pack()).into())); + assert!(inflight_blocks.insert(4.into(), (6, h256!("0x6").pack()).into())); + assert!(inflight_blocks.insert(1.into(), (7, h256!("0x7").pack()).into())); let peers = inflight_blocks.prune(0); assert_eq!(peers, HashSet::from_iter(vec![1.into(), 2.into()])); - assert!(inflight_blocks.insert(3.into(), h256!("0x2").pack(), 2)); - assert!(inflight_blocks.insert(3.into(), h256!("0x3").pack(), 3)); + assert!(inflight_blocks.insert(3.into(), (2, h256!("0x2").pack()).into())); + assert!(inflight_blocks.insert(3.into(), (3, h256!("0x3").pack()).into())); assert_eq!( inflight_blocks - .inflight_state_by_block(&h256!("0x3").pack()) + .inflight_state_by_block(&(3, h256!("0x3").pack()).into()) .map(|state| state.peer), Some(3.into()) ); assert_eq!( inflight_blocks - .inflight_state_by_block(&h256!("0x2").pack()) + .inflight_state_by_block(&(2, h256!("0x2").pack()).into()) .map(|state| state.peer), Some(3.into()) ); assert_eq!( inflight_blocks - .inflight_state_by_block(&h256!("0x6").pack()) + .inflight_state_by_block(&(6, h256!("0x6").pack()).into()) .cloned() .map(|state| state.peer), Some(4.into()) @@ -146,49 +146,49 @@ fn inflight_trace_number_state() { let mut inflight_blocks = InflightBlocks::default(); - assert!(inflight_blocks.insert(1.into(), h256!("0x1").pack(), 1)); - assert!(inflight_blocks.insert(2.into(), h256!("0x2").pack(), 2)); - assert!(inflight_blocks.insert(3.into(), h256!("0x3").pack(), 3)); - assert!(inflight_blocks.insert(4.into(), h256!("0x33").pack(), 3)); - assert!(inflight_blocks.insert(5.into(), h256!("0x4").pack(), 4)); - assert!(inflight_blocks.insert(6.into(), h256!("0x5").pack(), 5)); - assert!(inflight_blocks.insert(7.into(), h256!("0x55").pack(), 5)); + assert!(inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into())); + assert!(inflight_blocks.insert(2.into(), (2, h256!("0x2").pack()).into())); + assert!(inflight_blocks.insert(3.into(), (3, h256!("0x3").pack()).into())); + assert!(inflight_blocks.insert(4.into(), (3, h256!("0x33").pack()).into())); + assert!(inflight_blocks.insert(5.into(), (4, h256!("0x4").pack()).into())); + assert!(inflight_blocks.insert(6.into(), (5, h256!("0x5").pack()).into())); + assert!(inflight_blocks.insert(7.into(), (5, h256!("0x55").pack()).into())); let list = inflight_blocks.prune(2); assert!(list.is_empty()); - let (next_number, (blocks, time)) = inflight_blocks.trace_number.iter().next().unwrap(); + assert!(inflight_blocks.trace_number.is_empty()); + assert!(inflight_blocks.restart_number == 0); + + // When 2 + 512 number block request send out + inflight_blocks.mark_slow_block(2); - assert_eq!(next_number, &3); assert_eq!( - blocks, - &HashSet::from_iter(vec![h256!("0x3").pack(), h256!("0x33").pack()]) + inflight_blocks + .trace_number + .keys() + .cloned() + .collect::>(), + HashSet::from_iter(vec![ + (1, h256!("0x1").pack()).into(), + (2, h256!("0x2").pack()).into(), + (3, h256!("0x3").pack()).into(), + (3, h256!("0x33").pack()).into() + ]) ); - assert!(time.is_none()); - - // When an orphan block is inserted - { - if let Some((_, time)) = inflight_blocks.trace_number.get_mut(&3) { - *time = Some(faketime::unix_time_as_millis()) - } - } faketime::write_millis(&faketime_file, 2000).expect("write millis"); let list = inflight_blocks.prune(2); assert!(list.is_empty()); - let (next_number, (blocks, time)) = inflight_blocks.trace_number.iter().next().unwrap(); - - assert_eq!(next_number, &4); - assert_eq!(blocks, &HashSet::from_iter(vec![h256!("0x4").pack()])); - assert!(time.is_none()); + assert!(inflight_blocks.restart_number == 3); assert!(inflight_blocks - .inflight_state_by_block(&h256!("0x3").pack()) + .inflight_state_by_block(&(3, h256!("0x3").pack()).into()) .is_none()); assert!(inflight_blocks - .inflight_state_by_block(&h256!("0x33").pack()) + .inflight_state_by_block(&(3, h256!("0x33").pack()).into()) .is_none()); assert_eq!(inflight_blocks.peer_can_fetch_count(3.into()), 8); diff --git a/sync/src/types.rs b/sync/src/types.rs index 13698e38b89..d116a68812f 100644 --- a/sync/src/types.rs +++ b/sync/src/types.rs @@ -26,7 +26,7 @@ use failure::Error as FailureError; use faketime::unix_time_as_millis; use lru_cache::LruCache; use std::cmp; -use std::collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}; +use std::collections::{btree_map::Entry, BTreeMap, HashMap, HashSet}; use std::fmt; use std::hash::Hash; use std::mem; @@ -254,15 +254,28 @@ pub struct Peers { pub struct InflightState { pub(crate) peer: PeerIndex, pub(crate) timestamp: u64, - pub(crate) number: BlockNumber, } impl InflightState { - fn new(peer: PeerIndex, number: BlockNumber) -> Self { + fn new(peer: PeerIndex) -> Self { Self { peer, timestamp: unix_time_as_millis(), - number, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct BlockNumberAndHash { + pub number: BlockNumber, + pub hash: Byte32, +} + +impl From<(BlockNumber, Byte32)> for BlockNumberAndHash { + fn from(inner: (BlockNumber, Byte32)) -> Self { + Self { + number: inner.0, + hash: inner.1, } } } @@ -272,7 +285,7 @@ pub struct DownloadScheduler { task_count: usize, timeout_count: usize, breakthroughs_count: usize, - hashes: HashSet, + hashes: HashSet, } impl Default for DownloadScheduler { @@ -302,14 +315,22 @@ impl DownloadScheduler { fn adjust(&mut self, time: u64, len: u64) { let now = unix_time_as_millis(); // 8 means default max outbound - let (a, b) = if len > 8 { (len >> 3, len % 8) } else { (1, 0) }; + // All synchronization tests are based on the assumption of 8 nodes. + // If the number of nodes is increased, the number of requests and processing time will increase, + // and the corresponding adjustment criteria are needed, so the adjustment is based on 8. + // + // note: This is an interim scenario and the next step is to consider the median response time + // within a certain interval as the adjustment criterion + let (quotient, remainder) = if len > 8 { (len >> 3, len % 8) } else { (1, 0) }; // Dynamically adjust download tasks based on response time. // // Block max size about 700k, Under 10m/s bandwidth it may cost 1s to response // But the mark is on the same time, so multiple tasks may be affected by one task, // causing all responses to be in the range of reduced tasks. // So the adjustment to reduce the number of tasks will be calculated by modulo 3 with the actual number of triggers. - match ((now - time) / a).saturating_sub(b * 100) { + // Adjust for each block received. + match ((now - time) / quotient).saturating_sub(remainder * 100) { + // Within 500ms of response, considered better to communicate with that node's network 0..=500 => { if self.task_count < FIRST_LEVEL_MAX - 1 { self.task_count += 2 @@ -317,6 +338,7 @@ impl DownloadScheduler { self.breakthroughs_count += 1 } } + // Within 1000ms of response time, the network is relatively good and communication should be maintained 501..=1000 => { if self.task_count < FIRST_LEVEL_MAX { self.task_count += 1 @@ -324,7 +346,9 @@ impl DownloadScheduler { self.breakthroughs_count += 1 } } + // Response time within 1500ms, acceptable range, but not relatively good for nodes and do not adjust 1001..=1500 => (), + // Above 1500ms, relatively poor network, reduced _ => { self.timeout_count += 1; if self.timeout_count > 3 { @@ -349,30 +373,30 @@ impl DownloadScheduler { #[derive(Clone)] pub struct InflightBlocks { pub(crate) download_schedulers: HashMap, - inflight_states: HashMap, - pub(crate) trace_number: BTreeMap, Option)>, + inflight_states: BTreeMap, + pub(crate) trace_number: HashMap, compact_reconstruct_inflight: HashMap>, - restart_number: BlockNumber, + pub(crate) restart_number: BlockNumber, } impl Default for InflightBlocks { fn default() -> Self { InflightBlocks { download_schedulers: HashMap::default(), - inflight_states: HashMap::default(), - trace_number: BTreeMap::default(), + inflight_states: BTreeMap::default(), + trace_number: HashMap::default(), compact_reconstruct_inflight: HashMap::default(), restart_number: 0, } } } -struct DebugHashSet<'a>(&'a HashSet); +struct DebugHashSet<'a>(&'a HashSet); impl<'a> fmt::Debug for DebugHashSet<'a> { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_set() - .entries(self.0.iter().map(|h| format!("{}", h))) + .entries(self.0.iter().map(|h| format!("{}", h.hash))) .finish() } } @@ -390,14 +414,14 @@ impl fmt::Debug for InflightBlocks { .entries( self.inflight_states .iter() - .map(|(k, v)| (format!("{}", k), v)), + .map(|(k, v)| (format!("{}", k.hash), v)), ) .finish() } } impl InflightBlocks { - pub fn blocks_iter(&self) -> impl Iterator)> { + pub fn blocks_iter(&self) -> impl Iterator)> { self.download_schedulers.iter().map(|(k, v)| (k, &v.hashes)) } @@ -419,11 +443,11 @@ impl InflightBlocks { ) } - pub fn inflight_block_by_peer(&self, peer: PeerIndex) -> Option<&HashSet> { + pub fn inflight_block_by_peer(&self, peer: PeerIndex) -> Option<&HashSet> { self.download_schedulers.get(&peer).map(|d| &d.hashes) } - pub fn inflight_state_by_block(&self, block: &Byte32) -> Option<&InflightState> { + pub fn inflight_state_by_block(&self, block: &BlockNumberAndHash) -> Option<&InflightState> { self.inflight_states.get(block) } @@ -446,28 +470,52 @@ impl InflightBlocks { self.compact_reconstruct_inflight.get(hash) } + pub fn mark_slow_block(&mut self, tip: BlockNumber) { + let now = faketime::unix_time_as_millis(); + for key in self.inflight_states.keys() { + if key.number > tip + 1 { + break; + } + self.trace_number.entry(key.clone()).or_insert(now); + } + } + pub fn prune(&mut self, tip: BlockNumber) -> HashSet { let now = unix_time_as_millis(); let prev_count = self.total_inflight_count(); - self.trace_number = self.trace_number.split_off(&(tip + 1)); let mut disconnect_list = HashSet::new(); let trace = &mut self.trace_number; - let blocks = &mut self.download_schedulers; + let download_schedulers = &mut self.download_schedulers; let states = &mut self.inflight_states; - states.retain(|k, v| { - let outdate = (v.timestamp + BLOCK_DOWNLOAD_TIMEOUT) < now; - if outdate { - blocks.get_mut(&v.peer).map(|set| set.hashes.remove(k)); - trace.entry(v.number).and_modify(|(trace, _)| { - trace.remove(k); - }); - disconnect_list.insert(v.peer); + + let mut remove_key = Vec::new(); + // Since this is a btreemap, with the data already sorted, + // we don't have to worry about missing points, and we don't need to + // iterate through all the data each time, just check within tip + 20, + // with the checkpoint marking possible blocking points, it's enough + let end = tip + 20; + for (key, value) in states.iter() { + if key.number > end { + break; } - !outdate - }); + if value.timestamp + BLOCK_DOWNLOAD_TIMEOUT < now { + download_schedulers + .get_mut(&value.peer) + .map(|set| set.hashes.remove(key)); + if !trace.is_empty() { + trace.remove(&key); + } + disconnect_list.insert(value.peer); + remove_key.push(key.clone()); + } + } - blocks.retain(|k, v| { + for key in remove_key { + states.remove(&key); + } + + download_schedulers.retain(|k, v| { // task number zero means this peer's response is very slow let zero = v.task_count != 0; if !zero { @@ -476,38 +524,35 @@ impl InflightBlocks { zero }); - let should_remove = trace - .get(&(tip + 1)) - .map(|(hashes, timestamp)| { - // In the normal state, timestamp should always be None - // - // When a sync block is inserted to orphan pool, it means that - // there is an anomaly in the sync at tip + 1, i.e. a node is stuck, - // at which point it will be recorded as the timestamp at that time. - // - // If the time exceeds 1s, delete the task and halve the number of - // executable tasks for the corresponding node - if let Some(time) = timestamp { - if now > 1000 + *time { - for hash in hashes { - let state = states.remove(&hash).unwrap(); - if let Some(d) = blocks.get_mut(&state.peer) { - d.punish(); - d.hashes.remove(&hash); - }; - } - return true; - } - } - false - }) - .unwrap_or(false); - - if should_remove { - self.trace_number.remove(&(tip + 1)); - self.restart_number = tip + 1; + if self.restart_number != 0 && tip + 1 > self.restart_number { + self.restart_number = 0; } + let restart_number = &mut self.restart_number; + trace.retain(|key, time| { + // In the normal state, trace will always empty + // + // When the inflight request reaches the checkpoint(inflight > tip + 512), + // it means that there is an anomaly in the sync less than tip + 1, i.e. some nodes are stuck, + // at which point it will be recorded as the timestamp at that time. + // + // If the time exceeds 1s, delete the task and halve the number of + // executable tasks for the corresponding node + if now > 1000 + *time { + if let Some(state) = states.remove(key) { + if let Some(d) = download_schedulers.get_mut(&state.peer) { + d.punish(); + d.hashes.remove(key); + }; + } + if key.number > *restart_number { + *restart_number = key.number; + } + return false; + } + true + }); + if prev_count == 0 { metric!({ "topic": "blocks_in_flight", @@ -523,29 +568,23 @@ impl InflightBlocks { disconnect_list } - pub fn insert(&mut self, peer: PeerIndex, hash: Byte32, number: BlockNumber) -> bool { - let state = self.inflight_states.entry(hash.clone()); + pub fn insert(&mut self, peer: PeerIndex, block: BlockNumberAndHash) -> bool { + let state = self.inflight_states.entry(block.clone()); match state { Entry::Occupied(_entry) => return false, - Entry::Vacant(entry) => entry.insert(InflightState::new(peer, number)), + Entry::Vacant(entry) => entry.insert(InflightState::new(peer)), }; - let blocks = self + if self.restart_number >= block.number { + self.trace_number + .insert(block.clone(), unix_time_as_millis() + 500); + } + + let download_scheduler = self .download_schedulers .entry(peer) .or_insert_with(DownloadScheduler::default); - let ret = blocks.hashes.insert(hash.clone()); - - let trace_number = self - .trace_number - .entry(number) - .or_insert_with(|| (HashSet::default(), None)); - trace_number.0.insert(hash); - if self.restart_number == number { - trace_number.1 = Some(unix_time_as_millis() + 500); - self.restart_number = 0; - } - ret + download_scheduler.hashes.insert(block) } pub fn remove_by_peer(&mut self, peer: PeerIndex) -> bool { @@ -556,32 +595,33 @@ impl InflightBlocks { .remove(&peer) .map(|blocks| { for block in blocks.hashes { - compact.get_mut(&block).map(|peers| peers.remove(&peer)); - if let Some(state) = state.remove(&block) { - trace.entry(state.number).and_modify(|(trace, _)| { - trace.remove(&block); - }); + compact + .get_mut(&block.hash) + .map(|peers| peers.remove(&peer)); + state.remove(&block); + if !trace.is_empty() { + trace.remove(&block); } } }) .is_some() } - pub fn remove_by_block(&mut self, block: Byte32) -> bool { - let blocks = &mut self.download_schedulers; + pub fn remove_by_block(&mut self, block: BlockNumberAndHash) -> bool { + let download_schedulers = &mut self.download_schedulers; let trace = &mut self.trace_number; let compact = &mut self.compact_reconstruct_inflight; - let len = blocks.len() as u64; + let len = download_schedulers.len() as u64; self.inflight_states .remove(&block) .map(|state| { - if let Some(set) = blocks.get_mut(&state.peer) { + if let Some(set) = download_schedulers.get_mut(&state.peer) { set.hashes.remove(&block); - compact.remove(&block); + compact.remove(&block.hash); set.adjust(state.timestamp, len); - trace.entry(state.number).and_modify(|(trace, _)| { + if !trace.is_empty() { trace.remove(&block); - }); + } }; state.timestamp }) @@ -1236,7 +1276,8 @@ impl SyncState { // Return true when the block is that we have requested and received first time. pub fn new_block_received(&self, block: &core::BlockView) -> bool { - self.write_inflight_blocks().remove_by_block(block.hash()) + self.write_inflight_blocks() + .remove_by_block((block.number(), block.hash()).into()) } pub fn insert_inflight_proposals(&self, ids: Vec) -> Vec { diff --git a/test/src/specs/sync/block_sync.rs b/test/src/specs/sync/block_sync.rs index a9675a59d79..89e74fa2f23 100644 --- a/test/src/specs/sync/block_sync.rs +++ b/test/src/specs/sync/block_sync.rs @@ -1,5 +1,5 @@ use crate::utils::{ - build_block, build_get_blocks, build_header, new_block_with_template, sleep, wait_until, + build_block, build_get_blocks, build_header, new_block_with_template, wait_until, }; use crate::{Net, Node, Spec, TestProtocol}; use ckb_jsonrpc_types::ChainInfo; @@ -254,12 +254,6 @@ impl Spec for BlockSyncOrphanBlocks { let node0 = &net.nodes[0]; let node1 = &net.nodes[1]; net.exit_ibd_mode(); - net.connect(node0); - let (peer_id, _, _) = net - .receive_timeout(Duration::new(10, 0)) - .expect("net receive timeout"); - let rpc_client = node0.rpc_client(); - let tip_number = rpc_client.get_tip_block_number(); // Generate some blocks from node1 let mut blocks: Vec = (1..=5) @@ -270,13 +264,22 @@ impl Spec for BlockSyncOrphanBlocks { }) .collect(); + net.connect(node0); + let (peer_id, _, _) = net + .receive_timeout(Duration::new(10, 0)) + .expect("net receive timeout"); + let rpc_client = node0.rpc_client(); + let tip_number = rpc_client.get_tip_block_number(); + // Send headers to node0, keep blocks body blocks.iter().for_each(|block| { sync_header(&net, peer_id, block); }); // Wait for block fetch timer - sleep(5); + let (_, _, _) = net + .receive_timeout(Duration::new(10, 0)) + .expect("net receive timeout"); // Skip the next block, send the rest blocks to node0 let first = blocks.remove(0);