Skip to content

Commit

Permalink
Merge #1999
Browse files Browse the repository at this point in the history
1999: feat: Optimize block download tasks with a simple task scheduler r=quake,doitian a=driftluo

This implementation aims to optimize the task scheduling of the download block. 

It contains a simple task counter to allocate the number of tasks for each node, record and filter the relatively good nodes for download.

After about a week of testing and continuous adjustments, the current PR data is relatively satisfactory, but the possibility of continued adjustments in the future is not ruled out

This PR changes a number of things, including but not limited:
1. Raise the maximum inflight block limit per node to 32-128, but the default is 16, and dynamically adjust this data
2. Remove redundant designs where the same block can be requested from two nodes
3. ~When inserting a orphan block, the countdown for 1 second at the tip + 1 corresponding to `trace_number`, if still not completed, clear the task and send it to another node for download (exponentially decreasing the task limit of the corresponding node)~
4. Split the `getBlockTransaction` task from the `getBlocks` task, keeping the design that getBlockTransaction can request from 2 nodes
5. Clearing out nodes that are peer_best_known < tip in IBD time
6. Separating the `block fetch` process
7. Clearing nodes that do not respond to `getblock` requests for 30 seconds
8. mark timeout on  all `< tip +1` block request if request window > tip + 512
9. Reduce the consumption of checking the maximum timeout time, from check all inflight to check all Less than tip + 20

Test machine configuration: 
2 core 8G RAM 
IP Location on Hong Kong
```
$ cat /proc/cpuinfo | grep name | cut -f2 -d: | uniq -c
2  Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
```

before:

| net state | outbound | average speed | CPU occupancy | Bandwidth consumption
| - | - | - | - | - |
| relatively good | 8 peer | 102-120 block/s | average 70.49%, max 92.77% | average 4 Mbps, max 7.12 Mbps
| relatively poor | 8 peer | 93-100 block/s | average 49.99%, max 74.37% | average 4 Mbps, max 7.12 Mbps

after:

| net state | outbound | average speed | CPU occupancy | Bandwidth consumption
| - | - | - | - | - |
| relatively good | 8 peer | 219-240 block/s | average 78.34%, max 93.17% | average 2Mbps, max 5.52 Mbps
| relatively poor | 8 peer | 200-220 block/s | average 70.06%, max 85.83% | average 3 Mbps, max 19.55 Mbps


Co-authored-by: driftluo <[email protected]>
  • Loading branch information
bors[bot] and driftluo authored Apr 28, 2020
2 parents 3812ef0 + bf0583e commit 573883f
Show file tree
Hide file tree
Showing 15 changed files with 734 additions and 196 deletions.
15 changes: 8 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions network/src/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ pub trait CKBProtocolContext: Send {
fn send_paused(&self) -> bool;
// Other methods
fn protocol_id(&self) -> ProtocolId;
fn p2p_control(&self) -> Option<&ServiceControl> {
None
}
}

