diff --git a/Cargo.lock b/Cargo.lock index e2053ffdbd..7fd8c8814a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -691,7 +691,7 @@ dependencies = [ "snap", "tempfile", "tentacle", - "tokio 0.2.17", + "tokio 0.2.18", "tokio-util", ] @@ -974,6 +974,7 @@ dependencies = [ "ckb-types", "ckb-util", "ckb-verification", + "crossbeam-channel", "failure", "faketime", "futures 0.3.4", @@ -3981,7 +3982,7 @@ dependencies = [ "molecule 0.5.0", "tentacle-multiaddr", "tentacle-secio", - "tokio 0.2.17", + "tokio 0.2.18", "tokio-util", "tokio-yamux", "winapi 0.3.8", @@ -4016,7 +4017,7 @@ dependencies = [ "rand 0.7.0", "ring", "secp256k1 0.17.2", - "tokio 0.2.17", + "tokio 0.2.18", "tokio-util", "unsigned-varint", ] @@ -4144,9 +4145,9 @@ dependencies = [ [[package]] name = "tokio" -version = "0.2.17" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39fb9142eb6e9cc37f4f29144e62618440b149a138eee01a7bbe9b9226aaf17c" +checksum = "34ef16d072d2b6dc8b4a56c70f5c5ced1a37752116f8e7c1e80c659aa7cb6713" dependencies = [ "bytes 0.5.4", "futures-core", @@ -4348,7 +4349,7 @@ dependencies = [ "futures-sink", "log 0.4.8", "pin-project-lite", - "tokio 0.2.17", + "tokio 0.2.18", ] [[package]] @@ -4360,7 +4361,7 @@ dependencies = [ "bytes 0.5.4", "futures 0.3.4", "log 0.4.8", - "tokio 0.2.17", + "tokio 0.2.18", "tokio-util", ] diff --git a/network/src/protocols/mod.rs b/network/src/protocols/mod.rs index 415faf4eef..70c32487e6 100644 --- a/network/src/protocols/mod.rs +++ b/network/src/protocols/mod.rs @@ -66,6 +66,9 @@ pub trait CKBProtocolContext: Send { fn send_paused(&self) -> bool; // Other methods fn protocol_id(&self) -> ProtocolId; + fn p2p_control(&self) -> Option<&ServiceControl> { + None + } } pub trait CKBProtocolHandler: Sync + Send { @@ -399,6 +402,10 @@ impl CKBProtocolContext for DefaultCKBProtocolContext { fn send_paused(&self) -> bool { self.send_paused } + + fn p2p_control(&self) -> Option<&ServiceControl> { + Some(&self.p2p_control) + } } 
pub(crate) struct BlockingFutureTask { diff --git a/sync/Cargo.toml b/sync/Cargo.toml index f5ce6163a5..3a99146c92 100644 --- a/sync/Cargo.toml +++ b/sync/Cargo.toml @@ -24,6 +24,7 @@ sentry = "0.16.0" futures = "0.3" ckb-error = {path = "../error"} ckb-tx-pool = { path = "../tx-pool" } +crossbeam-channel = "0.3" [dev-dependencies] ckb-test-chain-utils = { path = "../util/test-chain-utils" } diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 07de21b279..a5008016a5 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -26,10 +26,15 @@ pub const MAX_INVENTORY_LEN: usize = 50_000; pub const MAX_SCHEDULED_LEN: usize = 4 * 1024; pub const MAX_BLOCKS_TO_ANNOUNCE: usize = 8; pub const MAX_UNCONNECTING_HEADERS: usize = 10; -pub const MAX_BLOCKS_IN_TRANSIT_PER_PEER: usize = 16; pub const MAX_TIP_AGE: u64 = 24 * 60 * 60 * 1000; pub const STALE_RELAY_AGE_LIMIT: u64 = 30 * 24 * 60 * 60 * 1000; +/* About Download Scheduler */ +pub const INIT_BLOCKS_IN_TRANSIT_PER_PEER: usize = 16; +pub const FIRST_LEVEL_MAX: usize = 32; +pub const MAX_BLOCKS_IN_TRANSIT_PER_PEER: usize = 128; +pub const CHECK_POINT_WINDOW: u64 = (MAX_BLOCKS_IN_TRANSIT_PER_PEER * 4) as u64; + pub(crate) const LOG_TARGET_RELAY: &str = "ckb-relay"; use ckb_network::ProtocolId; @@ -51,7 +56,6 @@ impl Into for NetworkProtocol { pub const HEADERS_DOWNLOAD_TIMEOUT_BASE: u64 = 6 * 60 * 1000; // 6 minutes pub const HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER: u64 = 1; // 1ms/header pub const POW_SPACE: u64 = 10_000; // 10s -pub const MAX_PEERS_PER_BLOCK: usize = 2; // Protect at least this many outbound peers from disconnection due to slow // behind headers chain. 
diff --git a/sync/src/relayer/block_transactions_process.rs b/sync/src/relayer/block_transactions_process.rs index 40f9b32f4b..ff42ac735e 100644 --- a/sync/src/relayer/block_transactions_process.rs +++ b/sync/src/relayer/block_transactions_process.rs @@ -61,6 +61,14 @@ impl<'a> BlockTransactionsProcess<'a> { let missing_uncles: Vec; let mut collision = false; + { + self.relayer + .shared + .state() + .write_inflight_blocks() + .remove_compact(self.peer, &block_hash); + } + if let Entry::Occupied(mut pending) = shared .state() .pending_compact_blocks() @@ -135,18 +143,24 @@ impl<'a> BlockTransactionsProcess<'a> { assert!(!missing_transactions.is_empty() || !missing_uncles.is_empty()); - let content = packed::GetBlockTransactions::new_builder() - .block_hash(block_hash.clone()) - .indexes(missing_transactions.pack()) - .uncle_indexes(missing_uncles.pack()) - .build(); - let message = packed::RelayMessage::new_builder().set(content).build(); + if shared + .state() + .write_inflight_blocks() + .compact_reconstruct(self.peer, block_hash.clone()) + { + let content = packed::GetBlockTransactions::new_builder() + .block_hash(block_hash.clone()) + .indexes(missing_transactions.pack()) + .uncle_indexes(missing_uncles.pack()) + .build(); + let message = packed::RelayMessage::new_builder().set(content).build(); - if let Err(err) = self.nc.send_message_to(self.peer, message.as_bytes()) { - return StatusCode::Network - .with_context(format!("Send GetBlockTransactions error: {:?}", err,)); + if let Err(err) = self.nc.send_message_to(self.peer, message.as_bytes()) { + return StatusCode::Network + .with_context(format!("Send GetBlockTransactions error: {:?}", err,)); + } + crate::relayer::log_sent_metric(message.to_enum().item_name()); } - crate::relayer::log_sent_metric(message.to_enum().item_name()); mem::replace(expected_transaction_indexes, missing_transactions); mem::replace(expected_uncle_indexes, missing_uncles); diff --git a/sync/src/relayer/compact_block_process.rs 
b/sync/src/relayer/compact_block_process.rs index 0a63d6e059..a0f5c9e96d 100644 --- a/sync/src/relayer/compact_block_process.rs +++ b/sync/src/relayer/compact_block_process.rs @@ -122,12 +122,12 @@ impl<'a> CompactBlockProcess<'a> { let parent = parent.unwrap(); - if let Some(flight) = shared + if let Some(peers) = shared .state() .read_inflight_blocks() - .inflight_state_by_block(&block_hash) + .inflight_compact_by_block(&block_hash) { - if flight.peers.contains(&self.peer) { + if peers.contains(&self.peer) { debug_target!( crate::LOG_TARGET_RELAY, "discard already in-flight compact block {}", @@ -240,7 +240,7 @@ impl<'a> CompactBlockProcess<'a> { if !shared .state() .write_inflight_blocks() - .insert(self.peer, block_hash.clone()) + .compact_reconstruct(self.peer, block_hash.clone()) { debug_target!( crate::LOG_TARGET_RELAY, diff --git a/sync/src/relayer/tests/compact_block_process.rs b/sync/src/relayer/tests/compact_block_process.rs index a93a43a41a..23a63bcc42 100644 --- a/sync/src/relayer/tests/compact_block_process.rs +++ b/sync/src/relayer/tests/compact_block_process.rs @@ -2,7 +2,6 @@ use crate::block_status::BlockStatus; use crate::relayer::compact_block_process::CompactBlockProcess; use crate::relayer::tests::helper::{build_chain, new_header_builder, MockProtocalContext}; use crate::types::InflightBlocks; -use crate::MAX_PEERS_PER_BLOCK; use crate::{NetworkProtocol, Status, StatusCode}; use ckb_network::PeerIndex; use ckb_store::ChainStore; @@ -201,7 +200,7 @@ fn test_already_in_flight() { // Already in flight let mut in_flight_blocks = InflightBlocks::default(); - in_flight_blocks.insert(peer_index, block.header().hash()); + in_flight_blocks.compact_reconstruct(peer_index, block.header().hash()); *relayer.shared.state().write_inflight_blocks() = in_flight_blocks; let compact_block_process = CompactBlockProcess::new( @@ -349,8 +348,8 @@ fn test_inflight_blocks_reach_limit() { // in_flight_blocks is full { let mut in_flight_blocks = 
InflightBlocks::default(); - for i in 0..=MAX_PEERS_PER_BLOCK { - in_flight_blocks.insert(i.into(), block.header().hash()); + for i in 0..=2 { + in_flight_blocks.compact_reconstruct(i.into(), block.header().hash()); } *relayer.shared.state().write_inflight_blocks() = in_flight_blocks; } diff --git a/sync/src/synchronizer/block_fetcher.rs b/sync/src/synchronizer/block_fetcher.rs index ff06032802..0f9d83c030 100644 --- a/sync/src/synchronizer/block_fetcher.rs +++ b/sync/src/synchronizer/block_fetcher.rs @@ -1,21 +1,21 @@ use crate::block_status::BlockStatus; use crate::synchronizer::Synchronizer; use crate::types::{ActiveChain, HeaderView, IBDState}; -use crate::{BLOCK_DOWNLOAD_WINDOW, MAX_BLOCKS_IN_TRANSIT_PER_PEER}; +use crate::BLOCK_DOWNLOAD_WINDOW; use ckb_logger::{debug, trace}; use ckb_network::PeerIndex; use ckb_types::{core, packed}; use std::cmp::min; -pub struct BlockFetcher { - synchronizer: Synchronizer, +pub struct BlockFetcher<'a> { + synchronizer: &'a Synchronizer, peer: PeerIndex, active_chain: ActiveChain, ibd: IBDState, } -impl BlockFetcher { - pub fn new(synchronizer: Synchronizer, peer: PeerIndex, ibd: IBDState) -> Self { +impl<'a> BlockFetcher<'a> { + pub fn new(synchronizer: &'a Synchronizer, peer: PeerIndex, ibd: IBDState) -> Self { let active_chain = synchronizer.shared.active_chain(); BlockFetcher { peer, @@ -29,7 +29,7 @@ impl BlockFetcher { let inflight = self.synchronizer.shared().state().read_inflight_blocks(); // Can't download any more from this peer - inflight.peer_inflight_count(self.peer) >= MAX_BLOCKS_IN_TRANSIT_PER_PEER + inflight.peer_can_fetch_count(self.peer) == 0 } pub fn is_better_chain(&self, header: &HeaderView) -> bool { @@ -69,7 +69,7 @@ impl BlockFetcher { Some(fixed_last_common_header) } - pub fn fetch(self) -> Option> { + pub fn fetch(self) -> Option>> { trace!("[block downloader] BlockFetcher process"); if self.reached_inflight_limit() { @@ -130,7 +130,7 @@ impl BlockFetcher { let end = min(best_known_header.number(), 
start + BLOCK_DOWNLOAD_WINDOW); let n_fetch = min( end.saturating_sub(start) as usize + 1, - MAX_BLOCKS_IN_TRANSIT_PER_PEER.saturating_sub(inflight.peer_inflight_count(self.peer)), + inflight.peer_can_fetch_count(self.peer), ); let mut fetch = Vec::with_capacity(n_fetch); @@ -146,16 +146,21 @@ impl BlockFetcher { for _ in 0..span { let parent_hash = header.parent_hash(); let hash = header.hash(); - // NOTE: Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding - // stopping synchronization even when orphan_pool maintains dirty items by bugs. - let stored = self - .active_chain - .contains_block_status(&hash, BlockStatus::BLOCK_STORED); - if stored { + + let status = self.active_chain.get_block_status(&hash); + if status == BlockStatus::BLOCK_STORED { // If the block is stored, its ancestor must on store // So we can skip the search of this space directly break; - } else if inflight.insert(self.peer, hash) { + } else if self.ibd.into() && status.contains(BlockStatus::BLOCK_RECEIVED) { + // NOTE: Outside IBD, filtering on `BLOCK_STORED` but not `BLOCK_RECEIVED` is for avoiding + // stopping synchronization even when orphan_pool maintains dirty items due to bugs. + // TODO: If async validation is achieved, then the IBD state check here can be removed + + // In IBD, BLOCK_RECEIVED means this block has already been received, so it doesn't need to be fetched + // Outside IBD, because of the model, this block has to be requested again + // Either way, nothing needs to be done on this branch + } else if inflight.insert(self.peer, (header.number(), hash).into()) { fetch.push(header) } @@ -172,6 +177,20 @@ impl BlockFetcher { // The headers in `fetch` may be unordered. Sort them by number.
fetch.sort_by_key(|header| header.number()); - Some(fetch.iter().map(core::HeaderView::hash).collect()) + + let tip = self.active_chain.tip_number(); + let should_mark = fetch.last().map_or(false, |header| { + header.number().saturating_sub(crate::CHECK_POINT_WINDOW) > tip + }); + if should_mark { + inflight.mark_slow_block(tip); + } + + Some( + fetch + .chunks(crate::INIT_BLOCKS_IN_TRANSIT_PER_PEER) + .map(|headers| headers.iter().map(core::HeaderView::hash).collect()) + .collect(), + ) } } diff --git a/sync/src/synchronizer/get_blocks_process.rs b/sync/src/synchronizer/get_blocks_process.rs index 23d41076d1..e8cf2248cd 100644 --- a/sync/src/synchronizer/get_blocks_process.rs +++ b/sync/src/synchronizer/get_blocks_process.rs @@ -1,6 +1,6 @@ use crate::block_status::BlockStatus; use crate::synchronizer::Synchronizer; -use crate::{Status, StatusCode, MAX_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN}; +use crate::{Status, StatusCode, INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN}; use ckb_logger::debug; use ckb_network::{CKBProtocolContext, PeerIndex}; use ckb_types::{packed, prelude::*}; @@ -29,7 +29,7 @@ impl<'a> GetBlocksProcess<'a> { pub fn execute(self) -> Status { let block_hashes = self.message.block_hashes(); - // use MAX_HEADERS_LEN as limit, we may increase the value of MAX_BLOCKS_IN_TRANSIT_PER_PEER in the future + // use MAX_HEADERS_LEN as limit, we may increase the value of INIT_BLOCKS_IN_TRANSIT_PER_PEER in the future if block_hashes.len() > MAX_HEADERS_LEN { return StatusCode::ProtocolMessageIsMalformed.with_context(format!( "BlockHashes count({}) > MAX_HEADERS_LEN({})", @@ -39,7 +39,7 @@ impl<'a> GetBlocksProcess<'a> { } let active_chain = self.synchronizer.shared.active_chain(); - for block_hash in block_hashes.iter().take(MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + for block_hash in block_hashes.iter().take(INIT_BLOCKS_IN_TRANSIT_PER_PEER) { debug!("get_blocks {} from peer {:?}", block_hash, self.peer); let block_hash = block_hash.to_entity(); diff --git 
a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index 23f67fb51f..730c5021fe 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -20,12 +20,14 @@ use crate::{ }; use ckb_chain::chain::ChainController; use ckb_logger::{debug, error, info, metric, trace, warn}; -use ckb_network::{bytes::Bytes, CKBProtocolContext, CKBProtocolHandler, PeerIndex}; +use ckb_network::{ + bytes::Bytes, CKBProtocolContext, CKBProtocolHandler, PeerIndex, ServiceControl, +}; use ckb_types::{core, packed, prelude::*}; use failure::Error as FailureError; use faketime::unix_time_as_millis; use std::cmp::min; -use std::sync::atomic::Ordering; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -39,15 +41,72 @@ const SYNC_NOTIFY_INTERVAL: Duration = Duration::from_millis(200); const IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(40); const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200); +enum FetchCMD { + Fetch(Vec), +} + +struct BlockFetchCMD { + can_fetch_block: Arc, + sync: Synchronizer, + p2p_control: ServiceControl, + recv: crossbeam_channel::Receiver, +} + +impl BlockFetchCMD { + fn run(&self) { + while let Ok(cmd) = self.recv.recv() { + match cmd { + FetchCMD::Fetch(peers) => { + self.can_fetch_block.store(false, Ordering::Release); + for peer in peers { + if let Some(fetch) = + BlockFetcher::new(&self.sync, peer, IBDState::In).fetch() + { + for item in fetch { + BlockFetchCMD::send_getblocks(item, &self.p2p_control, peer); + } + } + } + self.can_fetch_block.store(true, Ordering::Release) + } + } + } + } + + fn send_getblocks(v_fetch: Vec, nc: &ServiceControl, peer: PeerIndex) { + let content = packed::GetBlocks::new_builder() + .block_hashes(v_fetch.clone().pack()) + .build(); + let message = packed::SyncMessage::new_builder().set(content).build(); + + debug!("send_getblocks len={:?} to peer={}", v_fetch.len(), peer); + if let Err(err) = nc.send_message_to( + 
peer, + crate::NetworkProtocol::SYNC.into(), + message.as_bytes(), + ) { + debug!("synchronizer send GetBlocks error: {:?}", err); + } + crate::synchronizer::log_sent_metric(message.to_enum().item_name()); + } +} + #[derive(Clone)] pub struct Synchronizer { chain: ChainController, pub shared: Arc, + can_fetch_block: Arc, + fetch_channel: Option>, } impl Synchronizer { pub fn new(chain: ChainController, shared: Arc) -> Synchronizer { - Synchronizer { chain, shared } + Synchronizer { + chain, + shared, + can_fetch_block: Arc::new(AtomicBool::new(true)), + fetch_channel: None, + } } pub fn shared(&self) -> &Arc { @@ -163,8 +222,8 @@ impl Synchronizer { &self, peer: PeerIndex, ibd: IBDState, - ) -> Option> { - BlockFetcher::new(self.clone(), peer, ibd).fetch() + ) -> Option>> { + BlockFetcher::new(&self, peer, ibd).fetch() } fn on_connected(&self, nc: &dyn CKBProtocolContext, peer: PeerIndex) { @@ -357,33 +416,111 @@ impl Synchronizer { } } - fn find_blocks_to_fetch(&self, nc: &dyn CKBProtocolContext, ibd: IBDState) { + fn find_blocks_to_fetch(&mut self, nc: &dyn CKBProtocolContext, ibd: IBDState) { + let tip = self.shared.active_chain().tip_number(); + + let disconnect_list = { + let mut list = self.shared().state().write_inflight_blocks().prune(tip); + if let IBDState::In = ibd { + // In IBD state, peers whose best known header is below our tip and whose + // unknown list is empty can be disconnected + list.extend( + self.shared + .state() + .peers() + .get_best_known_less_than_tip_and_unknown_empty(tip), + ) + }; + list + }; + + for peer in disconnect_list.iter() { + if self + .peers() + .get_flag(*peer) + .map(|flag| flag.is_whitelist || flag.is_protect) + .unwrap_or(false) + { + continue; + } + if let Err(err) = nc.disconnect(*peer, "sync disconnect") { + debug!("synchronizer disconnect error: {:?}", err); + } + } + + if ibd.into() && !self.can_fetch_block.load(Ordering::Acquire) { + return; + } + let peers: Vec = { - self.peers() + let state = &self + .shared + .state() +
.read_inflight_blocks() + .download_schedulers; + let mut peers: Vec = self + .peers() .state .read() .iter() - .filter(|(_, state)| match ibd { - IBDState::In => { - state.peer_flags.is_outbound - || state.peer_flags.is_whitelist - || state.peer_flags.is_protect + .filter(|(id, state)| { + if disconnect_list.contains(id) { + return false; + }; + match ibd { + IBDState::In => { + state.peer_flags.is_outbound + || state.peer_flags.is_whitelist + || state.peer_flags.is_protect + } + IBDState::Out => state.sync_started, } - IBDState::Out => state.sync_started, }) .map(|(peer_id, _)| peer_id) .cloned() - .collect() + .collect(); + peers.sort_by_key(|id| { + state + .get(id) + .map_or(crate::INIT_BLOCKS_IN_TRANSIT_PER_PEER, |d| d.task_count()) + }); + peers.reverse(); + peers }; trace!("poll find_blocks_to_fetch select peers"); - { - self.shared().state().write_inflight_blocks().prune(); - } - for peer in peers { - if let Some(fetch) = self.get_blocks_to_fetch(peer, ibd) { - if !fetch.is_empty() { - self.send_getblocks(fetch, nc, peer); + // Fetching uses a lot of CPU time, especially in IBD state, + // so the fetch work is handed to another thread + match nc.p2p_control() { + Some(raw) => match self.fetch_channel { + Some(ref sender) => { + let _ = sender.try_send(FetchCMD::Fetch(peers)); + } + None => { + let p2p_control = raw.clone(); + let sync = self.clone(); + let can_fetch_block = Arc::clone(&self.can_fetch_block); + let (sender, recv) = crossbeam_channel::bounded(2); + sender.send(FetchCMD::Fetch(peers)).unwrap(); + self.fetch_channel = Some(sender); + ::std::thread::spawn(move || { + BlockFetchCMD { + sync, + p2p_control, + recv, + can_fetch_block, + } + .run(); + }); + } + }, + _ => { + for peer in peers { + if let Some(fetch) = self.get_blocks_to_fetch(peer, ibd) { + for item in fetch { + self.send_getblocks(item, nc, peer); + } + } } } } @@ -527,6 +664,9 @@ impl CKBProtocolHandler for Synchronizer { if self.shared.active_chain().is_initial_block_download() {
self.find_blocks_to_fetch(nc.as_ref(), IBDState::In); } else { + { + self.shared.state().write_inflight_blocks().adjustment = false; + } self.shared.state().peers().clear_unknown_list(); if nc.remove_notify(IBD_BLOCK_FETCH_TOKEN).is_err() { trace!("remove ibd block fetch fail"); @@ -1149,16 +1289,16 @@ mod tests { .unwrap(); assert_eq!( - blocks_to_fetch.first().unwrap(), + blocks_to_fetch[0].first().unwrap(), &shared2.store().get_block_hash(193).unwrap() ); assert_eq!( - blocks_to_fetch.last().unwrap(), + blocks_to_fetch[0].last().unwrap(), &shared2.store().get_block_hash(200).unwrap() ); let mut fetched_blocks = Vec::new(); - for block_hash in &blocks_to_fetch { + for block_hash in &blocks_to_fetch[0] { fetched_blocks.push(shared2.store().get_block(block_hash).unwrap()); } @@ -1176,7 +1316,7 @@ mod tests { .get_last_common_header(peer1) .unwrap() .hash(), - blocks_to_fetch.last().unwrap() + blocks_to_fetch[0].last().unwrap() ); } diff --git a/sync/src/tests/inflight_blocks.rs b/sync/src/tests/inflight_blocks.rs index 56858722e9..d8131b454e 100644 --- a/sync/src/tests/inflight_blocks.rs +++ b/sync/src/tests/inflight_blocks.rs @@ -1,4 +1,4 @@ -use crate::types::InflightBlocks; +use crate::types::{BlockNumberAndHash, InflightBlocks}; use crate::BLOCK_DOWNLOAD_TIMEOUT; use ckb_types::prelude::*; use ckb_types::{h256, H256}; @@ -9,29 +9,25 @@ use std::iter::FromIterator; fn inflight_blocks_count() { let mut inflight_blocks = InflightBlocks::default(); - // allow 2 peer for one block - assert!(inflight_blocks.insert(1.into(), h256!("0x1").pack())); - assert!(inflight_blocks.insert(2.into(), h256!("0x1").pack())); - assert!(!inflight_blocks.insert(3.into(), h256!("0x1").pack())); + // don't allow 2 peer for one block + assert!(inflight_blocks.insert(2.into(), (1, h256!("0x1").pack()).into())); + assert!(!inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into())); // peer 1 inflight - assert!(!inflight_blocks.insert(1.into(), h256!("0x1").pack())); + 
assert!(!inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into())); - assert!(inflight_blocks.insert(1.into(), h256!("0x2").pack())); + assert!(inflight_blocks.insert(1.into(), (2, h256!("0x2").pack()).into())); assert_eq!(inflight_blocks.total_inflight_count(), 2); // 0x1 0x2 - assert_eq!(inflight_blocks.peer_inflight_count(1.into()), 2); + assert_eq!(inflight_blocks.peer_inflight_count(1.into()), 1); assert_eq!(inflight_blocks.peer_inflight_count(2.into()), 1); // one block inflight assert_eq!( inflight_blocks.inflight_block_by_peer(1.into()).cloned(), - Some(HashSet::from_iter(vec![ - h256!("0x1").pack(), - h256!("0x2").pack() - ])) + Some(HashSet::from_iter(vec![(2, h256!("0x2").pack()).into()])) ); // receive block 0x1 - inflight_blocks.remove_by_block(h256!("0x1").pack()); + inflight_blocks.remove_by_block((1, h256!("0x1").pack()).into()); assert_eq!(inflight_blocks.total_inflight_count(), 1); // 0x2 assert_eq!(inflight_blocks.peer_inflight_count(1.into()), 1); @@ -40,7 +36,7 @@ fn inflight_blocks_count() { inflight_blocks .inflight_block_by_peer(1.into()) .map(|set| set.iter().collect()), - Some(vec![&h256!("0x2").pack()]) + Some(vec![&(2, h256!("0x2").pack()).into()]) ); } @@ -48,29 +44,29 @@ fn inflight_blocks_count() { fn inflight_blocks_state() { let mut inflight_blocks = InflightBlocks::default(); - assert!(inflight_blocks.insert(1.into(), h256!("0x1").pack())); - assert!(inflight_blocks.insert(2.into(), h256!("0x1").pack())); - assert!(!inflight_blocks.insert(3.into(), h256!("0x1").pack())); + assert!(inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into())); + assert!(!inflight_blocks.insert(2.into(), (1, h256!("0x1").pack()).into())); + assert!(!inflight_blocks.insert(3.into(), (1, h256!("0x1").pack()).into())); // peer 1 inflight - assert!(!inflight_blocks.insert(1.into(), h256!("0x1").pack())); - assert!(inflight_blocks.insert(1.into(), h256!("0x2").pack())); + assert!(!inflight_blocks.insert(1.into(), (1, 
h256!("0x1").pack()).into())); + assert!(inflight_blocks.insert(1.into(), (2, h256!("0x2").pack()).into())); - assert!(inflight_blocks.insert(3.into(), h256!("0x3").pack())); + assert!(inflight_blocks.insert(3.into(), (3, h256!("0x3").pack()).into())); assert_eq!( inflight_blocks - .inflight_state_by_block(&h256!("0x1").pack()) + .inflight_state_by_block(&(1, h256!("0x1").pack()).into()) .cloned() - .map(|state| { state.peers }), - Some(HashSet::from_iter(vec![1.into(), 2.into()])) + .map(|state| { state.peer }), + Some(1.into()) ); assert_eq!( inflight_blocks - .inflight_state_by_block(&h256!("0x3").pack()) - .map(|state| state.peers.iter().collect()), - Some(vec![&(3.into())]) + .inflight_state_by_block(&(3, h256!("0x3").pack()).into()) + .map(|state| state.peer), + Some(3.into()) ); // peer 1 disconnect @@ -79,16 +75,16 @@ fn inflight_blocks_state() { assert_eq!( inflight_blocks - .inflight_state_by_block(&h256!("0x1").pack()) - .map(|state| state.peers.iter().collect()), - Some(vec![&(2.into())]) + .inflight_state_by_block(&(1, h256!("0x1").pack()).into()) + .map(|state| state.peer), + None ); assert_eq!( inflight_blocks - .inflight_state_by_block(&h256!("0x3").pack()) - .map(|state| state.peers.iter().collect()), - Some(vec![&(3.into())]) + .inflight_state_by_block(&(3, h256!("0x3").pack()).into()) + .map(|state| state.peer), + Some(3.into()) ); } @@ -99,42 +95,102 @@ fn inflight_blocks_timeout() { faketime::enable(&faketime_file); let mut inflight_blocks = InflightBlocks::default(); - assert!(inflight_blocks.insert(1.into(), h256!("0x1").pack())); - assert!(inflight_blocks.insert(1.into(), h256!("0x2").pack())); - assert!(inflight_blocks.insert(2.into(), h256!("0x2").pack())); - assert!(inflight_blocks.insert(1.into(), h256!("0x3").pack())); - assert!(inflight_blocks.insert(2.into(), h256!("0x3").pack())); + assert!(inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into())); + assert!(inflight_blocks.insert(1.into(), (2, h256!("0x2").pack()).into())); 
+ assert!(inflight_blocks.insert(2.into(), (3, h256!("0x3").pack()).into())); + assert!(!inflight_blocks.insert(1.into(), (3, h256!("0x3").pack()).into())); + assert!(inflight_blocks.insert(1.into(), (4, h256!("0x4").pack()).into())); + assert!(inflight_blocks.insert(2.into(), (5, h256!("0x5").pack()).into())); + assert!(!inflight_blocks.insert(2.into(), (5, h256!("0x5").pack()).into())); faketime::write_millis(&faketime_file, BLOCK_DOWNLOAD_TIMEOUT + 1).expect("write millis"); - assert!(!inflight_blocks.insert(3.into(), h256!("0x3").pack())); - assert!(!inflight_blocks.insert(3.into(), h256!("0x2").pack())); - assert!(inflight_blocks.insert(4.into(), h256!("0x4").pack())); - assert!(inflight_blocks.insert(1.into(), h256!("0x4").pack())); + assert!(!inflight_blocks.insert(3.into(), (3, h256!("0x3").pack()).into())); + assert!(!inflight_blocks.insert(3.into(), (2, h256!("0x2").pack()).into())); + assert!(inflight_blocks.insert(4.into(), (6, h256!("0x6").pack()).into())); + assert!(inflight_blocks.insert(1.into(), (7, h256!("0x7").pack()).into())); - inflight_blocks.prune(); - assert!(inflight_blocks.insert(3.into(), h256!("0x2").pack())); - assert!(inflight_blocks.insert(3.into(), h256!("0x3").pack())); + let peers = inflight_blocks.prune(0); + assert_eq!(peers, HashSet::from_iter(vec![1.into(), 2.into()])); + assert!(inflight_blocks.insert(3.into(), (2, h256!("0x2").pack()).into())); + assert!(inflight_blocks.insert(3.into(), (3, h256!("0x3").pack()).into())); assert_eq!( inflight_blocks - .inflight_state_by_block(&h256!("0x3").pack()) - .map(|state| state.peers.iter().collect()), - Some(vec![&(3.into())]) + .inflight_state_by_block(&(3, h256!("0x3").pack()).into()) + .map(|state| state.peer), + Some(3.into()) ); assert_eq!( inflight_blocks - .inflight_state_by_block(&h256!("0x2").pack()) - .map(|state| state.peers.iter().collect()), - Some(vec![&(3.into())]) + .inflight_state_by_block(&(2, h256!("0x2").pack()).into()) + .map(|state| state.peer), + Some(3.into()) 
); assert_eq!( inflight_blocks - .inflight_state_by_block(&h256!("0x4").pack()) + .inflight_state_by_block(&(6, h256!("0x6").pack()).into()) .cloned() - .map(|state| { state.peers }), - Some(HashSet::from_iter(vec![1.into(), 4.into()])) + .map(|state| state.peer), + Some(4.into()) ); } + +#[cfg(not(disable_faketime))] +#[test] +fn inflight_trace_number_state() { + let faketime_file = faketime::millis_tempfile(0).expect("create faketime file"); + faketime::enable(&faketime_file); + + let mut inflight_blocks = InflightBlocks::default(); + + assert!(inflight_blocks.insert(1.into(), (1, h256!("0x1").pack()).into())); + assert!(inflight_blocks.insert(2.into(), (2, h256!("0x2").pack()).into())); + assert!(inflight_blocks.insert(3.into(), (3, h256!("0x3").pack()).into())); + assert!(inflight_blocks.insert(4.into(), (3, h256!("0x33").pack()).into())); + assert!(inflight_blocks.insert(5.into(), (4, h256!("0x4").pack()).into())); + assert!(inflight_blocks.insert(6.into(), (5, h256!("0x5").pack()).into())); + assert!(inflight_blocks.insert(7.into(), (5, h256!("0x55").pack()).into())); + + let list = inflight_blocks.prune(2); + assert!(list.is_empty()); + + assert!(inflight_blocks.trace_number.is_empty()); + assert!(inflight_blocks.restart_number == 0); + + // When 2 + 512 number block request send out + inflight_blocks.mark_slow_block(2); + + assert_eq!( + inflight_blocks + .trace_number + .keys() + .cloned() + .collect::>(), + HashSet::from_iter(vec![ + (1, h256!("0x1").pack()).into(), + (2, h256!("0x2").pack()).into(), + (3, h256!("0x3").pack()).into(), + (3, h256!("0x33").pack()).into() + ]) + ); + + faketime::write_millis(&faketime_file, 2000).expect("write millis"); + + let list = inflight_blocks.prune(2); + assert!(list.is_empty()); + + assert!(inflight_blocks.restart_number == 3); + + assert!(inflight_blocks + .inflight_state_by_block(&(3, h256!("0x3").pack()).into()) + .is_none()); + assert!(inflight_blocks + .inflight_state_by_block(&(3, h256!("0x33").pack()).into()) 
+ .is_none()); + + assert_eq!(inflight_blocks.peer_can_fetch_count(3.into()), 8); + assert_eq!(inflight_blocks.peer_can_fetch_count(4.into()), 8); +} diff --git a/sync/src/types.rs b/sync/src/types.rs index 0799468e4c..566985fb8e 100644 --- a/sync/src/types.rs +++ b/sync/src/types.rs @@ -1,9 +1,11 @@ use crate::block_status::BlockStatus; use crate::orphan_block_pool::OrphanBlockPool; use crate::BLOCK_DOWNLOAD_TIMEOUT; -use crate::MAX_PEERS_PER_BLOCK; use crate::{NetworkProtocol, SUSPEND_SYNC_TIME}; -use crate::{MAX_HEADERS_LEN, MAX_TIP_AGE, RETRY_ASK_TX_TIMEOUT_INCREASE}; +use crate::{ + FIRST_LEVEL_MAX, INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_BLOCKS_IN_TRANSIT_PER_PEER, + MAX_HEADERS_LEN, MAX_TIP_AGE, RETRY_ASK_TX_TIMEOUT_INCREASE, +}; use ckb_chain::chain::ChainController; use ckb_chain_spec::consensus::Consensus; use ckb_logger::{debug, debug_target, error, metric}; @@ -24,7 +26,7 @@ use failure::Error as FailureError; use faketime::unix_time_as_millis; use lru_cache::LruCache; use std::cmp; -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{btree_map::Entry, BTreeMap, HashMap, HashSet}; use std::fmt; use std::hash::Hash; use std::mem; @@ -250,46 +252,153 @@ pub struct Peers { #[derive(Debug, Clone)] pub struct InflightState { - pub(crate) peers: HashSet, + pub(crate) peer: PeerIndex, pub(crate) timestamp: u64, } -impl Default for InflightState { - fn default() -> Self { - InflightState { - peers: HashSet::default(), +impl InflightState { + fn new(peer: PeerIndex) -> Self { + Self { + peer, timestamp: unix_time_as_millis(), } } } -impl InflightState { - pub fn remove(&mut self, peer: PeerIndex) { - self.peers.remove(&peer); +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct BlockNumberAndHash { + pub number: BlockNumber, + pub hash: Byte32, +} + +impl From<(BlockNumber, Byte32)> for BlockNumberAndHash { + fn from(inner: (BlockNumber, Byte32)) -> Self { + Self { + number: inner.0, + hash: inner.1, + } + } +} + 
+#[derive(Debug, Clone)] +pub struct DownloadScheduler { + task_count: usize, + timeout_count: usize, + breakthroughs_count: usize, + hashes: HashSet, +} + +impl Default for DownloadScheduler { + fn default() -> Self { + Self { + hashes: HashSet::default(), + task_count: INIT_BLOCKS_IN_TRANSIT_PER_PEER, + breakthroughs_count: 0, + timeout_count: 0, + } + } +} + +impl DownloadScheduler { + fn inflight_count(&self) -> usize { + self.hashes.len() + } + + fn can_fetch(&self) -> usize { + self.task_count.saturating_sub(self.hashes.len()) + } + + pub(crate) fn task_count(&self) -> usize { + self.task_count + } + + fn adjust(&mut self, time: u64, len: u64) { + let now = unix_time_as_millis(); + // 8 means default max outbound + // All synchronization tests are based on the assumption of 8 nodes. + // If the number of nodes is increased, the number of requests and processing time will increase, + // and the corresponding adjustment criteria are needed, so the adjustment is based on 8. + // + // Note: This is an interim scenario and the next step is to consider the median response time + // within a certain interval as the adjustment criterion + let (quotient, remainder) = if len > 8 { (len >> 3, len % 8) } else { (1, 0) }; + // Dynamically adjust download tasks based on response time. + // + // The max block size is about 700k; under 10m/s bandwidth it may take 1s to respond. + // But the tasks are marked at the same time, so multiple tasks may be affected by one task, + // causing all responses to be in the range of reduced tasks. + // So the adjustment to reduce the number of tasks will be calculated by modulo 3 with the actual number of triggers. + // Adjust for each block received.
+ match ((now - time) / quotient).saturating_sub(remainder * 100) { + // Within 500ms of response, considered better to communicate with that node's network + 0..=500 => { + if self.task_count < FIRST_LEVEL_MAX - 1 { + self.task_count += 2 + } else { + self.breakthroughs_count += 1 + } + } + // Within 1000ms of response time, the network is relatively good and communication should be maintained + 501..=1000 => { + if self.task_count < FIRST_LEVEL_MAX { + self.task_count += 1 + } else { + self.breakthroughs_count += 1 + } + } + // Response time within 1500ms, acceptable range, but not relatively good for nodes and do not adjust + 1001..=1500 => (), + // Above 1500ms, relatively poor network, reduced + _ => { + self.timeout_count += 1; + if self.timeout_count > 3 { + self.task_count = self.task_count.saturating_sub(1); + self.timeout_count = 0; + } + } + } + if self.breakthroughs_count > 4 { + if self.task_count < MAX_BLOCKS_IN_TRANSIT_PER_PEER { + self.task_count += 1; + } + self.breakthroughs_count = 0; + } + } + + fn punish(&mut self) { + self.task_count >>= 1 } } #[derive(Clone)] pub struct InflightBlocks { - blocks: HashMap>, - states: HashMap, + pub(crate) download_schedulers: HashMap, + inflight_states: BTreeMap, + pub(crate) trace_number: HashMap, + compact_reconstruct_inflight: HashMap>, + pub(crate) restart_number: BlockNumber, + pub(crate) adjustment: bool, } impl Default for InflightBlocks { fn default() -> Self { InflightBlocks { - blocks: HashMap::default(), - states: HashMap::default(), + download_schedulers: HashMap::default(), + inflight_states: BTreeMap::default(), + trace_number: HashMap::default(), + compact_reconstruct_inflight: HashMap::default(), + restart_number: 0, + adjustment: true, } } } -struct DebugHashSet<'a>(&'a HashSet); +struct DebugHashSet<'a>(&'a HashSet); impl<'a> fmt::Debug for DebugHashSet<'a> { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_set() - .entries(self.0.iter().map(|h| format!("{}", h))) + 
.entries(self.0.iter().map(|h| format!("{}", h.hash))) .finish() } } @@ -297,47 +406,164 @@ impl<'a> fmt::Debug for DebugHashSet<'a> { impl fmt::Debug for InflightBlocks { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_map() - .entries(self.blocks.iter().map(|(k, v)| (k, DebugHashSet(v)))) + .entries( + self.download_schedulers + .iter() + .map(|(k, v)| (k, DebugHashSet(&v.hashes))), + ) .finish()?; fmt.debug_map() - .entries(self.states.iter().map(|(k, v)| (format!("{}", k), v))) + .entries( + self.inflight_states + .iter() + .map(|(k, v)| (format!("{}", k.hash), v)), + ) .finish() } } impl InflightBlocks { - pub fn blocks_iter(&self) -> impl Iterator)> { - self.blocks.iter() + pub fn blocks_iter(&self) -> impl Iterator)> { + self.download_schedulers.iter().map(|(k, v)| (k, &v.hashes)) } pub fn total_inflight_count(&self) -> usize { - self.states.len() + self.inflight_states.len() } pub fn peer_inflight_count(&self, peer: PeerIndex) -> usize { - self.blocks.get(&peer).map(HashSet::len).unwrap_or(0) + self.download_schedulers + .get(&peer) + .map(DownloadScheduler::inflight_count) + .unwrap_or(0) + } + + pub fn peer_can_fetch_count(&self, peer: PeerIndex) -> usize { + self.download_schedulers.get(&peer).map_or( + INIT_BLOCKS_IN_TRANSIT_PER_PEER, + DownloadScheduler::can_fetch, + ) + } + + pub fn inflight_block_by_peer(&self, peer: PeerIndex) -> Option<&HashSet> { + self.download_schedulers.get(&peer).map(|d| &d.hashes) + } + + pub fn inflight_state_by_block(&self, block: &BlockNumberAndHash) -> Option<&InflightState> { + self.inflight_states.get(block) + } + + pub fn compact_reconstruct(&mut self, peer: PeerIndex, hash: Byte32) -> bool { + let entry = self.compact_reconstruct_inflight.entry(hash).or_default(); + if entry.len() >= 2 { + return false; + } + + entry.insert(peer) } - pub fn inflight_block_by_peer(&self, peer: PeerIndex) -> Option<&HashSet> { - self.blocks.get(&peer) + + pub fn remove_compact(&mut self, peer: PeerIndex, hash: 
&Byte32) { + self.compact_reconstruct_inflight + .get_mut(&hash) + .map(|peers| peers.remove(&peer)); + self.trace_number.retain(|k, _| &k.hash != hash) } - pub fn inflight_state_by_block(&self, block: &Byte32) -> Option<&InflightState> { - self.states.get(block) + pub fn inflight_compact_by_block(&self, hash: &Byte32) -> Option<&HashSet> { + self.compact_reconstruct_inflight.get(hash) } - pub fn prune(&mut self) { + pub fn mark_slow_block(&mut self, tip: BlockNumber) { + let now = faketime::unix_time_as_millis(); + for key in self.inflight_states.keys() { + if key.number > tip + 1 { + break; + } + self.trace_number.entry(key.clone()).or_insert(now); + } + } + + pub fn prune(&mut self, tip: BlockNumber) -> HashSet { let now = unix_time_as_millis(); let prev_count = self.total_inflight_count(); - let blocks = &mut self.blocks; - self.states.retain(|k, v| { - let outdate = (v.timestamp + BLOCK_DOWNLOAD_TIMEOUT) < now; - if outdate { - for peer in &v.peers { - blocks.get_mut(peer).map(|set| set.remove(k)); + let mut disconnect_list = HashSet::new(); + + let trace = &mut self.trace_number; + let download_schedulers = &mut self.download_schedulers; + let states = &mut self.inflight_states; + let compact_inflight = &mut self.compact_reconstruct_inflight; + + let mut remove_key = Vec::new(); + // Since this is a btreemap, with the data already sorted, + // we don't have to worry about missing points, and we don't need to + // iterate through all the data each time, just check within tip + 20, + // with the checkpoint marking possible blocking points, it's enough + let end = tip + 20; + for (key, value) in states.iter() { + if key.number > end { + break; + } + if value.timestamp + BLOCK_DOWNLOAD_TIMEOUT < now { + download_schedulers + .get_mut(&value.peer) + .map(|set| set.hashes.remove(key)); + if !trace.is_empty() { + trace.remove(&key); + } + disconnect_list.insert(value.peer); + remove_key.push(key.clone()); + } + } + + for key in remove_key { + states.remove(&key); + 
} + + download_schedulers.retain(|k, v| { + // task number zero means this peer's response is very slow + if v.task_count == 0 { + disconnect_list.insert(*k); + false + } else { + true + } + }); + + if self.restart_number != 0 && tip + 1 > self.restart_number { + self.restart_number = 0; + } + + let restart_number = &mut self.restart_number; + trace.retain(|key, time| { + // In the normal state, trace will always empty + // + // When the inflight request reaches the checkpoint(inflight > tip + 512), + // it means that there is an anomaly in the sync less than tip + 1, i.e. some nodes are stuck, + // at which point it will be recorded as the timestamp at that time. + // + // If the time exceeds 1s, delete the task and halve the number of + // executable tasks for the corresponding node + if now > 1000 + *time { + if let Some(state) = states.remove(key) { + if let Some(d) = download_schedulers.get_mut(&state.peer) { + d.punish(); + d.hashes.remove(key); + }; + } else if let Some(v) = compact_inflight.remove(&key.hash) { + for peer in v { + if let Some(d) = download_schedulers.get_mut(&peer) { + d.punish(); + } + } + } + if key.number > *restart_number { + *restart_number = key.number; } + return false; } - !outdate + true }); + if prev_count == 0 { metric!({ "topic": "blocks_in_flight", @@ -349,45 +575,84 @@ impl InflightBlocks { "fields": { "total": self.total_inflight_count(), "elapsed": BLOCK_DOWNLOAD_TIMEOUT } }); } + + disconnect_list } - pub fn insert(&mut self, peer: PeerIndex, hash: Byte32) -> bool { - let state = self - .states - .entry(hash.clone()) - .or_insert_with(InflightState::default); - if state.peers.len() >= MAX_PEERS_PER_BLOCK { + pub fn insert(&mut self, peer: PeerIndex, block: BlockNumberAndHash) -> bool { + if !self.compact_reconstruct_inflight.is_empty() + && self.compact_reconstruct_inflight.contains_key(&block.hash) + { + // Give the compact block a deadline of 1.5 seconds + self.trace_number + .entry(block) + 
.or_insert(unix_time_as_millis() + 500); return false; } + let state = self.inflight_states.entry(block.clone()); + match state { + Entry::Occupied(_entry) => return false, + Entry::Vacant(entry) => entry.insert(InflightState::new(peer)), + }; - let blocks = self.blocks.entry(peer).or_insert_with(HashSet::default); - let ret = blocks.insert(hash); - if ret { - state.peers.insert(peer); + if self.restart_number >= block.number { + // All new requests smaller than restart_number mean that they are cleaned up and + // cannot be immediately marked as cleaned up again, so give it a normal response time of 1.5s. + // (timeout check is 1s, plus 0.5s given in advance) + self.trace_number + .insert(block.clone(), unix_time_as_millis() + 500); } - ret + + let download_scheduler = self + .download_schedulers + .entry(peer) + .or_insert_with(DownloadScheduler::default); + download_scheduler.hashes.insert(block) } pub fn remove_by_peer(&mut self, peer: PeerIndex) -> bool { - self.blocks + let trace = &mut self.trace_number; + let state = &mut self.inflight_states; + let compact = &mut self.compact_reconstruct_inflight; + self.download_schedulers .remove(&peer) .map(|blocks| { - for block in blocks { - if let Some(state) = self.states.get_mut(&block) { - state.remove(peer) + for block in blocks.hashes { + if !compact.is_empty() { + compact + .get_mut(&block.hash) + .map(|peers| peers.remove(&peer)); + } + state.remove(&block); + if !trace.is_empty() { + trace.remove(&block); } } }) .is_some() } - pub fn remove_by_block(&mut self, block: Byte32) -> bool { - self.states + pub fn remove_by_block(&mut self, block: BlockNumberAndHash) -> bool { + let download_schedulers = &mut self.download_schedulers; + let trace = &mut self.trace_number; + let compact = &mut self.compact_reconstruct_inflight; + let len = download_schedulers.len() as u64; + let adjustment = self.adjustment; + self.inflight_states .remove(&block) .map(|state| { - for peer in state.peers { - 
self.blocks.get_mut(&peer).map(|set| set.remove(&block)); - } + if let Some(set) = download_schedulers.get_mut(&state.peer) { + set.hashes.remove(&block); + if !compact.is_empty() { + compact.remove(&block.hash); + } + if adjustment { + set.adjust(state.timestamp, len); + } + if !trace.is_empty() { + trace.remove(&block); + } + }; state.timestamp }) .map(|timestamp| { @@ -475,6 +740,25 @@ impl Peers { }) } + pub fn get_best_known_less_than_tip_and_unknown_empty( + &self, + tip: BlockNumber, + ) -> Vec { + self.state + .read() + .iter() + .filter_map(|(pi, state)| { + if !state.unknown_header_list.is_empty() { + return None; + } + match state.best_known_header { + Some(ref header) if header.number() < tip => Some(*pi), + _ => None, + } + }) + .collect() + } + pub fn take_unknown_last(&self, peer: PeerIndex) -> Option { self.state .write() @@ -1025,7 +1309,8 @@ impl SyncState { // Return true when the block is that we have requested and received first time. pub fn new_block_received(&self, block: &core::BlockView) -> bool { - self.write_inflight_blocks().remove_by_block(block.hash()) + self.write_inflight_blocks() + .remove_by_block((block.number(), block.hash()).into()) } pub fn insert_inflight_proposals(&self, ids: Vec) -> Vec { diff --git a/test/src/specs/relay/compact_block.rs b/test/src/specs/relay/compact_block.rs index fc65a7f6a0..ba57f2e6ac 100644 --- a/test/src/specs/relay/compact_block.rs +++ b/test/src/specs/relay/compact_block.rs @@ -334,8 +334,6 @@ impl Spec for CompactBlockRelayParentOfOrphanBlock { fn run(&self, net: &mut Net) { let node = &net.nodes[0]; net.exit_ibd_mode(); - net.connect(node); - let (peer_id, _, _) = net.receive(); node.generate_blocks((DEFAULT_TX_PROPOSAL_WINDOW.1 + 2) as usize); // Proposal a tx, and grow up into proposal window @@ -432,13 +430,14 @@ impl Spec for CompactBlockRelayParentOfOrphanBlock { .build(); let old_tip = node.get_tip_block().header().number(); + net.connect(node); + let (peer_id, _, _) = net.receive(); + 
net.send( NetworkProtocol::RELAY.into(), peer_id, build_compact_block(&parent), ); - // pending for GetBlockTransactions - clear_messages(&net); net.send( NetworkProtocol::SYNC.into(), @@ -450,15 +449,16 @@ impl Spec for CompactBlockRelayParentOfOrphanBlock { peer_id, build_header(&block.header()), ); - clear_messages(&net); - net.send(NetworkProtocol::SYNC.into(), peer_id, build_block(&block)); net.send( NetworkProtocol::RELAY.into(), peer_id, build_block_transactions(&parent), ); + clear_messages(&net); + net.send(NetworkProtocol::SYNC.into(), peer_id, build_block(&block)); + let ret = wait_until(20, move || { node.get_tip_block().header().number() == old_tip + 2 }); diff --git a/test/src/specs/sync/block_sync.rs b/test/src/specs/sync/block_sync.rs index a9675a59d7..89e74fa2f2 100644 --- a/test/src/specs/sync/block_sync.rs +++ b/test/src/specs/sync/block_sync.rs @@ -1,5 +1,5 @@ use crate::utils::{ - build_block, build_get_blocks, build_header, new_block_with_template, sleep, wait_until, + build_block, build_get_blocks, build_header, new_block_with_template, wait_until, }; use crate::{Net, Node, Spec, TestProtocol}; use ckb_jsonrpc_types::ChainInfo; @@ -254,12 +254,6 @@ impl Spec for BlockSyncOrphanBlocks { let node0 = &net.nodes[0]; let node1 = &net.nodes[1]; net.exit_ibd_mode(); - net.connect(node0); - let (peer_id, _, _) = net - .receive_timeout(Duration::new(10, 0)) - .expect("net receive timeout"); - let rpc_client = node0.rpc_client(); - let tip_number = rpc_client.get_tip_block_number(); // Generate some blocks from node1 let mut blocks: Vec = (1..=5) @@ -270,13 +264,22 @@ impl Spec for BlockSyncOrphanBlocks { }) .collect(); + net.connect(node0); + let (peer_id, _, _) = net + .receive_timeout(Duration::new(10, 0)) + .expect("net receive timeout"); + let rpc_client = node0.rpc_client(); + let tip_number = rpc_client.get_tip_block_number(); + // Send headers to node0, keep blocks body blocks.iter().for_each(|block| { sync_header(&net, peer_id, block); }); // 
Wait for block fetch timer - sleep(5); + let (_, _, _) = net + .receive_timeout(Duration::new(10, 0)) + .expect("net receive timeout"); // Skip the next block, send the rest blocks to node0 let first = blocks.remove(0); diff --git a/test/src/specs/sync/get_blocks.rs b/test/src/specs/sync/get_blocks.rs index 59823004d5..01f91c0146 100644 --- a/test/src/specs/sync/get_blocks.rs +++ b/test/src/specs/sync/get_blocks.rs @@ -1,5 +1,5 @@ use super::utils::wait_get_blocks; -use crate::utils::build_headers; +use crate::utils::{build_headers, wait_until}; use crate::{Net, Spec, TestProtocol}; use ckb_sync::{NetworkProtocol, BLOCK_DOWNLOAD_TIMEOUT}; use ckb_types::core::HeaderView; @@ -37,10 +37,19 @@ impl Spec for GetBlocksTimeout { let (first, received) = wait_get_blocks_point(block_download_timeout_secs * 2, &net); assert!(received, "Should received GetBlocks"); let (second, received) = wait_get_blocks_point(block_download_timeout_secs * 2, &net); - assert!(received, "Should received GetBlocks"); + assert!(!received, "Should not received GetBlocks"); let elapsed = second.duration_since(first).as_secs(); let error_margin = 2; assert!(elapsed >= block_download_timeout_secs - error_margin); + + let rpc_client = node1.rpc_client(); + let result = wait_until(10, || { + let peers = rpc_client.get_peers(); + peers.is_empty() + }); + if !result { + panic!("node1 must disconnect net"); + } } }