Skip to content

Commit

Permalink
feat: Increase the maximum number of tasks and record punish points
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Apr 13, 2020
1 parent a1174ef commit 990661d
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 18 deletions.
27 changes: 21 additions & 6 deletions sync/src/synchronizer/block_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,17 @@ impl<'a> BlockFetcher<'a> {
for _ in 0..span {
let parent_hash = header.parent_hash();
let hash = header.hash();
// NOTE: Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding
// stopping synchronization even when orphan_pool maintains dirty items by bugs.
let stored = self
.active_chain
.contains_block_status(&hash, BlockStatus::BLOCK_STORED);
if stored {

let status = self.active_chain.get_block_status(&hash);
if status == BlockStatus::BLOCK_STORED {
// If the block is stored, its ancestor must on store
// So we can skip the search of this space directly
break;
} else if self.ibd.into() && status.contains(BlockStatus::BLOCK_RECEIVED) {
// NOTE: NO-IBD Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding
// stopping synchronization even when orphan_pool maintains dirty items by bugs.
// TODO: If async validation is achieved, then the IBD state judgement here can be removed
()
} else if inflight.insert(self.peer, hash, header.number()) {
fetch.push(header)
}
Expand All @@ -173,6 +175,19 @@ impl<'a> BlockFetcher<'a> {
// The headers in `fetch` may be unordered. Sort them by number.
fetch.sort_by_key(|header| header.number());

let tip = self.active_chain.tip_number();
if fetch
.last()
.map(|header| header.number().saturating_sub(512) > tip)
.unwrap_or(false)
{
if let Some(entry) = inflight.trace_number.get_mut(&(tip + 1)) {
if entry.1.is_none() {
entry.1 = Some(faketime::unix_time_as_millis());
}
}
}

Some(
fetch
.chunks(crate::MAX_BLOCKS_IN_TRANSIT_PER_PEER)
Expand Down
1 change: 1 addition & 0 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ impl Synchronizer {
.map(|d| d.task_count())
.unwrap_or(crate::MAX_BLOCKS_IN_TRANSIT_PER_PEER)
});
peers.reverse();
peers
};

Expand Down
32 changes: 20 additions & 12 deletions sync/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ impl InflightState {
pub struct DownloadScheduler {
task_count: usize,
timeout_count: usize,
breakthroughs_count: usize,
hashes: HashSet<Byte32>,
}

Expand All @@ -278,6 +279,7 @@ impl Default for DownloadScheduler {
Self {
hashes: HashSet::default(),
task_count: MAX_BLOCKS_IN_TRANSIT_PER_PEER,
breakthroughs_count: 0,
timeout_count: 0,
}
}
Expand Down Expand Up @@ -310,11 +312,15 @@ impl DownloadScheduler {
0..=500 => {
if self.task_count < 31 {
self.task_count += 2
} else {
self.breakthroughs_count += 1
}
}
501..=1000 => {
if self.task_count < 32 {
self.task_count += 1
} else {
self.breakthroughs_count += 1
}
}
1001..=1500 => (),
Expand All @@ -326,6 +332,12 @@ impl DownloadScheduler {
}
}
}
if self.breakthroughs_count > 4 {
if self.task_count < 128 {
self.task_count += 1;
}
self.breakthroughs_count = 0;
}
}

fn punish(&mut self) {
Expand All @@ -339,6 +351,7 @@ pub struct InflightBlocks {
inflight_states: HashMap<Byte32, InflightState>,
pub(crate) trace_number: BTreeMap<BlockNumber, (HashSet<Byte32>, Option<u64>)>,
compact_reconstruct_inflight: HashMap<Byte32, HashSet<PeerIndex>>,
restart_number: BlockNumber,
}

impl Default for InflightBlocks {
Expand All @@ -348,6 +361,7 @@ impl Default for InflightBlocks {
inflight_states: HashMap::default(),
trace_number: BTreeMap::default(),
compact_reconstruct_inflight: HashMap::default(),
restart_number: 0,
}
}
}
Expand Down Expand Up @@ -473,7 +487,7 @@ impl InflightBlocks {
// If the time exceeds 1s, delete the task and halve the number of
// executable tasks for the corresponding node
if let Some(time) = timestamp {
if now - *time > 1000 {
if now > 1000 + *time {
for hash in hashes {
let state = states.remove(&hash).unwrap();
if let Some(d) = blocks.get_mut(&state.peer) {
Expand All @@ -490,6 +504,7 @@ impl InflightBlocks {

if should_remove {
self.trace_number.remove(&(tip + 1));
self.restart_number = tip + 1;
}

if prev_count == 0 {
Expand Down Expand Up @@ -525,6 +540,10 @@ impl InflightBlocks {
.entry(number)
.or_insert_with(|| (HashSet::default(), None));
trace_number.0.insert(hash);
if self.restart_number == number {
trace_number.1 = Some(unix_time_as_millis() + 500);
self.restart_number = 0;
}
ret
}

Expand Down Expand Up @@ -914,17 +933,6 @@ impl SyncShared {
block.header().hash()
);
self.state.insert_orphan_block(pi, (*block).clone());
let tip = self.shared.snapshot().tip_number();
if let Some(entry) = self
.state()
.write_inflight_blocks()
.trace_number
.get_mut(&(tip + 1))
{
if entry.1.is_none() {
entry.1 = Some(unix_time_as_millis());
}
}
return Ok(false);
}

Expand Down

0 comments on commit 990661d

Please sign in to comment.