pub trait CKBProtocolHandler: Sync + Send {
Expand Down Expand Up @@ -399,6 +402,10 @@ impl CKBProtocolContext for DefaultCKBProtocolContext {
fn send_paused(&self) -> bool {
self.send_paused
}

fn p2p_control(&self) -> Option<&ServiceControl> {
Some(&self.p2p_control)
}
}

pub(crate) struct BlockingFutureTask {
Expand Down
1 change: 1 addition & 0 deletions sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ sentry = "0.16.0"
futures = "0.3"
ckb-error = {path = "../error"}
ckb-tx-pool = { path = "../tx-pool" }
crossbeam-channel = "0.3"

[dev-dependencies]
ckb-test-chain-utils = { path = "../util/test-chain-utils" }
Expand Down
8 changes: 6 additions & 2 deletions sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ pub const MAX_INVENTORY_LEN: usize = 50_000;
pub const MAX_SCHEDULED_LEN: usize = 4 * 1024;
pub const MAX_BLOCKS_TO_ANNOUNCE: usize = 8;
pub const MAX_UNCONNECTING_HEADERS: usize = 10;
pub const MAX_BLOCKS_IN_TRANSIT_PER_PEER: usize = 16;
pub const MAX_TIP_AGE: u64 = 24 * 60 * 60 * 1000;
pub const STALE_RELAY_AGE_LIMIT: u64 = 30 * 24 * 60 * 60 * 1000;

/* About Download Scheduler */
pub const INIT_BLOCKS_IN_TRANSIT_PER_PEER: usize = 16;
pub const FIRST_LEVEL_MAX: usize = 32;
pub const MAX_BLOCKS_IN_TRANSIT_PER_PEER: usize = 128;
pub const CHECK_POINT_WINDOW: u64 = (MAX_BLOCKS_IN_TRANSIT_PER_PEER * 4) as u64;

pub(crate) const LOG_TARGET_RELAY: &str = "ckb-relay";

use ckb_network::ProtocolId;
Expand All @@ -51,7 +56,6 @@ impl Into<ProtocolId> for NetworkProtocol {
pub const HEADERS_DOWNLOAD_TIMEOUT_BASE: u64 = 6 * 60 * 1000; // 6 minutes
pub const HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER: u64 = 1; // 1ms/header
pub const POW_SPACE: u64 = 10_000; // 10s
pub const MAX_PEERS_PER_BLOCK: usize = 2;

// Protect at least this many outbound peers from disconnection due to slow
// behind headers chain.
Expand Down
34 changes: 24 additions & 10 deletions sync/src/relayer/block_transactions_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ impl<'a> BlockTransactionsProcess<'a> {
let missing_uncles: Vec<u32>;
let mut collision = false;

{
self.relayer
.shared
.state()
.write_inflight_blocks()
.remove_compact(self.peer, &block_hash);
}

if let Entry::Occupied(mut pending) = shared
.state()
.pending_compact_blocks()
Expand Down Expand Up @@ -135,18 +143,24 @@ impl<'a> BlockTransactionsProcess<'a> {

assert!(!missing_transactions.is_empty() || !missing_uncles.is_empty());

let content = packed::GetBlockTransactions::new_builder()
.block_hash(block_hash.clone())
.indexes(missing_transactions.pack())
.uncle_indexes(missing_uncles.pack())
.build();
let message = packed::RelayMessage::new_builder().set(content).build();
if shared
.state()
.write_inflight_blocks()
.compact_reconstruct(self.peer, block_hash.clone())
{
let content = packed::GetBlockTransactions::new_builder()
.block_hash(block_hash.clone())
.indexes(missing_transactions.pack())
.uncle_indexes(missing_uncles.pack())
.build();
let message = packed::RelayMessage::new_builder().set(content).build();

if let Err(err) = self.nc.send_message_to(self.peer, message.as_bytes()) {
return StatusCode::Network
.with_context(format!("Send GetBlockTransactions error: {:?}", err,));
if let Err(err) = self.nc.send_message_to(self.peer, message.as_bytes()) {
return StatusCode::Network
.with_context(format!("Send GetBlockTransactions error: {:?}", err,));
}
crate::relayer::log_sent_metric(message.to_enum().item_name());
}
crate::relayer::log_sent_metric(message.to_enum().item_name());

mem::replace(expected_transaction_indexes, missing_transactions);
mem::replace(expected_uncle_indexes, missing_uncles);
Expand Down
8 changes: 4 additions & 4 deletions sync/src/relayer/compact_block_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ impl<'a> CompactBlockProcess<'a> {

let parent = parent.unwrap();

if let Some(flight) = shared
if let Some(peers) = shared
.state()
.read_inflight_blocks()
.inflight_state_by_block(&block_hash)
.inflight_compact_by_block(&block_hash)
{
if flight.peers.contains(&self.peer) {
if peers.contains(&self.peer) {
debug_target!(
crate::LOG_TARGET_RELAY,
"discard already in-flight compact block {}",
Expand Down Expand Up @@ -240,7 +240,7 @@ impl<'a> CompactBlockProcess<'a> {
if !shared
.state()
.write_inflight_blocks()
.insert(self.peer, block_hash.clone())
.compact_reconstruct(self.peer, block_hash.clone())
{
debug_target!(
crate::LOG_TARGET_RELAY,
Expand Down
7 changes: 3 additions & 4 deletions sync/src/relayer/tests/compact_block_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::block_status::BlockStatus;
use crate::relayer::compact_block_process::CompactBlockProcess;
use crate::relayer::tests::helper::{build_chain, new_header_builder, MockProtocalContext};
use crate::types::InflightBlocks;
use crate::MAX_PEERS_PER_BLOCK;
use crate::{NetworkProtocol, Status, StatusCode};
use ckb_network::PeerIndex;
use ckb_store::ChainStore;
Expand Down Expand Up @@ -201,7 +200,7 @@ fn test_already_in_flight() {

// Already in flight
let mut in_flight_blocks = InflightBlocks::default();
in_flight_blocks.insert(peer_index, block.header().hash());
in_flight_blocks.compact_reconstruct(peer_index, block.header().hash());
*relayer.shared.state().write_inflight_blocks() = in_flight_blocks;

let compact_block_process = CompactBlockProcess::new(
Expand Down Expand Up @@ -349,8 +348,8 @@ fn test_inflight_blocks_reach_limit() {
// in_flight_blocks is full
{
let mut in_flight_blocks = InflightBlocks::default();
for i in 0..=MAX_PEERS_PER_BLOCK {
in_flight_blocks.insert(i.into(), block.header().hash());
for i in 0..=2 {
in_flight_blocks.compact_reconstruct(i.into(), block.header().hash());
}
*relayer.shared.state().write_inflight_blocks() = in_flight_blocks;
}
Expand Down
51 changes: 35 additions & 16 deletions sync/src/synchronizer/block_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use crate::block_status::BlockStatus;
use crate::synchronizer::Synchronizer;
use crate::types::{ActiveChain, HeaderView, IBDState};
use crate::{BLOCK_DOWNLOAD_WINDOW, MAX_BLOCKS_IN_TRANSIT_PER_PEER};
use crate::BLOCK_DOWNLOAD_WINDOW;
use ckb_logger::{debug, trace};
use ckb_network::PeerIndex;
use ckb_types::{core, packed};
use std::cmp::min;

pub struct BlockFetcher {
synchronizer: Synchronizer,
pub struct BlockFetcher<'a> {
synchronizer: &'a Synchronizer,
peer: PeerIndex,
active_chain: ActiveChain,
ibd: IBDState,
}

impl BlockFetcher {
pub fn new(synchronizer: Synchronizer, peer: PeerIndex, ibd: IBDState) -> Self {
impl<'a> BlockFetcher<'a> {
pub fn new(synchronizer: &'a Synchronizer, peer: PeerIndex, ibd: IBDState) -> Self {
let active_chain = synchronizer.shared.active_chain();
BlockFetcher {
peer,
Expand All @@ -29,7 +29,7 @@ impl BlockFetcher {
let inflight = self.synchronizer.shared().state().read_inflight_blocks();

// Can't download any more from this peer
inflight.peer_inflight_count(self.peer) >= MAX_BLOCKS_IN_TRANSIT_PER_PEER
inflight.peer_can_fetch_count(self.peer) == 0
}

pub fn is_better_chain(&self, header: &HeaderView) -> bool {
Expand Down Expand Up @@ -69,7 +69,7 @@ impl BlockFetcher {
Some(fixed_last_common_header)
}

pub fn fetch(self) -> Option<Vec<packed::Byte32>> {
pub fn fetch(self) -> Option<Vec<Vec<packed::Byte32>>> {
trace!("[block downloader] BlockFetcher process");

if self.reached_inflight_limit() {
Expand Down Expand Up @@ -130,7 +130,7 @@ impl BlockFetcher {
let end = min(best_known_header.number(), start + BLOCK_DOWNLOAD_WINDOW);
let n_fetch = min(
end.saturating_sub(start) as usize + 1,
MAX_BLOCKS_IN_TRANSIT_PER_PEER.saturating_sub(inflight.peer_inflight_count(self.peer)),
inflight.peer_can_fetch_count(self.peer),
);
let mut fetch = Vec::with_capacity(n_fetch);

Expand All @@ -146,16 +146,21 @@ impl BlockFetcher {
for _ in 0..span {
let parent_hash = header.parent_hash();
let hash = header.hash();
// NOTE: Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding
// stopping synchronization even when orphan_pool maintains dirty items by bugs.
let stored = self
.active_chain
.contains_block_status(&hash, BlockStatus::BLOCK_STORED);
if stored {

let status = self.active_chain.get_block_status(&hash);
if status == BlockStatus::BLOCK_STORED {
// If the block is stored, its ancestor must on store
// So we can skip the search of this space directly
break;
} else if inflight.insert(self.peer, hash) {
} else if self.ibd.into() && status.contains(BlockStatus::BLOCK_RECEIVED) {
// NOTE: NO-IBD Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding
// stopping synchronization even when orphan_pool maintains dirty items by bugs.
// TODO: If async validation is achieved, then the IBD state judgement here can be removed

// On IBD, BLOCK_RECEIVED means this block had been received, so this block doesn't need to fetch
// On NO-IBD, because of the model, this block has to be requested again
// But all of these can do nothing on this branch
} else if inflight.insert(self.peer, (header.number(), hash).into()) {
fetch.push(header)
}

Expand All @@ -172,6 +177,20 @@ impl BlockFetcher {

// The headers in `fetch` may be unordered. Sort them by number.
fetch.sort_by_key(|header| header.number());
Some(fetch.iter().map(core::HeaderView::hash).collect())

let tip = self.active_chain.tip_number();
let should_mark = fetch.last().map_or(false, |header| {
header.number().saturating_sub(crate::CHECK_POINT_WINDOW) > tip
});
if should_mark {
inflight.mark_slow_block(tip);
}

Some(
fetch
.chunks(crate::INIT_BLOCKS_IN_TRANSIT_PER_PEER)
.map(|headers| headers.iter().map(core::HeaderView::hash).collect())
.collect(),
)
}
}
6 changes: 3 additions & 3 deletions sync/src/synchronizer/get_blocks_process.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::block_status::BlockStatus;
use crate::synchronizer::Synchronizer;
use crate::{Status, StatusCode, MAX_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN};
use crate::{Status, StatusCode, INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN};
use ckb_logger::debug;
use ckb_network::{CKBProtocolContext, PeerIndex};
use ckb_types::{packed, prelude::*};
Expand Down Expand Up @@ -29,7 +29,7 @@ impl<'a> GetBlocksProcess<'a> {

pub fn execute(self) -> Status {
let block_hashes = self.message.block_hashes();
// use MAX_HEADERS_LEN as limit, we may increase the value of MAX_BLOCKS_IN_TRANSIT_PER_PEER in the future
// use MAX_HEADERS_LEN as limit, we may increase the value of INIT_BLOCKS_IN_TRANSIT_PER_PEER in the future
if block_hashes.len() > MAX_HEADERS_LEN {
return StatusCode::ProtocolMessageIsMalformed.with_context(format!(
"BlockHashes count({}) > MAX_HEADERS_LEN({})",
Expand All @@ -39,7 +39,7 @@ impl<'a> GetBlocksProcess<'a> {
}
let active_chain = self.synchronizer.shared.active_chain();

for block_hash in block_hashes.iter().take(MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
for block_hash in block_hashes.iter().take(INIT_BLOCKS_IN_TRANSIT_PER_PEER) {
debug!("get_blocks {} from peer {:?}", block_hash, self.peer);
let block_hash = block_hash.to_entity();

Expand Down
Loading

0 comments on commit 573883f

Please sign in to comment.