feat: Increase the maximum number of tasks and record punish points
driftluo committed Apr 16, 2020
1 parent 8b9db25 commit 03c0cd9
Showing 6 changed files with 67 additions and 40 deletions.
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default.

7 changes: 6 additions & 1 deletion sync/src/lib.rs
@@ -26,10 +26,15 @@ pub const MAX_INVENTORY_LEN: usize = 50_000;
pub const MAX_SCHEDULED_LEN: usize = 4 * 1024;
pub const MAX_BLOCKS_TO_ANNOUNCE: usize = 8;
pub const MAX_UNCONNECTING_HEADERS: usize = 10;
pub const MAX_BLOCKS_IN_TRANSIT_PER_PEER: usize = 16;
pub const MAX_TIP_AGE: u64 = 24 * 60 * 60 * 1000;
pub const STALE_RELAY_AGE_LIMIT: u64 = 30 * 24 * 60 * 60 * 1000;

/* About Download Scheduler */
pub const INIT_BLOCKS_IN_TRANSIT_PER_PEER: usize = 16;
pub const FIRST_LEVEL_MAX: usize = 32;
pub const MAX_BLOCKS_IN_TRANSIT_PER_PEER: usize = 128;
pub const CHECK_POINT_WINDOW: u64 = (MAX_BLOCKS_IN_TRANSIT_PER_PEER * 4) as u64;

pub(crate) const LOG_TARGET_RELAY: &str = "ckb-relay";

use ckb_network::ProtocolId;
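The download scheduler now works within a three-level budget: each peer starts at INIT_BLOCKS_IN_TRANSIT_PER_PEER (16) blocks in flight, can ramp up quickly to FIRST_LEVEL_MAX (32), and may eventually reach MAX_BLOCKS_IN_TRANSIT_PER_PEER (128); CHECK_POINT_WINDOW spans 4 × 128 = 512 blocks ahead of the tip. A minimal standalone sketch of these relationships (not part of the diff; values copied from the lib.rs hunk above):

const INIT_BLOCKS_IN_TRANSIT_PER_PEER: usize = 16;
const FIRST_LEVEL_MAX: usize = 32;
const MAX_BLOCKS_IN_TRANSIT_PER_PEER: usize = 128;
const CHECK_POINT_WINDOW: u64 = (MAX_BLOCKS_IN_TRANSIT_PER_PEER * 4) as u64;

fn main() {
    // A peer's task budget only ever moves within these bounds.
    assert!(INIT_BLOCKS_IN_TRANSIT_PER_PEER <= FIRST_LEVEL_MAX);
    assert!(FIRST_LEVEL_MAX <= MAX_BLOCKS_IN_TRANSIT_PER_PEER);
    // The check-point window reaches 512 blocks past the current tip.
    assert_eq!(CHECK_POINT_WINDOW, 512);
}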
27 changes: 20 additions & 7 deletions sync/src/synchronizer/block_fetcher.rs
@@ -146,15 +146,16 @@ impl<'a> BlockFetcher<'a> {
for _ in 0..span {
let parent_hash = header.parent_hash();
let hash = header.hash();
// NOTE: Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding
// stopping synchronization even when orphan_pool maintains dirty items by bugs.
let stored = self
.active_chain
.contains_block_status(&hash, BlockStatus::BLOCK_STORED);
if stored {

let status = self.active_chain.get_block_status(&hash);
if status == BlockStatus::BLOCK_STORED {
// If the block is stored, its ancestors must already be stored,
// so we can skip searching this range entirely
break;
} else if self.ibd.into() && status.contains(BlockStatus::BLOCK_RECEIVED) {
// NOTE: Outside of IBD we filter `BLOCK_STORED` but not `BLOCK_RECEIVED`, so that
// synchronization does not stall even if the orphan_pool holds dirty items caused by bugs.
// TODO: Once async validation lands, the IBD state check here can be removed
} else if inflight.insert(self.peer, hash, header.number()) {
fetch.push(header)
}
@@ -173,9 +174,21 @@ impl<'a> BlockFetcher<'a> {
// The headers in `fetch` may be unordered. Sort them by number.
fetch.sort_by_key(|header| header.number());

let tip = self.active_chain.tip_number();
let should_mark = fetch.last().map_or(true, |header| {
header.number().saturating_sub(crate::CHECK_POINT_WINDOW) > tip
});
if should_mark {
if let Some(entry) = inflight.trace_number.get_mut(&(tip + 1)) {
if entry.1.is_none() {
entry.1 = Some(faketime::unix_time_as_millis());
}
}
}

Some(
fetch
.chunks(crate::MAX_BLOCKS_IN_TRANSIT_PER_PEER)
.chunks(crate::INIT_BLOCKS_IN_TRANSIT_PER_PEER)
.map(|headers| headers.iter().map(core::HeaderView::hash).collect())
.collect(),
)
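After collecting the headers to request, the fetcher decides whether to start timing the block just above the tip: if there is nothing left to request, or the newest requested header already sits more than CHECK_POINT_WINDOW blocks past the tip, the trace entry for tip + 1 gets a timestamp so the scheduler's timeout logic can later detect and punish the peer holding that block. A simplified sketch of that rule (assumed, simplified types; the real trace_number maps a number to a hash set plus an optional timestamp):

use std::collections::BTreeMap;

const CHECK_POINT_WINDOW: u64 = 512;

// block number -> timestamp (ms) at which it was first marked as suspect
type TraceNumbers = BTreeMap<u64, Option<u64>>;

fn mark_if_lagging(
    newest_requested: Option<u64>, // number of the last header in `fetch`, if any
    tip: u64,
    trace_number: &mut TraceNumbers,
    now_ms: u64,
) {
    let should_mark =
        newest_requested.map_or(true, |n| n.saturating_sub(CHECK_POINT_WINDOW) > tip);
    if should_mark {
        // Only the very first mark sets the timestamp; later calls leave it alone.
        if let Some(marked_at) = trace_number.get_mut(&(tip + 1)) {
            if marked_at.is_none() {
                *marked_at = Some(now_ms);
            }
        }
    }
}

fn main() {
    let mut trace: TraceNumbers = BTreeMap::new();
    trace.insert(101, None); // block right above the tip, already being traced
    mark_if_lagging(Some(700), 100, &mut trace, 1_587_000_000_000);
    assert_eq!(trace[&101], Some(1_587_000_000_000));
}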
6 changes: 3 additions & 3 deletions sync/src/synchronizer/get_blocks_process.rs
@@ -1,6 +1,6 @@
use crate::block_status::BlockStatus;
use crate::synchronizer::Synchronizer;
use crate::{Status, StatusCode, MAX_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN};
use crate::{Status, StatusCode, INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN};
use ckb_logger::debug;
use ckb_network::{CKBProtocolContext, PeerIndex};
use ckb_types::{packed, prelude::*};
@@ -29,7 +29,7 @@ impl<'a> GetBlocksProcess<'a> {

pub fn execute(self) -> Status {
let block_hashes = self.message.block_hashes();
// use MAX_HEADERS_LEN as limit, we may increase the value of MAX_BLOCKS_IN_TRANSIT_PER_PEER in the future
// Use MAX_HEADERS_LEN as the limit; we may increase INIT_BLOCKS_IN_TRANSIT_PER_PEER in the future
if block_hashes.len() > MAX_HEADERS_LEN {
return StatusCode::ProtocolMessageIsMalformed.with_context(format!(
"BlockHashes count({}) > MAX_HEADERS_LEN({})",
@@ -39,7 +39,7 @@ impl<'a> GetBlocksProcess<'a> {
}
let active_chain = self.synchronizer.shared.active_chain();

for block_hash in block_hashes.iter().take(MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
for block_hash in block_hashes.iter().take(INIT_BLOCKS_IN_TRANSIT_PER_PEER) {
debug!("get_blocks {} from peer {:?}", block_hash, self.peer);
let block_hash = block_hash.to_entity();

4 changes: 2 additions & 2 deletions sync/src/synchronizer/mod.rs
@@ -483,9 +483,9 @@ impl Synchronizer {
peers.sort_by_key(|id| {
state
.get(id)
.map(|d| d.task_count())
.unwrap_or(crate::MAX_BLOCKS_IN_TRANSIT_PER_PEER)
.map_or(crate::INIT_BLOCKS_IN_TRANSIT_PER_PEER, |d| d.task_count())
});
peers.reverse();
peers
};

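The peer ordering in the synchronizer prefers peers with the largest current task budget, and any peer without scheduler state yet is assumed to sit at the initial budget. A small sketch of that ordering (hypothetical peer data, not the commit's code):

const INIT_BLOCKS_IN_TRANSIT_PER_PEER: usize = 16;

/// (peer id, task count if a scheduler already exists for that peer)
fn order_peers(mut peers: Vec<(u64, Option<usize>)>) -> Vec<u64> {
    peers.sort_by_key(|(_, count)| count.unwrap_or(INIT_BLOCKS_IN_TRANSIT_PER_PEER));
    peers.reverse(); // largest budget first
    peers.into_iter().map(|(id, _)| id).collect()
}

fn main() {
    let peers = vec![(1, Some(32)), (2, None), (3, Some(24))];
    assert_eq!(order_peers(peers), vec![1, 3, 2]);
}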
49 changes: 29 additions & 20 deletions sync/src/types.rs
@@ -3,7 +3,8 @@ use crate::orphan_block_pool::OrphanBlockPool;
use crate::BLOCK_DOWNLOAD_TIMEOUT;
use crate::{NetworkProtocol, SUSPEND_SYNC_TIME};
use crate::{
MAX_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN, MAX_TIP_AGE, RETRY_ASK_TX_TIMEOUT_INCREASE,
FIRST_LEVEL_MAX, INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_BLOCKS_IN_TRANSIT_PER_PEER,
MAX_HEADERS_LEN, MAX_TIP_AGE, RETRY_ASK_TX_TIMEOUT_INCREASE,
};
use ckb_chain::chain::ChainController;
use ckb_chain_spec::consensus::Consensus;
@@ -270,14 +271,16 @@ impl InflightState {
pub struct DownloadScheduler {
task_count: usize,
timeout_count: usize,
breakthroughs_count: usize,
hashes: HashSet<Byte32>,
}

impl Default for DownloadScheduler {
fn default() -> Self {
Self {
hashes: HashSet::default(),
task_count: MAX_BLOCKS_IN_TRANSIT_PER_PEER,
task_count: INIT_BLOCKS_IN_TRANSIT_PER_PEER,
breakthroughs_count: 0,
timeout_count: 0,
}
}
@@ -308,13 +311,17 @@ impl DownloadScheduler {
// So the adjustment to reduce the number of tasks will be calculated by modulo 3 with the actual number of triggers.
match ((now - time) / a).saturating_sub(b * 100) {
0..=500 => {
if self.task_count < 31 {
if self.task_count < FIRST_LEVEL_MAX - 1 {
self.task_count += 2
} else {
self.breakthroughs_count += 1
}
}
501..=1000 => {
if self.task_count < 32 {
if self.task_count < FIRST_LEVEL_MAX {
self.task_count += 1
} else {
self.breakthroughs_count += 1
}
}
1001..=1500 => (),
@@ -326,6 +333,12 @@
}
}
}
if self.breakthroughs_count > 4 {
if self.task_count < MAX_BLOCKS_IN_TRANSIT_PER_PEER {
self.task_count += 1;
}
self.breakthroughs_count = 0;
}
}

fn punish(&mut self) {
@@ -339,6 +352,7 @@ pub struct InflightBlocks {
inflight_states: HashMap<Byte32, InflightState>,
pub(crate) trace_number: BTreeMap<BlockNumber, (HashSet<Byte32>, Option<u64>)>,
compact_reconstruct_inflight: HashMap<Byte32, HashSet<PeerIndex>>,
restart_number: BlockNumber,
}

impl Default for InflightBlocks {
@@ -348,6 +362,7 @@
inflight_states: HashMap::default(),
trace_number: BTreeMap::default(),
compact_reconstruct_inflight: HashMap::default(),
restart_number: 0,
}
}
}
@@ -398,10 +413,10 @@ impl InflightBlocks {
}

pub fn peer_can_fetch_count(&self, peer: PeerIndex) -> usize {
self.download_schedulers
.get(&peer)
.map(DownloadScheduler::can_fetch)
.unwrap_or(MAX_BLOCKS_IN_TRANSIT_PER_PEER)
self.download_schedulers.get(&peer).map_or(
INIT_BLOCKS_IN_TRANSIT_PER_PEER,
DownloadScheduler::can_fetch,
)
}

pub fn inflight_block_by_peer(&self, peer: PeerIndex) -> Option<&HashSet<Byte32>> {
@@ -473,7 +488,7 @@ impl InflightBlocks {
// If the time exceeds 1s, delete the task and halve the number of
// executable tasks for the corresponding node
if let Some(time) = timestamp {
if now - *time > 1000 {
if now > 1000 + *time {
for hash in hashes {
let state = states.remove(&hash).unwrap();
if let Some(d) = blocks.get_mut(&state.peer) {
@@ -490,6 +505,7 @@

if should_remove {
self.trace_number.remove(&(tip + 1));
self.restart_number = tip + 1;
}

if prev_count == 0 {
@@ -525,6 +541,10 @@
.entry(number)
.or_insert_with(|| (HashSet::default(), None));
trace_number.0.insert(hash);
if self.restart_number == number {
trace_number.1 = Some(unix_time_as_millis() + 500);
self.restart_number = 0;
}
ret
}

@@ -914,17 +934,6 @@ impl SyncShared {
block.header().hash()
);
self.state.insert_orphan_block(pi, (*block).clone());
let tip = self.shared.snapshot().tip_number();
if let Some(entry) = self
.state()
.write_inflight_blocks()
.trace_number
.get_mut(&(tip + 1))
{
if entry.1.is_none() {
entry.1 = Some(unix_time_as_millis());
}
}
return Ok(false);
}

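The per-peer DownloadScheduler grows its task budget adaptively: while below FIRST_LEVEL_MAX, a fast batch adds one or two slots directly (the +2 path stops at FIRST_LEVEL_MAX - 1 to avoid overshooting); once the first level is reached, each fast batch only earns a "breakthrough", and five breakthroughs buy a single extra slot until MAX_BLOCKS_IN_TRANSIT_PER_PEER is hit. Timed-out batches are penalized separately via punish(), whose body is not shown in this hunk. A minimal standalone sketch of the reward side (field names follow the commit; the elapsed-time buckets are taken from the hunk, the rest is an assumption):

const INIT_BLOCKS_IN_TRANSIT_PER_PEER: usize = 16;
const FIRST_LEVEL_MAX: usize = 32;
const MAX_BLOCKS_IN_TRANSIT_PER_PEER: usize = 128;

struct DownloadScheduler {
    task_count: usize,
    breakthroughs_count: usize,
}

impl DownloadScheduler {
    fn reward(&mut self, elapsed_ms: u64) {
        match elapsed_ms {
            // Very fast batch: grow quickly while under the first-level cap.
            0..=500 => {
                if self.task_count < FIRST_LEVEL_MAX - 1 {
                    self.task_count += 2;
                } else {
                    self.breakthroughs_count += 1;
                }
            }
            // Fast batch: grow slowly while under the first-level cap.
            501..=1000 => {
                if self.task_count < FIRST_LEVEL_MAX {
                    self.task_count += 1;
                } else {
                    self.breakthroughs_count += 1;
                }
            }
            // Slower batches neither reward nor punish in this sketch.
            _ => (),
        }
        // Five breakthroughs past the first level buy one more slot,
        // up to the hard per-peer maximum.
        if self.breakthroughs_count > 4 {
            if self.task_count < MAX_BLOCKS_IN_TRANSIT_PER_PEER {
                self.task_count += 1;
            }
            self.breakthroughs_count = 0;
        }
    }
}

fn main() {
    let mut s = DownloadScheduler {
        task_count: INIT_BLOCKS_IN_TRANSIT_PER_PEER,
        breakthroughs_count: 0,
    };
    for _ in 0..20 {
        s.reward(300); // consistently fast peer
    }
    // Grows quickly toward FIRST_LEVEL_MAX, then needs five fast batches per extra slot.
    println!("task_count = {}", s.task_count);
}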
