Skip to content

Commit

Permalink
feat: Optimize block download tasks with a simple task scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Apr 16, 2020
1 parent 781ac1f commit 8b9db25
Show file tree
Hide file tree
Showing 12 changed files with 582 additions and 149 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions network/src/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ pub trait CKBProtocolContext: Send {
fn send_paused(&self) -> bool;
// Other methods
fn protocol_id(&self) -> ProtocolId;
fn p2p_control(&self) -> Option<&ServiceControl> {
None
}
}

pub trait CKBProtocolHandler: Sync + Send {
Expand Down Expand Up @@ -399,6 +402,10 @@ impl CKBProtocolContext for DefaultCKBProtocolContext {
fn send_paused(&self) -> bool {
self.send_paused
}

fn p2p_control(&self) -> Option<&ServiceControl> {
Some(&self.p2p_control)
}
}

pub(crate) struct BlockingFutureTask {
Expand Down
1 change: 1 addition & 0 deletions sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ sentry = "0.16.0"
futures = "0.3"
ckb-error = {path = "../error"}
ckb-tx-pool = { path = "../tx-pool" }
crossbeam-channel = "0.3"

[dev-dependencies]
ckb-test-chain-utils = { path = "../util/test-chain-utils" }
Expand Down
1 change: 0 additions & 1 deletion sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ impl Into<ProtocolId> for NetworkProtocol {
pub const HEADERS_DOWNLOAD_TIMEOUT_BASE: u64 = 6 * 60 * 1000; // 6 minutes
pub const HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER: u64 = 1; // 1ms/header
pub const POW_SPACE: u64 = 10_000; // 10s
pub const MAX_PEERS_PER_BLOCK: usize = 2;

// Protect at least this many outbound peers from disconnection due to slow
// behind headers chain.
Expand Down
34 changes: 24 additions & 10 deletions sync/src/relayer/block_transactions_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ impl<'a> BlockTransactionsProcess<'a> {
let missing_uncles: Vec<u32>;
let mut collision = false;

{
self.relayer
.shared
.state()
.write_inflight_blocks()
.remove_compact(self.peer, &block_hash);
}

if let Entry::Occupied(mut pending) = shared
.state()
.pending_compact_blocks()
Expand Down Expand Up @@ -135,18 +143,24 @@ impl<'a> BlockTransactionsProcess<'a> {

assert!(!missing_transactions.is_empty() || !missing_uncles.is_empty());

let content = packed::GetBlockTransactions::new_builder()
.block_hash(block_hash.clone())
.indexes(missing_transactions.pack())
.uncle_indexes(missing_uncles.pack())
.build();
let message = packed::RelayMessage::new_builder().set(content).build();
if shared
.state()
.write_inflight_blocks()
.compact_reconstruct(self.peer, block_hash.clone())
{
let content = packed::GetBlockTransactions::new_builder()
.block_hash(block_hash.clone())
.indexes(missing_transactions.pack())
.uncle_indexes(missing_uncles.pack())
.build();
let message = packed::RelayMessage::new_builder().set(content).build();

if let Err(err) = self.nc.send_message_to(self.peer, message.as_bytes()) {
return StatusCode::Network
.with_context(format!("Send GetBlockTransactions error: {:?}", err,));
if let Err(err) = self.nc.send_message_to(self.peer, message.as_bytes()) {
return StatusCode::Network
.with_context(format!("Send GetBlockTransactions error: {:?}", err,));
}
crate::relayer::log_sent_metric(message.to_enum().item_name());
}
crate::relayer::log_sent_metric(message.to_enum().item_name());

mem::replace(expected_transaction_indexes, missing_transactions);
mem::replace(expected_uncle_indexes, missing_uncles);
Expand Down
8 changes: 4 additions & 4 deletions sync/src/relayer/compact_block_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ impl<'a> CompactBlockProcess<'a> {

let parent = parent.unwrap();

if let Some(flight) = shared
if let Some(peers) = shared
.state()
.read_inflight_blocks()
.inflight_state_by_block(&block_hash)
.inflight_compact_by_block(&block_hash)
{
if flight.peers.contains(&self.peer) {
if peers.contains(&self.peer) {
debug_target!(
crate::LOG_TARGET_RELAY,
"discard already in-flight compact block {}",
Expand Down Expand Up @@ -240,7 +240,7 @@ impl<'a> CompactBlockProcess<'a> {
if !shared
.state()
.write_inflight_blocks()
.insert(self.peer, block_hash.clone())
.compact_reconstruct(self.peer, block_hash.clone())
{
debug_target!(
crate::LOG_TARGET_RELAY,
Expand Down
7 changes: 3 additions & 4 deletions sync/src/relayer/tests/compact_block_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::block_status::BlockStatus;
use crate::relayer::compact_block_process::CompactBlockProcess;
use crate::relayer::tests::helper::{build_chain, new_header_builder, MockProtocalContext};
use crate::types::InflightBlocks;
use crate::MAX_PEERS_PER_BLOCK;
use crate::{NetworkProtocol, Status, StatusCode};
use ckb_network::PeerIndex;
use ckb_store::ChainStore;
Expand Down Expand Up @@ -201,7 +200,7 @@ fn test_already_in_flight() {

// Already in flight
let mut in_flight_blocks = InflightBlocks::default();
in_flight_blocks.insert(peer_index, block.header().hash());
in_flight_blocks.compact_reconstruct(peer_index, block.header().hash());
*relayer.shared.state().write_inflight_blocks() = in_flight_blocks;

let compact_block_process = CompactBlockProcess::new(
Expand Down Expand Up @@ -349,8 +348,8 @@ fn test_inflight_blocks_reach_limit() {
// in_flight_blocks is full
{
let mut in_flight_blocks = InflightBlocks::default();
for i in 0..=MAX_PEERS_PER_BLOCK {
in_flight_blocks.insert(i.into(), block.header().hash());
for i in 0..=2 {
in_flight_blocks.compact_reconstruct(i.into(), block.header().hash());
}
*relayer.shared.state().write_inflight_blocks() = in_flight_blocks;
}
Expand Down
26 changes: 16 additions & 10 deletions sync/src/synchronizer/block_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
use crate::block_status::BlockStatus;
use crate::synchronizer::Synchronizer;
use crate::types::{ActiveChain, HeaderView, IBDState};
use crate::{BLOCK_DOWNLOAD_WINDOW, MAX_BLOCKS_IN_TRANSIT_PER_PEER};
use crate::BLOCK_DOWNLOAD_WINDOW;
use ckb_logger::{debug, trace};
use ckb_network::PeerIndex;
use ckb_types::{core, packed};
use std::cmp::min;

pub struct BlockFetcher {
synchronizer: Synchronizer,
pub struct BlockFetcher<'a> {
synchronizer: &'a Synchronizer,
peer: PeerIndex,
active_chain: ActiveChain,
ibd: IBDState,
}

impl BlockFetcher {
pub fn new(synchronizer: Synchronizer, peer: PeerIndex, ibd: IBDState) -> Self {
impl<'a> BlockFetcher<'a> {
pub fn new(synchronizer: &'a Synchronizer, peer: PeerIndex, ibd: IBDState) -> Self {
let active_chain = synchronizer.shared.active_chain();
BlockFetcher {
peer,
Expand All @@ -29,7 +29,7 @@ impl BlockFetcher {
let inflight = self.synchronizer.shared().state().read_inflight_blocks();

// Can't download any more from this peer
inflight.peer_inflight_count(self.peer) >= MAX_BLOCKS_IN_TRANSIT_PER_PEER
inflight.peer_can_fetch_count(self.peer) == 0
}

pub fn is_better_chain(&self, header: &HeaderView) -> bool {
Expand Down Expand Up @@ -69,7 +69,7 @@ impl BlockFetcher {
Some(fixed_last_common_header)
}

pub fn fetch(self) -> Option<Vec<packed::Byte32>> {
pub fn fetch(self) -> Option<Vec<Vec<packed::Byte32>>> {
trace!("[block downloader] BlockFetcher process");

if self.reached_inflight_limit() {
Expand Down Expand Up @@ -130,7 +130,7 @@ impl BlockFetcher {
let end = min(best_known_header.number(), start + BLOCK_DOWNLOAD_WINDOW);
let n_fetch = min(
end.saturating_sub(start) as usize + 1,
MAX_BLOCKS_IN_TRANSIT_PER_PEER.saturating_sub(inflight.peer_inflight_count(self.peer)),
inflight.peer_can_fetch_count(self.peer),
);
let mut fetch = Vec::with_capacity(n_fetch);

Expand All @@ -155,7 +155,7 @@ impl BlockFetcher {
// If the block is stored, its ancestor must on store
// So we can skip the search of this space directly
break;
} else if inflight.insert(self.peer, hash) {
} else if inflight.insert(self.peer, hash, header.number()) {
fetch.push(header)
}

Expand All @@ -172,6 +172,12 @@ impl BlockFetcher {

// The headers in `fetch` may be unordered. Sort them by number.
fetch.sort_by_key(|header| header.number());
Some(fetch.iter().map(core::HeaderView::hash).collect())

Some(
fetch
.chunks(crate::MAX_BLOCKS_IN_TRANSIT_PER_PEER)
.map(|headers| headers.iter().map(core::HeaderView::hash).collect())
.collect(),
)
}
}
Loading

0 comments on commit 8b9db25

Please sign in to comment.