From 8b9db2577e93a1e47fe3aac63c4fe4daec901ea8 Mon Sep 17 00:00:00 2001 From: driftluo Date: Tue, 31 Mar 2020 16:46:56 +0800 Subject: [PATCH] feat: Optimize block download tasks with a simple task scheduler --- Cargo.lock | 1 + network/src/protocols/mod.rs | 7 + sync/Cargo.toml | 1 + sync/src/lib.rs | 1 - .../src/relayer/block_transactions_process.rs | 34 +- sync/src/relayer/compact_block_process.rs | 8 +- .../relayer/tests/compact_block_process.rs | 7 +- sync/src/synchronizer/block_fetcher.rs | 26 +- sync/src/synchronizer/mod.rs | 189 +++++++++-- sync/src/tests/inflight_blocks.rs | 144 ++++++--- sync/src/types.rs | 300 +++++++++++++++--- test/src/specs/sync/get_blocks.rs | 13 +- 12 files changed, 582 insertions(+), 149 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8420b0ff40f..87fd32ae019 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -973,6 +973,7 @@ dependencies = [ "ckb-types", "ckb-util", "ckb-verification", + "crossbeam-channel", "failure", "faketime", "futures 0.3.4", diff --git a/network/src/protocols/mod.rs b/network/src/protocols/mod.rs index 415faf4eef6..70c32487e6b 100644 --- a/network/src/protocols/mod.rs +++ b/network/src/protocols/mod.rs @@ -66,6 +66,9 @@ pub trait CKBProtocolContext: Send { fn send_paused(&self) -> bool; // Other methods fn protocol_id(&self) -> ProtocolId; + fn p2p_control(&self) -> Option<&ServiceControl> { + None + } } pub trait CKBProtocolHandler: Sync + Send { @@ -399,6 +402,10 @@ impl CKBProtocolContext for DefaultCKBProtocolContext { fn send_paused(&self) -> bool { self.send_paused } + + fn p2p_control(&self) -> Option<&ServiceControl> { + Some(&self.p2p_control) + } } pub(crate) struct BlockingFutureTask { diff --git a/sync/Cargo.toml b/sync/Cargo.toml index f5ce6163a55..3a99146c927 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -24,6 +24,7 @@ sentry = "0.16.0" futures = "0.3" ckb-error = {path = "../error"} ckb-tx-pool = { path = "../tx-pool" } +crossbeam-channel = "0.3" [dev-dependencies] ckb-test-chain-utils = { path = "../util/test-chain-utils" } diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 07de21b2794..54a12b0ad47 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -51,7 +51,6 @@ impl Into for NetworkProtocol { pub const HEADERS_DOWNLOAD_TIMEOUT_BASE: u64 = 6 * 60 * 1000; // 6 minutes pub const HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER: u64 = 1; // 1ms/header pub const POW_SPACE: u64 = 10_000; // 10s -pub const MAX_PEERS_PER_BLOCK: usize = 2; // Protect at least this many outbound peers from disconnection due to slow // behind headers chain. 
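Note on the scheduling model introduced by this patch: the old MAX_PEERS_PER_BLOCK rule (up to two peers could be asked for the same block) is removed above, each in-flight block is now assigned to exactly one peer, and every peer gets its own DownloadScheduler whose task budget grows or shrinks with its observed response time. The standalone sketch below models only that bookkeeping; the type names mirror the diff, but the simplified hash type and the initial budget of 16 are illustrative assumptions, not the exact ckb-sync definitions.

// Standalone model of the one-peer-per-block rule plus a per-peer task budget.
// Names follow the diff (DownloadScheduler, InflightBlocks); the hash type and
// the initial budget are simplifying assumptions for this sketch.
use std::collections::{HashMap, HashSet};

type PeerIndex = u32;
type BlockHash = [u8; 32];

const INITIAL_TASK_COUNT: usize = 16; // assumed stand-in for MAX_BLOCKS_IN_TRANSIT_PER_PEER

struct DownloadScheduler {
    hashes: HashSet<BlockHash>, // blocks currently in flight from this peer
    task_count: usize,          // how many blocks this peer may have in flight
}

#[derive(Default)]
struct InflightBlocks {
    schedulers: HashMap<PeerIndex, DownloadScheduler>,
    assigned: HashMap<BlockHash, PeerIndex>, // each block belongs to exactly one peer
}

impl InflightBlocks {
    /// Try to assign `hash` to `peer`; fails if the block is already assigned
    /// elsewhere or the peer has exhausted its current task budget.
    fn insert(&mut self, peer: PeerIndex, hash: BlockHash) -> bool {
        if self.assigned.contains_key(&hash) {
            return false;
        }
        let scheduler = self
            .schedulers
            .entry(peer)
            .or_insert_with(|| DownloadScheduler {
                hashes: HashSet::new(),
                task_count: INITIAL_TASK_COUNT,
            });
        if scheduler.hashes.len() >= scheduler.task_count {
            return false;
        }
        scheduler.hashes.insert(hash);
        self.assigned.insert(hash, peer);
        true
    }

    /// Remaining budget for `peer`, mirroring `peer_can_fetch_count` in the diff.
    fn can_fetch(&self, peer: PeerIndex) -> usize {
        self.schedulers
            .get(&peer)
            .map(|s| s.task_count.saturating_sub(s.hashes.len()))
            .unwrap_or(INITIAL_TASK_COUNT)
    }
}

fn main() {
    let mut inflight = InflightBlocks::default();
    assert!(inflight.insert(1, [0u8; 32]));
    assert!(!inflight.insert(2, [0u8; 32])); // already assigned to peer 1
    assert_eq!(inflight.can_fetch(1), INITIAL_TASK_COUNT - 1);
}

Because each block has a single owner, duplicate download traffic for sync blocks disappears, and the per-peer budget takes over the role the removed MAX_PEERS_PER_BLOCK constant used to play.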
diff --git a/sync/src/relayer/block_transactions_process.rs b/sync/src/relayer/block_transactions_process.rs index 40f9b32f4bd..ff42ac735e3 100644 --- a/sync/src/relayer/block_transactions_process.rs +++ b/sync/src/relayer/block_transactions_process.rs @@ -61,6 +61,14 @@ impl<'a> BlockTransactionsProcess<'a> { let missing_uncles: Vec; let mut collision = false; + { + self.relayer + .shared + .state() + .write_inflight_blocks() + .remove_compact(self.peer, &block_hash); + } + if let Entry::Occupied(mut pending) = shared .state() .pending_compact_blocks() @@ -135,18 +143,24 @@ impl<'a> BlockTransactionsProcess<'a> { assert!(!missing_transactions.is_empty() || !missing_uncles.is_empty()); - let content = packed::GetBlockTransactions::new_builder() - .block_hash(block_hash.clone()) - .indexes(missing_transactions.pack()) - .uncle_indexes(missing_uncles.pack()) - .build(); - let message = packed::RelayMessage::new_builder().set(content).build(); + if shared + .state() + .write_inflight_blocks() + .compact_reconstruct(self.peer, block_hash.clone()) + { + let content = packed::GetBlockTransactions::new_builder() + .block_hash(block_hash.clone()) + .indexes(missing_transactions.pack()) + .uncle_indexes(missing_uncles.pack()) + .build(); + let message = packed::RelayMessage::new_builder().set(content).build(); - if let Err(err) = self.nc.send_message_to(self.peer, message.as_bytes()) { - return StatusCode::Network - .with_context(format!("Send GetBlockTransactions error: {:?}", err,)); + if let Err(err) = self.nc.send_message_to(self.peer, message.as_bytes()) { + return StatusCode::Network + .with_context(format!("Send GetBlockTransactions error: {:?}", err,)); + } + crate::relayer::log_sent_metric(message.to_enum().item_name()); } - crate::relayer::log_sent_metric(message.to_enum().item_name()); mem::replace(expected_transaction_indexes, missing_transactions); mem::replace(expected_uncle_indexes, missing_uncles); diff --git a/sync/src/relayer/compact_block_process.rs b/sync/src/relayer/compact_block_process.rs index 0a63d6e0599..a0f5c9e96d7 100644 --- a/sync/src/relayer/compact_block_process.rs +++ b/sync/src/relayer/compact_block_process.rs @@ -122,12 +122,12 @@ impl<'a> CompactBlockProcess<'a> { let parent = parent.unwrap(); - if let Some(flight) = shared + if let Some(peers) = shared .state() .read_inflight_blocks() - .inflight_state_by_block(&block_hash) + .inflight_compact_by_block(&block_hash) { - if flight.peers.contains(&self.peer) { + if peers.contains(&self.peer) { debug_target!( crate::LOG_TARGET_RELAY, "discard already in-flight compact block {}", @@ -240,7 +240,7 @@ impl<'a> CompactBlockProcess<'a> { if !shared .state() .write_inflight_blocks() - .insert(self.peer, block_hash.clone()) + .compact_reconstruct(self.peer, block_hash.clone()) { debug_target!( crate::LOG_TARGET_RELAY, diff --git a/sync/src/relayer/tests/compact_block_process.rs b/sync/src/relayer/tests/compact_block_process.rs index a93a43a41af..23a63bcc42b 100644 --- a/sync/src/relayer/tests/compact_block_process.rs +++ b/sync/src/relayer/tests/compact_block_process.rs @@ -2,7 +2,6 @@ use crate::block_status::BlockStatus; use crate::relayer::compact_block_process::CompactBlockProcess; use crate::relayer::tests::helper::{build_chain, new_header_builder, MockProtocalContext}; use crate::types::InflightBlocks; -use crate::MAX_PEERS_PER_BLOCK; use crate::{NetworkProtocol, Status, StatusCode}; use ckb_network::PeerIndex; use ckb_store::ChainStore; @@ -201,7 +200,7 @@ fn test_already_in_flight() { // Already in flight let mut 
in_flight_blocks = InflightBlocks::default(); - in_flight_blocks.insert(peer_index, block.header().hash()); + in_flight_blocks.compact_reconstruct(peer_index, block.header().hash()); *relayer.shared.state().write_inflight_blocks() = in_flight_blocks; let compact_block_process = CompactBlockProcess::new( @@ -349,8 +348,8 @@ fn test_inflight_blocks_reach_limit() { // in_flight_blocks is full { let mut in_flight_blocks = InflightBlocks::default(); - for i in 0..=MAX_PEERS_PER_BLOCK { - in_flight_blocks.insert(i.into(), block.header().hash()); + for i in 0..=2 { + in_flight_blocks.compact_reconstruct(i.into(), block.header().hash()); } *relayer.shared.state().write_inflight_blocks() = in_flight_blocks; } diff --git a/sync/src/synchronizer/block_fetcher.rs b/sync/src/synchronizer/block_fetcher.rs index ff06032802d..97fc0e01ab4 100644 --- a/sync/src/synchronizer/block_fetcher.rs +++ b/sync/src/synchronizer/block_fetcher.rs @@ -1,21 +1,21 @@ use crate::block_status::BlockStatus; use crate::synchronizer::Synchronizer; use crate::types::{ActiveChain, HeaderView, IBDState}; -use crate::{BLOCK_DOWNLOAD_WINDOW, MAX_BLOCKS_IN_TRANSIT_PER_PEER}; +use crate::BLOCK_DOWNLOAD_WINDOW; use ckb_logger::{debug, trace}; use ckb_network::PeerIndex; use ckb_types::{core, packed}; use std::cmp::min; -pub struct BlockFetcher { - synchronizer: Synchronizer, +pub struct BlockFetcher<'a> { + synchronizer: &'a Synchronizer, peer: PeerIndex, active_chain: ActiveChain, ibd: IBDState, } -impl BlockFetcher { - pub fn new(synchronizer: Synchronizer, peer: PeerIndex, ibd: IBDState) -> Self { +impl<'a> BlockFetcher<'a> { + pub fn new(synchronizer: &'a Synchronizer, peer: PeerIndex, ibd: IBDState) -> Self { let active_chain = synchronizer.shared.active_chain(); BlockFetcher { peer, @@ -29,7 +29,7 @@ impl BlockFetcher { let inflight = self.synchronizer.shared().state().read_inflight_blocks(); // Can't download any more from this peer - inflight.peer_inflight_count(self.peer) >= MAX_BLOCKS_IN_TRANSIT_PER_PEER + inflight.peer_can_fetch_count(self.peer) == 0 } pub fn is_better_chain(&self, header: &HeaderView) -> bool { @@ -69,7 +69,7 @@ impl BlockFetcher { Some(fixed_last_common_header) } - pub fn fetch(self) -> Option> { + pub fn fetch(self) -> Option>> { trace!("[block downloader] BlockFetcher process"); if self.reached_inflight_limit() { @@ -130,7 +130,7 @@ impl BlockFetcher { let end = min(best_known_header.number(), start + BLOCK_DOWNLOAD_WINDOW); let n_fetch = min( end.saturating_sub(start) as usize + 1, - MAX_BLOCKS_IN_TRANSIT_PER_PEER.saturating_sub(inflight.peer_inflight_count(self.peer)), + inflight.peer_can_fetch_count(self.peer), ); let mut fetch = Vec::with_capacity(n_fetch); @@ -155,7 +155,7 @@ impl BlockFetcher { // If the block is stored, its ancestor must on store // So we can skip the search of this space directly break; - } else if inflight.insert(self.peer, hash) { + } else if inflight.insert(self.peer, hash, header.number()) { fetch.push(header) } @@ -172,6 +172,12 @@ impl BlockFetcher { // The headers in `fetch` may be unordered. Sort them by number. 
fetch.sort_by_key(|header| header.number()); - Some(fetch.iter().map(core::HeaderView::hash).collect()) + + Some( + fetch + .chunks(crate::MAX_BLOCKS_IN_TRANSIT_PER_PEER) + .map(|headers| headers.iter().map(core::HeaderView::hash).collect()) + .collect(), + ) } } diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index 23f67fb51fb..d27f03c517f 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -20,12 +20,14 @@ use crate::{ }; use ckb_chain::chain::ChainController; use ckb_logger::{debug, error, info, metric, trace, warn}; -use ckb_network::{bytes::Bytes, CKBProtocolContext, CKBProtocolHandler, PeerIndex}; +use ckb_network::{ + bytes::Bytes, CKBProtocolContext, CKBProtocolHandler, PeerIndex, ServiceControl, +}; use ckb_types::{core, packed, prelude::*}; use failure::Error as FailureError; use faketime::unix_time_as_millis; use std::cmp::min; -use std::sync::atomic::Ordering; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -39,15 +41,74 @@ const SYNC_NOTIFY_INTERVAL: Duration = Duration::from_millis(200); const IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(40); const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200); +enum FetchCMD { + Fetch(Vec), + Shutdown, +} + +struct BlockFetchCMD { + can_fetch_block: Arc, + sync: Synchronizer, + p2p_control: ServiceControl, + recv: crossbeam_channel::Receiver, +} + +impl BlockFetchCMD { + fn run(&self) { + while let Ok(cmd) = self.recv.recv() { + match cmd { + FetchCMD::Fetch(peers) => { + self.can_fetch_block.store(false, Ordering::Release); + for peer in peers { + if let Some(fetch) = + BlockFetcher::new(&self.sync, peer, IBDState::In).fetch() + { + for item in fetch { + BlockFetchCMD::send_getblocks(item, &self.p2p_control, peer); + } + } + } + self.can_fetch_block.store(true, Ordering::Release) + } + FetchCMD::Shutdown => break, + } + } + } + + fn send_getblocks(v_fetch: Vec, nc: &ServiceControl, peer: PeerIndex) { + let content = packed::GetBlocks::new_builder() + .block_hashes(v_fetch.clone().pack()) + .build(); + let message = packed::SyncMessage::new_builder().set(content).build(); + + debug!("send_getblocks len={:?} to peer={}", v_fetch.len(), peer); + if let Err(err) = nc.send_message_to( + peer, + crate::NetworkProtocol::SYNC.into(), + message.as_bytes(), + ) { + debug!("synchronizer send GetBlocks error: {:?}", err); + } + crate::synchronizer::log_sent_metric(message.to_enum().item_name()); + } +} + #[derive(Clone)] pub struct Synchronizer { chain: ChainController, pub shared: Arc, + can_fetch_block: Arc, + fetch_channel: Option>, } impl Synchronizer { pub fn new(chain: ChainController, shared: Arc) -> Synchronizer { - Synchronizer { chain, shared } + Synchronizer { + chain, + shared, + can_fetch_block: Arc::new(AtomicBool::new(true)), + fetch_channel: None, + } } pub fn shared(&self) -> &Arc { @@ -163,8 +224,8 @@ impl Synchronizer { &self, peer: PeerIndex, ibd: IBDState, - ) -> Option> { - BlockFetcher::new(self.clone(), peer, ibd).fetch() + ) -> Option>> { + BlockFetcher::new(&self, peer, ibd).fetch() } fn on_connected(&self, nc: &dyn CKBProtocolContext, peer: PeerIndex) { @@ -357,33 +418,111 @@ impl Synchronizer { } } - fn find_blocks_to_fetch(&self, nc: &dyn CKBProtocolContext, ibd: IBDState) { + fn find_blocks_to_fetch(&mut self, nc: &dyn CKBProtocolContext, ibd: IBDState) { + let tip = self.shared.active_chain().tip_number(); + + let disconnect_list = { + let mut list = 
self.shared().state().write_inflight_blocks().prune(tip); + if let IBDState::In = ibd { + // best known < tip and in IBD state, these node can be disconnect + list.extend( + self.shared + .state() + .peers() + .get_best_known_less_than_tip(tip), + ) + }; + list + }; + + for peer in disconnect_list.iter() { + if self + .peers() + .get_flag(*peer) + .map(|flag| flag.is_whitelist || flag.is_protect) + .unwrap_or(false) + { + continue; + } + if let Err(err) = nc.disconnect(*peer, "sync disconnect") { + debug!("synchronizer disconnect error: {:?}", err); + } + } + + if !self.can_fetch_block.load(Ordering::Acquire) { + return; + } + let peers: Vec = { - self.peers() + let state = &self + .shared + .state() + .read_inflight_blocks() + .download_schedulers; + let mut peers: Vec = self + .peers() .state .read() .iter() - .filter(|(_, state)| match ibd { - IBDState::In => { - state.peer_flags.is_outbound - || state.peer_flags.is_whitelist - || state.peer_flags.is_protect + .filter(|(id, state)| { + if disconnect_list.contains(id) { + return false; + }; + match ibd { + IBDState::In => { + state.peer_flags.is_outbound + || state.peer_flags.is_whitelist + || state.peer_flags.is_protect + } + IBDState::Out => state.sync_started, } - IBDState::Out => state.sync_started, }) .map(|(peer_id, _)| peer_id) .cloned() - .collect() + .collect(); + peers.sort_by_key(|id| { + state + .get(id) + .map(|d| d.task_count()) + .unwrap_or(crate::MAX_BLOCKS_IN_TRANSIT_PER_PEER) + }); + peers }; trace!("poll find_blocks_to_fetch select peers"); - { - self.shared().state().write_inflight_blocks().prune(); - } - for peer in peers { - if let Some(fetch) = self.get_blocks_to_fetch(peer, ibd) { - if !fetch.is_empty() { - self.send_getblocks(fetch, nc, peer); + // fetch use a lot of cpu time, especially in ibd state + match nc.p2p_control() { + Some(raw) if ibd.into() => match self.fetch_channel { + Some(ref send) => send.send(FetchCMD::Fetch(peers)).unwrap(), + None => { + let p2p_control = raw.clone(); + let sync = self.clone(); + let can_fetch_block = Arc::clone(&self.can_fetch_block); + let (sender, recv) = crossbeam_channel::bounded(2); + sender.send(FetchCMD::Fetch(peers)).unwrap(); + self.fetch_channel = Some(sender); + ::std::thread::spawn(move || { + BlockFetchCMD { + sync, + p2p_control, + recv, + can_fetch_block, + } + .run(); + }); + } + }, + _ => { + if let Some(sender) = self.fetch_channel.take() { + sender.send(FetchCMD::Shutdown).unwrap(); + } + + for peer in peers { + if let Some(fetch) = self.get_blocks_to_fetch(peer, ibd) { + for item in fetch { + self.send_getblocks(item, nc, peer); + } + } } } } @@ -1149,16 +1288,16 @@ mod tests { .unwrap(); assert_eq!( - blocks_to_fetch.first().unwrap(), + blocks_to_fetch[0].first().unwrap(), &shared2.store().get_block_hash(193).unwrap() ); assert_eq!( - blocks_to_fetch.last().unwrap(), + blocks_to_fetch[0].last().unwrap(), &shared2.store().get_block_hash(200).unwrap() ); let mut fetched_blocks = Vec::new(); - for block_hash in &blocks_to_fetch { + for block_hash in &blocks_to_fetch[0] { fetched_blocks.push(shared2.store().get_block(block_hash).unwrap()); } @@ -1176,7 +1315,7 @@ mod tests { .get_last_common_header(peer1) .unwrap() .hash(), - blocks_to_fetch.last().unwrap() + blocks_to_fetch[0].last().unwrap() ); } diff --git a/sync/src/tests/inflight_blocks.rs b/sync/src/tests/inflight_blocks.rs index 56858722e9f..4ed27ea217e 100644 --- a/sync/src/tests/inflight_blocks.rs +++ b/sync/src/tests/inflight_blocks.rs @@ -9,25 +9,21 @@ use std::iter::FromIterator; fn 
inflight_blocks_count() { let mut inflight_blocks = InflightBlocks::default(); - // allow 2 peer for one block - assert!(inflight_blocks.insert(1.into(), h256!("0x1").pack())); - assert!(inflight_blocks.insert(2.into(), h256!("0x1").pack())); - assert!(!inflight_blocks.insert(3.into(), h256!("0x1").pack())); + // don't allow 2 peer for one block + assert!(inflight_blocks.insert(2.into(), h256!("0x1").pack(), 1)); + assert!(!inflight_blocks.insert(1.into(), h256!("0x1").pack(), 1)); // peer 1 inflight - assert!(!inflight_blocks.insert(1.into(), h256!("0x1").pack())); + assert!(!inflight_blocks.insert(1.into(), h256!("0x1").pack(), 1)); - assert!(inflight_blocks.insert(1.into(), h256!("0x2").pack())); + assert!(inflight_blocks.insert(1.into(), h256!("0x2").pack(), 2)); assert_eq!(inflight_blocks.total_inflight_count(), 2); // 0x1 0x2 - assert_eq!(inflight_blocks.peer_inflight_count(1.into()), 2); + assert_eq!(inflight_blocks.peer_inflight_count(1.into()), 1); assert_eq!(inflight_blocks.peer_inflight_count(2.into()), 1); // one block inflight assert_eq!( inflight_blocks.inflight_block_by_peer(1.into()).cloned(), - Some(HashSet::from_iter(vec![ - h256!("0x1").pack(), - h256!("0x2").pack() - ])) + Some(HashSet::from_iter(vec![h256!("0x2").pack()])) ); // receive block 0x1 @@ -48,29 +44,29 @@ fn inflight_blocks_count() { fn inflight_blocks_state() { let mut inflight_blocks = InflightBlocks::default(); - assert!(inflight_blocks.insert(1.into(), h256!("0x1").pack())); - assert!(inflight_blocks.insert(2.into(), h256!("0x1").pack())); - assert!(!inflight_blocks.insert(3.into(), h256!("0x1").pack())); + assert!(inflight_blocks.insert(1.into(), h256!("0x1").pack(), 1)); + assert!(!inflight_blocks.insert(2.into(), h256!("0x1").pack(), 1)); + assert!(!inflight_blocks.insert(3.into(), h256!("0x1").pack(), 1)); // peer 1 inflight - assert!(!inflight_blocks.insert(1.into(), h256!("0x1").pack())); - assert!(inflight_blocks.insert(1.into(), h256!("0x2").pack())); + assert!(!inflight_blocks.insert(1.into(), h256!("0x1").pack(), 1)); + assert!(inflight_blocks.insert(1.into(), h256!("0x2").pack(), 2)); - assert!(inflight_blocks.insert(3.into(), h256!("0x3").pack())); + assert!(inflight_blocks.insert(3.into(), h256!("0x3").pack(), 3)); assert_eq!( inflight_blocks .inflight_state_by_block(&h256!("0x1").pack()) .cloned() - .map(|state| { state.peers }), - Some(HashSet::from_iter(vec![1.into(), 2.into()])) + .map(|state| { state.peer }), + Some(1.into()) ); assert_eq!( inflight_blocks .inflight_state_by_block(&h256!("0x3").pack()) - .map(|state| state.peers.iter().collect()), - Some(vec![&(3.into())]) + .map(|state| state.peer), + Some(3.into()) ); // peer 1 disconnect @@ -80,15 +76,15 @@ fn inflight_blocks_state() { assert_eq!( inflight_blocks .inflight_state_by_block(&h256!("0x1").pack()) - .map(|state| state.peers.iter().collect()), - Some(vec![&(2.into())]) + .map(|state| state.peer), + None ); assert_eq!( inflight_blocks .inflight_state_by_block(&h256!("0x3").pack()) - .map(|state| state.peers.iter().collect()), - Some(vec![&(3.into())]) + .map(|state| state.peer), + Some(3.into()) ); } @@ -99,42 +95,102 @@ fn inflight_blocks_timeout() { faketime::enable(&faketime_file); let mut inflight_blocks = InflightBlocks::default(); - assert!(inflight_blocks.insert(1.into(), h256!("0x1").pack())); - assert!(inflight_blocks.insert(1.into(), h256!("0x2").pack())); - assert!(inflight_blocks.insert(2.into(), h256!("0x2").pack())); - assert!(inflight_blocks.insert(1.into(), h256!("0x3").pack())); - 
assert!(inflight_blocks.insert(2.into(), h256!("0x3").pack())); + assert!(inflight_blocks.insert(1.into(), h256!("0x1").pack(), 1)); + assert!(inflight_blocks.insert(1.into(), h256!("0x2").pack(), 2)); + assert!(inflight_blocks.insert(2.into(), h256!("0x3").pack(), 3)); + assert!(!inflight_blocks.insert(1.into(), h256!("0x3").pack(), 3)); + assert!(inflight_blocks.insert(1.into(), h256!("0x4").pack(), 4)); + assert!(inflight_blocks.insert(2.into(), h256!("0x5").pack(), 5)); + assert!(!inflight_blocks.insert(2.into(), h256!("0x5").pack(), 5)); faketime::write_millis(&faketime_file, BLOCK_DOWNLOAD_TIMEOUT + 1).expect("write millis"); - assert!(!inflight_blocks.insert(3.into(), h256!("0x3").pack())); - assert!(!inflight_blocks.insert(3.into(), h256!("0x2").pack())); - assert!(inflight_blocks.insert(4.into(), h256!("0x4").pack())); - assert!(inflight_blocks.insert(1.into(), h256!("0x4").pack())); + assert!(!inflight_blocks.insert(3.into(), h256!("0x3").pack(), 3)); + assert!(!inflight_blocks.insert(3.into(), h256!("0x2").pack(), 2)); + assert!(inflight_blocks.insert(4.into(), h256!("0x6").pack(), 6)); + assert!(inflight_blocks.insert(1.into(), h256!("0x7").pack(), 7)); - inflight_blocks.prune(); - assert!(inflight_blocks.insert(3.into(), h256!("0x2").pack())); - assert!(inflight_blocks.insert(3.into(), h256!("0x3").pack())); + let peers = inflight_blocks.prune(0); + assert_eq!(peers, HashSet::from_iter(vec![1.into(), 2.into()])); + assert!(inflight_blocks.insert(3.into(), h256!("0x2").pack(), 2)); + assert!(inflight_blocks.insert(3.into(), h256!("0x3").pack(), 3)); assert_eq!( inflight_blocks .inflight_state_by_block(&h256!("0x3").pack()) - .map(|state| state.peers.iter().collect()), - Some(vec![&(3.into())]) + .map(|state| state.peer), + Some(3.into()) ); assert_eq!( inflight_blocks .inflight_state_by_block(&h256!("0x2").pack()) - .map(|state| state.peers.iter().collect()), - Some(vec![&(3.into())]) + .map(|state| state.peer), + Some(3.into()) ); assert_eq!( inflight_blocks - .inflight_state_by_block(&h256!("0x4").pack()) + .inflight_state_by_block(&h256!("0x6").pack()) .cloned() - .map(|state| { state.peers }), - Some(HashSet::from_iter(vec![1.into(), 4.into()])) + .map(|state| state.peer), + Some(4.into()) + ); +} + +#[cfg(not(disable_faketime))] +#[test] +fn inflight_trace_number_state() { + let faketime_file = faketime::millis_tempfile(0).expect("create faketime file"); + faketime::enable(&faketime_file); + + let mut inflight_blocks = InflightBlocks::default(); + + assert!(inflight_blocks.insert(1.into(), h256!("0x1").pack(), 1)); + assert!(inflight_blocks.insert(2.into(), h256!("0x2").pack(), 2)); + assert!(inflight_blocks.insert(3.into(), h256!("0x3").pack(), 3)); + assert!(inflight_blocks.insert(4.into(), h256!("0x33").pack(), 3)); + assert!(inflight_blocks.insert(5.into(), h256!("0x4").pack(), 4)); + assert!(inflight_blocks.insert(6.into(), h256!("0x5").pack(), 5)); + assert!(inflight_blocks.insert(7.into(), h256!("0x55").pack(), 5)); + + let list = inflight_blocks.prune(2); + assert!(list.is_empty()); + + let (next_number, (blocks, time)) = inflight_blocks.trace_number.iter().next().unwrap(); + + assert_eq!(next_number, &3); + assert_eq!( + blocks, + &HashSet::from_iter(vec![h256!("0x3").pack(), h256!("0x33").pack()]) ); + assert!(time.is_none()); + + // When an orphan block is inserted + { + if let Some((_, time)) = inflight_blocks.trace_number.get_mut(&3) { + *time = Some(faketime::unix_time_as_millis()) + } + } + + faketime::write_millis(&faketime_file, 2000).expect("write 
millis"); + + let list = inflight_blocks.prune(2); + assert!(list.is_empty()); + + let (next_number, (blocks, time)) = inflight_blocks.trace_number.iter().next().unwrap(); + + assert_eq!(next_number, &4); + assert_eq!(blocks, &HashSet::from_iter(vec![h256!("0x4").pack()])); + assert!(time.is_none()); + + assert!(inflight_blocks + .inflight_state_by_block(&h256!("0x3").pack()) + .is_none()); + assert!(inflight_blocks + .inflight_state_by_block(&h256!("0x33").pack()) + .is_none()); + + assert_eq!(inflight_blocks.peer_can_fetch_count(3.into()), 8); + assert_eq!(inflight_blocks.peer_can_fetch_count(4.into()), 8); } diff --git a/sync/src/types.rs b/sync/src/types.rs index 0799468e4c0..f8ced066902 100644 --- a/sync/src/types.rs +++ b/sync/src/types.rs @@ -1,9 +1,10 @@ use crate::block_status::BlockStatus; use crate::orphan_block_pool::OrphanBlockPool; use crate::BLOCK_DOWNLOAD_TIMEOUT; -use crate::MAX_PEERS_PER_BLOCK; use crate::{NetworkProtocol, SUSPEND_SYNC_TIME}; -use crate::{MAX_HEADERS_LEN, MAX_TIP_AGE, RETRY_ASK_TX_TIMEOUT_INCREASE}; +use crate::{ + MAX_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN, MAX_TIP_AGE, RETRY_ASK_TX_TIMEOUT_INCREASE, +}; use ckb_chain::chain::ChainController; use ckb_chain_spec::consensus::Consensus; use ckb_logger::{debug, debug_target, error, metric}; @@ -24,7 +25,7 @@ use failure::Error as FailureError; use faketime::unix_time_as_millis; use lru_cache::LruCache; use std::cmp; -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}; use std::fmt; use std::hash::Hash; use std::mem; @@ -250,36 +251,103 @@ pub struct Peers { #[derive(Debug, Clone)] pub struct InflightState { - pub(crate) peers: HashSet, + pub(crate) peer: PeerIndex, pub(crate) timestamp: u64, + pub(crate) number: BlockNumber, } -impl Default for InflightState { - fn default() -> Self { - InflightState { - peers: HashSet::default(), +impl InflightState { + fn new(peer: PeerIndex, number: BlockNumber) -> Self { + Self { + peer, timestamp: unix_time_as_millis(), + number, } } } -impl InflightState { - pub fn remove(&mut self, peer: PeerIndex) { - self.peers.remove(&peer); +#[derive(Debug, Clone)] +pub struct DownloadScheduler { + task_count: usize, + timeout_count: usize, + hashes: HashSet, +} + +impl Default for DownloadScheduler { + fn default() -> Self { + Self { + hashes: HashSet::default(), + task_count: MAX_BLOCKS_IN_TRANSIT_PER_PEER, + timeout_count: 0, + } + } +} + +impl DownloadScheduler { + fn inflight_count(&self) -> usize { + self.hashes.len() + } + + fn can_fetch(&self) -> usize { + self.task_count.saturating_sub(self.hashes.len()) + } + + pub(crate) fn task_count(&self) -> usize { + self.task_count + } + + fn adjust(&mut self, time: u64, len: u64) { + let now = unix_time_as_millis(); + // 8 means default max outbound + let (a, b) = if len > 8 { (len >> 3, len % 8) } else { (1, 0) }; + // Dynamically adjust download tasks based on response time. + // + // Block max size about 700k, Under 10m/s bandwidth it may cost 1s to response + // But the mark is on the same time, so multiple tasks may be affected by one task, + // causing all responses to be in the range of reduced tasks. + // So the adjustment to reduce the number of tasks will be calculated by modulo 3 with the actual number of triggers. 
+ match ((now - time) / a).saturating_sub(b * 100) { + 0..=500 => { + if self.task_count < 31 { + self.task_count += 2 + } + } + 501..=1000 => { + if self.task_count < 32 { + self.task_count += 1 + } + } + 1001..=1500 => (), + _ => { + self.timeout_count += 1; + if self.timeout_count > 3 { + self.task_count = self.task_count.saturating_sub(1); + self.timeout_count = 0; + } + } + } + } + + fn punish(&mut self) { + self.task_count >>= 1 } } #[derive(Clone)] pub struct InflightBlocks { - blocks: HashMap>, - states: HashMap, + pub(crate) download_schedulers: HashMap, + inflight_states: HashMap, + pub(crate) trace_number: BTreeMap, Option)>, + compact_reconstruct_inflight: HashMap>, } impl Default for InflightBlocks { fn default() -> Self { InflightBlocks { - blocks: HashMap::default(), - states: HashMap::default(), + download_schedulers: HashMap::default(), + inflight_states: HashMap::default(), + trace_number: BTreeMap::default(), + compact_reconstruct_inflight: HashMap::default(), } } } @@ -297,47 +365,133 @@ impl<'a> fmt::Debug for DebugHashSet<'a> { impl fmt::Debug for InflightBlocks { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_map() - .entries(self.blocks.iter().map(|(k, v)| (k, DebugHashSet(v)))) + .entries( + self.download_schedulers + .iter() + .map(|(k, v)| (k, DebugHashSet(&v.hashes))), + ) .finish()?; fmt.debug_map() - .entries(self.states.iter().map(|(k, v)| (format!("{}", k), v))) + .entries( + self.inflight_states + .iter() + .map(|(k, v)| (format!("{}", k), v)), + ) .finish() } } impl InflightBlocks { pub fn blocks_iter(&self) -> impl Iterator)> { - self.blocks.iter() + self.download_schedulers.iter().map(|(k, v)| (k, &v.hashes)) } pub fn total_inflight_count(&self) -> usize { - self.states.len() + self.inflight_states.len() } pub fn peer_inflight_count(&self, peer: PeerIndex) -> usize { - self.blocks.get(&peer).map(HashSet::len).unwrap_or(0) + self.download_schedulers + .get(&peer) + .map(DownloadScheduler::inflight_count) + .unwrap_or(0) } + + pub fn peer_can_fetch_count(&self, peer: PeerIndex) -> usize { + self.download_schedulers + .get(&peer) + .map(DownloadScheduler::can_fetch) + .unwrap_or(MAX_BLOCKS_IN_TRANSIT_PER_PEER) + } + pub fn inflight_block_by_peer(&self, peer: PeerIndex) -> Option<&HashSet> { - self.blocks.get(&peer) + self.download_schedulers.get(&peer).map(|d| &d.hashes) } pub fn inflight_state_by_block(&self, block: &Byte32) -> Option<&InflightState> { - self.states.get(block) + self.inflight_states.get(block) + } + + pub fn compact_reconstruct(&mut self, peer: PeerIndex, hash: Byte32) -> bool { + let entry = self.compact_reconstruct_inflight.entry(hash).or_default(); + if entry.len() >= 2 { + return false; + } + + entry.insert(peer) + } + + pub fn remove_compact(&mut self, peer: PeerIndex, hash: &Byte32) { + self.compact_reconstruct_inflight + .get_mut(&hash) + .map(|peers| peers.remove(&peer)); } - pub fn prune(&mut self) { + pub fn inflight_compact_by_block(&self, hash: &Byte32) -> Option<&HashSet> { + self.compact_reconstruct_inflight.get(hash) + } + + pub fn prune(&mut self, tip: BlockNumber) -> HashSet { let now = unix_time_as_millis(); let prev_count = self.total_inflight_count(); - let blocks = &mut self.blocks; - self.states.retain(|k, v| { + self.trace_number = self.trace_number.split_off(&(tip + 1)); + let mut disconnect_list = HashSet::new(); + + let trace = &mut self.trace_number; + let blocks = &mut self.download_schedulers; + let states = &mut self.inflight_states; + states.retain(|k, v| { let outdate = (v.timestamp + 
BLOCK_DOWNLOAD_TIMEOUT) < now; if outdate { - for peer in &v.peers { - blocks.get_mut(peer).map(|set| set.remove(k)); - } + blocks.get_mut(&v.peer).map(|set| set.hashes.remove(k)); + trace.entry(v.number).and_modify(|(trace, _)| { + trace.remove(k); + }); + disconnect_list.insert(v.peer); } !outdate }); + + blocks.retain(|k, v| { + // task number zero means this peer's response is very slow + let zero = v.task_count != 0; + if !zero { + disconnect_list.insert(*k); + } + zero + }); + + let should_remove = trace + .get(&(tip + 1)) + .map(|(hashes, timestamp)| { + // In the normal state, timestamp should always be None + // + // When a sync block is inserted to orphan pool, it means that + // there is an anomaly in the sync at tip + 1, i.e. a node is stuck, + // at which point it will be recorded as the timestamp at that time. + // + // If the time exceeds 1s, delete the task and halve the number of + // executable tasks for the corresponding node + if let Some(time) = timestamp { + if now - *time > 1000 { + for hash in hashes { + let state = states.remove(&hash).unwrap(); + if let Some(d) = blocks.get_mut(&state.peer) { + d.punish(); + d.hashes.remove(&hash); + }; + } + return true; + } + } + false + }) + .unwrap_or(false); + + if should_remove { + self.trace_number.remove(&(tip + 1)); + } + if prev_count == 0 { metric!({ "topic": "blocks_in_flight", @@ -349,32 +503,44 @@ impl InflightBlocks { "fields": { "total": self.total_inflight_count(), "elapsed": BLOCK_DOWNLOAD_TIMEOUT } }); } + + disconnect_list } - pub fn insert(&mut self, peer: PeerIndex, hash: Byte32) -> bool { - let state = self - .states - .entry(hash.clone()) - .or_insert_with(InflightState::default); - if state.peers.len() >= MAX_PEERS_PER_BLOCK { - return false; - } + pub fn insert(&mut self, peer: PeerIndex, hash: Byte32, number: BlockNumber) -> bool { + let state = self.inflight_states.entry(hash.clone()); + match state { + Entry::Occupied(_entry) => return false, + Entry::Vacant(entry) => entry.insert(InflightState::new(peer, number)), + }; - let blocks = self.blocks.entry(peer).or_insert_with(HashSet::default); - let ret = blocks.insert(hash); - if ret { - state.peers.insert(peer); - } + let blocks = self + .download_schedulers + .entry(peer) + .or_insert_with(DownloadScheduler::default); + let ret = blocks.hashes.insert(hash.clone()); + + let trace_number = self + .trace_number + .entry(number) + .or_insert_with(|| (HashSet::default(), None)); + trace_number.0.insert(hash); ret } pub fn remove_by_peer(&mut self, peer: PeerIndex) -> bool { - self.blocks + let trace = &mut self.trace_number; + let state = &mut self.inflight_states; + let compact = &mut self.compact_reconstruct_inflight; + self.download_schedulers .remove(&peer) .map(|blocks| { - for block in blocks { - if let Some(state) = self.states.get_mut(&block) { - state.remove(peer) + for block in blocks.hashes { + compact.get_mut(&block).map(|peers| peers.remove(&peer)); + if let Some(state) = state.remove(&block) { + trace.entry(state.number).and_modify(|(trace, _)| { + trace.remove(&block); + }); } } }) @@ -382,12 +548,21 @@ impl InflightBlocks { } pub fn remove_by_block(&mut self, block: Byte32) -> bool { - self.states + let blocks = &mut self.download_schedulers; + let trace = &mut self.trace_number; + let compact = &mut self.compact_reconstruct_inflight; + let len = blocks.len() as u64; + self.inflight_states .remove(&block) .map(|state| { - for peer in state.peers { - self.blocks.get_mut(&peer).map(|set| set.remove(&block)); - } + if let Some(set) = 
blocks.get_mut(&state.peer) { + set.hashes.remove(&block); + compact.remove(&block); + set.adjust(state.timestamp, len); + trace.entry(state.number).and_modify(|(trace, _)| { + trace.remove(&block); + }); + }; state.timestamp }) .map(|timestamp| { @@ -475,6 +650,22 @@ impl Peers { }) } + pub fn get_best_known_less_than_tip(&self, tip: BlockNumber) -> Vec { + self.state + .read() + .iter() + .filter_map(|(pi, state)| { + if !state.unknown_header_list.is_empty() { + return None; + } + match state.best_known_header { + Some(ref header) if header.number() < tip => Some(*pi), + _ => None, + } + }) + .collect() + } + pub fn take_unknown_last(&self, peer: PeerIndex) -> Option { self.state .write() @@ -723,6 +914,17 @@ impl SyncShared { block.header().hash() ); self.state.insert_orphan_block(pi, (*block).clone()); + let tip = self.shared.snapshot().tip_number(); + if let Some(entry) = self + .state() + .write_inflight_blocks() + .trace_number + .get_mut(&(tip + 1)) + { + if entry.1.is_none() { + entry.1 = Some(unix_time_as_millis()); + } + } return Ok(false); } diff --git a/test/src/specs/sync/get_blocks.rs b/test/src/specs/sync/get_blocks.rs index 59823004d5d..01f91c01464 100644 --- a/test/src/specs/sync/get_blocks.rs +++ b/test/src/specs/sync/get_blocks.rs @@ -1,5 +1,5 @@ use super::utils::wait_get_blocks; -use crate::utils::build_headers; +use crate::utils::{build_headers, wait_until}; use crate::{Net, Spec, TestProtocol}; use ckb_sync::{NetworkProtocol, BLOCK_DOWNLOAD_TIMEOUT}; use ckb_types::core::HeaderView; @@ -37,10 +37,19 @@ impl Spec for GetBlocksTimeout { let (first, received) = wait_get_blocks_point(block_download_timeout_secs * 2, &net); assert!(received, "Should received GetBlocks"); let (second, received) = wait_get_blocks_point(block_download_timeout_secs * 2, &net); - assert!(received, "Should received GetBlocks"); + assert!(!received, "Should not received GetBlocks"); let elapsed = second.duration_since(first).as_secs(); let error_margin = 2; assert!(elapsed >= block_download_timeout_secs - error_margin); + + let rpc_client = node1.rpc_client(); + let result = wait_until(10, || { + let peers = rpc_client.get_peers(); + peers.is_empty() + }); + if !result { + panic!("node1 must disconnect net"); + } } }
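Taken together, the scheduler closes a feedback loop: DownloadScheduler::adjust re-sizes a peer's task budget from the round-trip time of each completed block, prune(tip) uses trace_number to notice when the chain is stuck at tip + 1, and the peer blamed for the stall has its budget halved via punish() and may be disconnected. The condensed sketch below keeps the millisecond thresholds from the diff but drops the extra normalization by the number of downloading peers and the trace_number bookkeeping; the values in main are illustrative only.

// Condensed model of the response-time feedback in DownloadScheduler::adjust
// and the halving punishment applied by prune(). Thresholds follow the diff;
// the clock is a plain millisecond timestamp passed in by the caller.
struct DownloadScheduler {
    task_count: usize,
    timeout_count: usize,
}

impl DownloadScheduler {
    /// Called when a block sent out at `sent_at` arrives at `now` (both in ms).
    /// Fast responses earn a bigger budget, repeated slow ones shrink it.
    fn adjust(&mut self, now: u64, sent_at: u64) {
        match now.saturating_sub(sent_at) {
            0..=500 => {
                if self.task_count < 31 {
                    self.task_count += 2;
                }
            }
            501..=1000 => {
                if self.task_count < 32 {
                    self.task_count += 1;
                }
            }
            1001..=1500 => {} // acceptable, keep the budget as it is
            _ => {
                // Only shrink after several consecutive slow responses so that
                // one unlucky block does not penalize the peer.
                self.timeout_count += 1;
                if self.timeout_count > 3 {
                    self.task_count = self.task_count.saturating_sub(1);
                    self.timeout_count = 0;
                }
            }
        }
    }

    /// Applied by prune() when the peer is blamed for a stuck `tip + 1`.
    fn punish(&mut self) {
        self.task_count >>= 1;
    }
}

fn main() {
    let mut s = DownloadScheduler { task_count: 16, timeout_count: 0 };
    s.adjust(1_300, 1_000); // 300 ms response: budget grows by 2
    assert_eq!(s.task_count, 18);
    for _ in 0..4 {
        s.adjust(10_000, 1_000); // repeated slow responses
    }
    assert_eq!(s.task_count, 17); // shrunk once after the timeout streak
    s.punish();
    assert_eq!(s.task_count, 8);
}

In the patch these adjustments feed back into peer_can_fetch_count, which BlockFetcher consults instead of the fixed MAX_BLOCKS_IN_TRANSIT_PER_PEER cap, so fast peers are asked for more blocks per round and slow or stalled peers progressively fewer.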