Skip to content

Commit

Permalink
chore: rewrite inflight state
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Apr 28, 2020
1 parent 94749db commit c58adb0
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 172 deletions.
14 changes: 7 additions & 7 deletions sync/src/synchronizer/block_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,11 @@ impl<'a> BlockFetcher<'a> {
// NOTE: NO-IBD Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding
// stopping synchronization even when orphan_pool maintains dirty items by bugs.
// TODO: If async validation is achieved, then the IBD state judgement here can be removed
} else if inflight.insert(self.peer, hash, header.number()) {

// On IBD, BLOCK_RECEIVED means this block had been received, so this block doesn't need to fetch
// On NO-IBD, because of the model, this block has to be requested again
// But all of these can do nothing on this branch
} else if inflight.insert(self.peer, (header.number(), hash).into()) {
fetch.push(header)
}

Expand All @@ -175,15 +179,11 @@ impl<'a> BlockFetcher<'a> {
fetch.sort_by_key(|header| header.number());

let tip = self.active_chain.tip_number();
let should_mark = fetch.last().map_or(true, |header| {
let should_mark = fetch.last().map_or(false, |header| {
header.number().saturating_sub(crate::CHECK_POINT_WINDOW) > tip
});
if should_mark {
if let Some(entry) = inflight.trace_number.get_mut(&(tip + 1)) {
if entry.1.is_none() {
entry.1 = Some(faketime::unix_time_as_millis());
}
}
inflight.mark_slow_block(tip);
}

Some(
Expand Down
8 changes: 5 additions & 3 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,12 +424,13 @@ impl Synchronizer {
let disconnect_list = {
let mut list = self.shared().state().write_inflight_blocks().prune(tip);
if let IBDState::In = ibd {
// best known < tip and in IBD state, these node can be disconnect
// best known < tip and in IBD state, and unknown list is empty,
// these node can be disconnect
list.extend(
self.shared
.state()
.peers()
.get_best_known_less_than_tip(tip),
.get_best_known_less_than_tip_and_unknown_empty(tip),
)
};
list
Expand Down Expand Up @@ -491,9 +492,10 @@ impl Synchronizer {

trace!("poll find_blocks_to_fetch select peers");
// fetch use a lot of cpu time, especially in ibd state
// so in ibd, the fetch function use another thread
match nc.p2p_control() {
Some(raw) if ibd.into() => match self.fetch_channel {
Some(ref send) => send.send(FetchCMD::Fetch(peers)).unwrap(),
Some(ref sender) => sender.send(FetchCMD::Fetch(peers)).unwrap(),
None => {
let p2p_control = raw.clone();
let sync = self.clone();
Expand Down
120 changes: 60 additions & 60 deletions sync/src/tests/inflight_blocks.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::types::InflightBlocks;
use crate::types::{BlockNumberAndHash, InflightBlocks};
use crate::BLOCK_DOWNLOAD_TIMEOUT;
use ckb_types::prelude::*;
use ckb_types::{h256, H256};
Expand All @@ -10,24 +10,24 @@ fn inflight_blocks_count() {
let mut inflight_blocks = InflightBlocks::default();

// don't allow 2 peer for one block
assert!(inflight_blocks.insert(2.into(), h256!("0x1").pack(), 1));
assert!(!inflight_blocks.insert(1.into(), h256!("0x1").pack(), 1));
assert!(inflight_blocks.insert(2.into(), (1, h256!("0x1").pack()).into()));
assert!(!inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into()));

// peer 1 inflight
assert!(!inflight_blocks.insert(1.into(), h256!("0x1").pack(), 1));
assert!(!inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into()));

assert!(inflight_blocks.insert(1.into(), h256!("0x2").pack(), 2));
assert!(inflight_blocks.insert(1.into(), (2, h256!("0x2").pack()).into()));

assert_eq!(inflight_blocks.total_inflight_count(), 2); // 0x1 0x2
assert_eq!(inflight_blocks.peer_inflight_count(1.into()), 1);
assert_eq!(inflight_blocks.peer_inflight_count(2.into()), 1); // one block inflight
assert_eq!(
inflight_blocks.inflight_block_by_peer(1.into()).cloned(),
Some(HashSet::from_iter(vec![h256!("0x2").pack()]))
Some(HashSet::from_iter(vec![(2, h256!("0x2").pack()).into()]))
);

// receive block 0x1
inflight_blocks.remove_by_block(h256!("0x1").pack());
inflight_blocks.remove_by_block((1, h256!("0x1").pack()).into());

assert_eq!(inflight_blocks.total_inflight_count(), 1); // 0x2
assert_eq!(inflight_blocks.peer_inflight_count(1.into()), 1);
Expand All @@ -36,35 +36,35 @@ fn inflight_blocks_count() {
inflight_blocks
.inflight_block_by_peer(1.into())
.map(|set| set.iter().collect()),
Some(vec![&h256!("0x2").pack()])
Some(vec![&(2, h256!("0x2").pack()).into()])
);
}

#[test]
fn inflight_blocks_state() {
let mut inflight_blocks = InflightBlocks::default();

assert!(inflight_blocks.insert(1.into(), h256!("0x1").pack(), 1));
assert!(!inflight_blocks.insert(2.into(), h256!("0x1").pack(), 1));
assert!(!inflight_blocks.insert(3.into(), h256!("0x1").pack(), 1));
assert!(inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into()));
assert!(!inflight_blocks.insert(2.into(), (1, h256!("0x1").pack()).into()));
assert!(!inflight_blocks.insert(3.into(), (1, h256!("0x1").pack()).into()));

// peer 1 inflight
assert!(!inflight_blocks.insert(1.into(), h256!("0x1").pack(), 1));
assert!(inflight_blocks.insert(1.into(), h256!("0x2").pack(), 2));
assert!(!inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into()));
assert!(inflight_blocks.insert(1.into(), (2, h256!("0x2").pack()).into()));

assert!(inflight_blocks.insert(3.into(), h256!("0x3").pack(), 3));
assert!(inflight_blocks.insert(3.into(), (3, h256!("0x3").pack()).into()));

assert_eq!(
inflight_blocks
.inflight_state_by_block(&h256!("0x1").pack())
.inflight_state_by_block(&(1, h256!("0x1").pack()).into())
.cloned()
.map(|state| { state.peer }),
Some(1.into())
);

assert_eq!(
inflight_blocks
.inflight_state_by_block(&h256!("0x3").pack())
.inflight_state_by_block(&(3, h256!("0x3").pack()).into())
.map(|state| state.peer),
Some(3.into())
);
Expand All @@ -75,14 +75,14 @@ fn inflight_blocks_state() {

assert_eq!(
inflight_blocks
.inflight_state_by_block(&h256!("0x1").pack())
.inflight_state_by_block(&(1, h256!("0x1").pack()).into())
.map(|state| state.peer),
None
);

assert_eq!(
inflight_blocks
.inflight_state_by_block(&h256!("0x3").pack())
.inflight_state_by_block(&(3, h256!("0x3").pack()).into())
.map(|state| state.peer),
Some(3.into())
);
Expand All @@ -95,43 +95,43 @@ fn inflight_blocks_timeout() {
faketime::enable(&faketime_file);
let mut inflight_blocks = InflightBlocks::default();

assert!(inflight_blocks.insert(1.into(), h256!("0x1").pack(), 1));
assert!(inflight_blocks.insert(1.into(), h256!("0x2").pack(), 2));
assert!(inflight_blocks.insert(2.into(), h256!("0x3").pack(), 3));
assert!(!inflight_blocks.insert(1.into(), h256!("0x3").pack(), 3));
assert!(inflight_blocks.insert(1.into(), h256!("0x4").pack(), 4));
assert!(inflight_blocks.insert(2.into(), h256!("0x5").pack(), 5));
assert!(!inflight_blocks.insert(2.into(), h256!("0x5").pack(), 5));
assert!(inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into()));
assert!(inflight_blocks.insert(1.into(), (2, h256!("0x2").pack()).into()));
assert!(inflight_blocks.insert(2.into(), (3, h256!("0x3").pack()).into()));
assert!(!inflight_blocks.insert(1.into(), (3, h256!("0x3").pack()).into()));
assert!(inflight_blocks.insert(1.into(), (4, h256!("0x4").pack()).into()));
assert!(inflight_blocks.insert(2.into(), (5, h256!("0x5").pack()).into()));
assert!(!inflight_blocks.insert(2.into(), (5, h256!("0x5").pack()).into()));

faketime::write_millis(&faketime_file, BLOCK_DOWNLOAD_TIMEOUT + 1).expect("write millis");

assert!(!inflight_blocks.insert(3.into(), h256!("0x3").pack(), 3));
assert!(!inflight_blocks.insert(3.into(), h256!("0x2").pack(), 2));
assert!(inflight_blocks.insert(4.into(), h256!("0x6").pack(), 6));
assert!(inflight_blocks.insert(1.into(), h256!("0x7").pack(), 7));
assert!(!inflight_blocks.insert(3.into(), (3, h256!("0x3").pack()).into()));
assert!(!inflight_blocks.insert(3.into(), (2, h256!("0x2").pack()).into()));
assert!(inflight_blocks.insert(4.into(), (6, h256!("0x6").pack()).into()));
assert!(inflight_blocks.insert(1.into(), (7, h256!("0x7").pack()).into()));

let peers = inflight_blocks.prune(0);
assert_eq!(peers, HashSet::from_iter(vec![1.into(), 2.into()]));
assert!(inflight_blocks.insert(3.into(), h256!("0x2").pack(), 2));
assert!(inflight_blocks.insert(3.into(), h256!("0x3").pack(), 3));
assert!(inflight_blocks.insert(3.into(), (2, h256!("0x2").pack()).into()));
assert!(inflight_blocks.insert(3.into(), (3, h256!("0x3").pack()).into()));

assert_eq!(
inflight_blocks
.inflight_state_by_block(&h256!("0x3").pack())
.inflight_state_by_block(&(3, h256!("0x3").pack()).into())
.map(|state| state.peer),
Some(3.into())
);

assert_eq!(
inflight_blocks
.inflight_state_by_block(&h256!("0x2").pack())
.inflight_state_by_block(&(2, h256!("0x2").pack()).into())
.map(|state| state.peer),
Some(3.into())
);

assert_eq!(
inflight_blocks
.inflight_state_by_block(&h256!("0x6").pack())
.inflight_state_by_block(&(6, h256!("0x6").pack()).into())
.cloned()
.map(|state| state.peer),
Some(4.into())
Expand All @@ -146,49 +146,49 @@ fn inflight_trace_number_state() {

let mut inflight_blocks = InflightBlocks::default();

assert!(inflight_blocks.insert(1.into(), h256!("0x1").pack(), 1));
assert!(inflight_blocks.insert(2.into(), h256!("0x2").pack(), 2));
assert!(inflight_blocks.insert(3.into(), h256!("0x3").pack(), 3));
assert!(inflight_blocks.insert(4.into(), h256!("0x33").pack(), 3));
assert!(inflight_blocks.insert(5.into(), h256!("0x4").pack(), 4));
assert!(inflight_blocks.insert(6.into(), h256!("0x5").pack(), 5));
assert!(inflight_blocks.insert(7.into(), h256!("0x55").pack(), 5));
assert!(inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into()));
assert!(inflight_blocks.insert(2.into(), (2, h256!("0x2").pack()).into()));
assert!(inflight_blocks.insert(3.into(), (3, h256!("0x3").pack()).into()));
assert!(inflight_blocks.insert(4.into(), (3, h256!("0x33").pack()).into()));
assert!(inflight_blocks.insert(5.into(), (4, h256!("0x4").pack()).into()));
assert!(inflight_blocks.insert(6.into(), (5, h256!("0x5").pack()).into()));
assert!(inflight_blocks.insert(7.into(), (5, h256!("0x55").pack()).into()));

let list = inflight_blocks.prune(2);
assert!(list.is_empty());

let (next_number, (blocks, time)) = inflight_blocks.trace_number.iter().next().unwrap();
assert!(inflight_blocks.trace_number.is_empty());
assert!(inflight_blocks.restart_number == 0);

// When 2 + 512 number block request send out
inflight_blocks.mark_slow_block(2);

assert_eq!(next_number, &3);
assert_eq!(
blocks,
&HashSet::from_iter(vec![h256!("0x3").pack(), h256!("0x33").pack()])
inflight_blocks
.trace_number
.keys()
.cloned()
.collect::<HashSet<BlockNumberAndHash>>(),
HashSet::from_iter(vec![
(1, h256!("0x1").pack()).into(),
(2, h256!("0x2").pack()).into(),
(3, h256!("0x3").pack()).into(),
(3, h256!("0x33").pack()).into()
])
);
assert!(time.is_none());

// When an orphan block is inserted
{
if let Some((_, time)) = inflight_blocks.trace_number.get_mut(&3) {
*time = Some(faketime::unix_time_as_millis())
}
}

faketime::write_millis(&faketime_file, 2000).expect("write millis");

let list = inflight_blocks.prune(2);
assert!(list.is_empty());

let (next_number, (blocks, time)) = inflight_blocks.trace_number.iter().next().unwrap();

assert_eq!(next_number, &4);
assert_eq!(blocks, &HashSet::from_iter(vec![h256!("0x4").pack()]));
assert!(time.is_none());
assert!(inflight_blocks.restart_number == 3);

assert!(inflight_blocks
.inflight_state_by_block(&h256!("0x3").pack())
.inflight_state_by_block(&(3, h256!("0x3").pack()).into())
.is_none());
assert!(inflight_blocks
.inflight_state_by_block(&h256!("0x33").pack())
.inflight_state_by_block(&(3, h256!("0x33").pack()).into())
.is_none());

assert_eq!(inflight_blocks.peer_can_fetch_count(3.into()), 8);
Expand Down
Loading

0 comments on commit c58adb0

Please sign in to comment.