Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Optimize block download tasks with a simple task scheduler #1999

Merged
merged 5 commits into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions network/src/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ pub trait CKBProtocolContext: Send {
fn send_paused(&self) -> bool;
// Other methods
fn protocol_id(&self) -> ProtocolId;
fn p2p_control(&self) -> Option<&ServiceControl> {
None
}
}

pub trait CKBProtocolHandler: Sync + Send {
Expand Down Expand Up @@ -399,6 +402,10 @@ impl CKBProtocolContext for DefaultCKBProtocolContext {
fn send_paused(&self) -> bool {
self.send_paused
}

fn p2p_control(&self) -> Option<&ServiceControl> {
Some(&self.p2p_control)
}
}

pub(crate) struct BlockingFutureTask {
Expand Down
1 change: 1 addition & 0 deletions sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ sentry = "0.16.0"
futures = "0.3"
ckb-error = {path = "../error"}
ckb-tx-pool = { path = "../tx-pool" }
crossbeam-channel = "0.3"

[dev-dependencies]
ckb-test-chain-utils = { path = "../util/test-chain-utils" }
Expand Down
8 changes: 6 additions & 2 deletions sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,15 @@ pub const MAX_INVENTORY_LEN: usize = 50_000;
pub const MAX_SCHEDULED_LEN: usize = 4 * 1024;
pub const MAX_BLOCKS_TO_ANNOUNCE: usize = 8;
pub const MAX_UNCONNECTING_HEADERS: usize = 10;
pub const MAX_BLOCKS_IN_TRANSIT_PER_PEER: usize = 16;
pub const MAX_TIP_AGE: u64 = 24 * 60 * 60 * 1000;
pub const STALE_RELAY_AGE_LIMIT: u64 = 30 * 24 * 60 * 60 * 1000;

/* About Download Scheduler */
pub const INIT_BLOCKS_IN_TRANSIT_PER_PEER: usize = 16;
pub const FIRST_LEVEL_MAX: usize = 32;
pub const MAX_BLOCKS_IN_TRANSIT_PER_PEER: usize = 128;
pub const CHECK_POINT_WINDOW: u64 = (MAX_BLOCKS_IN_TRANSIT_PER_PEER * 4) as u64;

pub(crate) const LOG_TARGET_RELAY: &str = "ckb-relay";

use ckb_network::ProtocolId;
Expand All @@ -51,7 +56,6 @@ impl Into<ProtocolId> for NetworkProtocol {
pub const HEADERS_DOWNLOAD_TIMEOUT_BASE: u64 = 6 * 60 * 1000; // 6 minutes
pub const HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER: u64 = 1; // 1ms/header
pub const POW_SPACE: u64 = 10_000; // 10s
pub const MAX_PEERS_PER_BLOCK: usize = 2;

// Protect at least this many outbound peers from disconnection due to slow
// behind headers chain.
Expand Down
34 changes: 24 additions & 10 deletions sync/src/relayer/block_transactions_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ impl<'a> BlockTransactionsProcess<'a> {
let missing_uncles: Vec<u32>;
let mut collision = false;

{
self.relayer
.shared
.state()
.write_inflight_blocks()
.remove_compact(self.peer, &block_hash);
}

if let Entry::Occupied(mut pending) = shared
.state()
.pending_compact_blocks()
Expand Down Expand Up @@ -135,18 +143,24 @@ impl<'a> BlockTransactionsProcess<'a> {

assert!(!missing_transactions.is_empty() || !missing_uncles.is_empty());

let content = packed::GetBlockTransactions::new_builder()
.block_hash(block_hash.clone())
.indexes(missing_transactions.pack())
.uncle_indexes(missing_uncles.pack())
.build();
let message = packed::RelayMessage::new_builder().set(content).build();
if shared
.state()
.write_inflight_blocks()
.compact_reconstruct(self.peer, block_hash.clone())
{
let content = packed::GetBlockTransactions::new_builder()
.block_hash(block_hash.clone())
.indexes(missing_transactions.pack())
.uncle_indexes(missing_uncles.pack())
.build();
let message = packed::RelayMessage::new_builder().set(content).build();

if let Err(err) = self.nc.send_message_to(self.peer, message.as_bytes()) {
return StatusCode::Network
.with_context(format!("Send GetBlockTransactions error: {:?}", err,));
if let Err(err) = self.nc.send_message_to(self.peer, message.as_bytes()) {
return StatusCode::Network
.with_context(format!("Send GetBlockTransactions error: {:?}", err,));
}
crate::relayer::log_sent_metric(message.to_enum().item_name());
}
crate::relayer::log_sent_metric(message.to_enum().item_name());

mem::replace(expected_transaction_indexes, missing_transactions);
mem::replace(expected_uncle_indexes, missing_uncles);
Expand Down
8 changes: 4 additions & 4 deletions sync/src/relayer/compact_block_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ impl<'a> CompactBlockProcess<'a> {

let parent = parent.unwrap();

if let Some(flight) = shared
if let Some(peers) = shared
.state()
.read_inflight_blocks()
.inflight_state_by_block(&block_hash)
.inflight_compact_by_block(&block_hash)
{
if flight.peers.contains(&self.peer) {
if peers.contains(&self.peer) {
debug_target!(
crate::LOG_TARGET_RELAY,
"discard already in-flight compact block {}",
Expand Down Expand Up @@ -240,7 +240,7 @@ impl<'a> CompactBlockProcess<'a> {
if !shared
.state()
.write_inflight_blocks()
.insert(self.peer, block_hash.clone())
.compact_reconstruct(self.peer, block_hash.clone())
{
debug_target!(
crate::LOG_TARGET_RELAY,
Expand Down
7 changes: 3 additions & 4 deletions sync/src/relayer/tests/compact_block_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::block_status::BlockStatus;
use crate::relayer::compact_block_process::CompactBlockProcess;
use crate::relayer::tests::helper::{build_chain, new_header_builder, MockProtocalContext};
use crate::types::InflightBlocks;
use crate::MAX_PEERS_PER_BLOCK;
use crate::{NetworkProtocol, Status, StatusCode};
use ckb_network::PeerIndex;
use ckb_store::ChainStore;
Expand Down Expand Up @@ -201,7 +200,7 @@ fn test_already_in_flight() {

// Already in flight
let mut in_flight_blocks = InflightBlocks::default();
in_flight_blocks.insert(peer_index, block.header().hash());
in_flight_blocks.compact_reconstruct(peer_index, block.header().hash());
*relayer.shared.state().write_inflight_blocks() = in_flight_blocks;

let compact_block_process = CompactBlockProcess::new(
Expand Down Expand Up @@ -349,8 +348,8 @@ fn test_inflight_blocks_reach_limit() {
// in_flight_blocks is full
{
let mut in_flight_blocks = InflightBlocks::default();
for i in 0..=MAX_PEERS_PER_BLOCK {
in_flight_blocks.insert(i.into(), block.header().hash());
for i in 0..=2 {
in_flight_blocks.compact_reconstruct(i.into(), block.header().hash());
}
*relayer.shared.state().write_inflight_blocks() = in_flight_blocks;
}
Expand Down
51 changes: 35 additions & 16 deletions sync/src/synchronizer/block_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use crate::block_status::BlockStatus;
use crate::synchronizer::Synchronizer;
use crate::types::{ActiveChain, HeaderView, IBDState};
use crate::{BLOCK_DOWNLOAD_WINDOW, MAX_BLOCKS_IN_TRANSIT_PER_PEER};
use crate::BLOCK_DOWNLOAD_WINDOW;
use ckb_logger::{debug, trace};
use ckb_network::PeerIndex;
use ckb_types::{core, packed};
use std::cmp::min;

pub struct BlockFetcher {
synchronizer: Synchronizer,
pub struct BlockFetcher<'a> {
synchronizer: &'a Synchronizer,
peer: PeerIndex,
active_chain: ActiveChain,
ibd: IBDState,
}

impl BlockFetcher {
pub fn new(synchronizer: Synchronizer, peer: PeerIndex, ibd: IBDState) -> Self {
impl<'a> BlockFetcher<'a> {
pub fn new(synchronizer: &'a Synchronizer, peer: PeerIndex, ibd: IBDState) -> Self {
let active_chain = synchronizer.shared.active_chain();
BlockFetcher {
peer,
Expand All @@ -29,7 +29,7 @@ impl BlockFetcher {
let inflight = self.synchronizer.shared().state().read_inflight_blocks();

// Can't download any more from this peer
inflight.peer_inflight_count(self.peer) >= MAX_BLOCKS_IN_TRANSIT_PER_PEER
inflight.peer_can_fetch_count(self.peer) == 0
}

pub fn is_better_chain(&self, header: &HeaderView) -> bool {
Expand Down Expand Up @@ -69,7 +69,7 @@ impl BlockFetcher {
Some(fixed_last_common_header)
}

pub fn fetch(self) -> Option<Vec<packed::Byte32>> {
pub fn fetch(self) -> Option<Vec<Vec<packed::Byte32>>> {
trace!("[block downloader] BlockFetcher process");

if self.reached_inflight_limit() {
Expand Down Expand Up @@ -130,7 +130,7 @@ impl BlockFetcher {
let end = min(best_known_header.number(), start + BLOCK_DOWNLOAD_WINDOW);
let n_fetch = min(
end.saturating_sub(start) as usize + 1,
MAX_BLOCKS_IN_TRANSIT_PER_PEER.saturating_sub(inflight.peer_inflight_count(self.peer)),
inflight.peer_can_fetch_count(self.peer),
);
let mut fetch = Vec::with_capacity(n_fetch);

Expand All @@ -146,16 +146,21 @@ impl BlockFetcher {
for _ in 0..span {
let parent_hash = header.parent_hash();
let hash = header.hash();
// NOTE: Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding
// stopping synchronization even when orphan_pool maintains dirty items by bugs.
let stored = self
.active_chain
.contains_block_status(&hash, BlockStatus::BLOCK_STORED);
if stored {

let status = self.active_chain.get_block_status(&hash);
if status == BlockStatus::BLOCK_STORED {
// If the block is stored, its ancestor must on store
// So we can skip the search of this space directly
break;
} else if inflight.insert(self.peer, hash) {
} else if self.ibd.into() && status.contains(BlockStatus::BLOCK_RECEIVED) {
driftluo marked this conversation as resolved.
Show resolved Hide resolved
// NOTE: NO-IBD Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding
// stopping synchronization even when orphan_pool maintains dirty items by bugs.
// TODO: If async validation is achieved, then the IBD state judgement here can be removed

// On IBD, BLOCK_RECEIVED means this block had been received, so this block doesn't need to fetch
// On NO-IBD, because of the model, this block has to be requested again
// But all of these can do nothing on this branch
} else if inflight.insert(self.peer, (header.number(), hash).into()) {
fetch.push(header)
}

Expand All @@ -172,6 +177,20 @@ impl BlockFetcher {

// The headers in `fetch` may be unordered. Sort them by number.
fetch.sort_by_key(|header| header.number());
Some(fetch.iter().map(core::HeaderView::hash).collect())

let tip = self.active_chain.tip_number();
driftluo marked this conversation as resolved.
Show resolved Hide resolved
let should_mark = fetch.last().map_or(false, |header| {
header.number().saturating_sub(crate::CHECK_POINT_WINDOW) > tip
});
if should_mark {
inflight.mark_slow_block(tip);
}

Some(
fetch
.chunks(crate::INIT_BLOCKS_IN_TRANSIT_PER_PEER)
.map(|headers| headers.iter().map(core::HeaderView::hash).collect())
.collect(),
)
}
}
6 changes: 3 additions & 3 deletions sync/src/synchronizer/get_blocks_process.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::block_status::BlockStatus;
use crate::synchronizer::Synchronizer;
use crate::{Status, StatusCode, MAX_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN};
use crate::{Status, StatusCode, INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN};
use ckb_logger::debug;
use ckb_network::{CKBProtocolContext, PeerIndex};
use ckb_types::{packed, prelude::*};
Expand Down Expand Up @@ -29,7 +29,7 @@ impl<'a> GetBlocksProcess<'a> {

pub fn execute(self) -> Status {
let block_hashes = self.message.block_hashes();
// use MAX_HEADERS_LEN as limit, we may increase the value of MAX_BLOCKS_IN_TRANSIT_PER_PEER in the future
// use MAX_HEADERS_LEN as limit, we may increase the value of INIT_BLOCKS_IN_TRANSIT_PER_PEER in the future
if block_hashes.len() > MAX_HEADERS_LEN {
return StatusCode::ProtocolMessageIsMalformed.with_context(format!(
"BlockHashes count({}) > MAX_HEADERS_LEN({})",
Expand All @@ -39,7 +39,7 @@ impl<'a> GetBlocksProcess<'a> {
}
let active_chain = self.synchronizer.shared.active_chain();

for block_hash in block_hashes.iter().take(MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
for block_hash in block_hashes.iter().take(INIT_BLOCKS_IN_TRANSIT_PER_PEER) {
debug!("get_blocks {} from peer {:?}", block_hash, self.peer);
let block_hash = block_hash.to_entity();

Expand Down
Loading