diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index 965daa36026..b4c6de9dbc3 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -43,7 +43,6 @@ const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200); enum FetchCMD { Fetch(Vec), - Shutdown, } struct BlockFetchCMD { @@ -70,7 +69,6 @@ impl BlockFetchCMD { } self.can_fetch_block.store(true, Ordering::Release) } - FetchCMD::Shutdown => break, } } } @@ -492,12 +490,12 @@ impl Synchronizer { trace!("poll find_blocks_to_fetch select peers"); // fetch use a lot of cpu time, especially in ibd state - // so in ibd, the fetch function use another thread - match ibd { - IBDState::In => match self.fetch_channel { + // so, the fetch function use another thread + match nc.p2p_control() { + Some(raw) => match self.fetch_channel { Some(ref sender) => sender.send(FetchCMD::Fetch(peers)).unwrap(), - None if nc.p2p_control().is_some() => { - let p2p_control = nc.p2p_control().unwrap().clone(); + None => { + let p2p_control = raw.clone(); let sync = self.clone(); let can_fetch_block = Arc::clone(&self.can_fetch_block); let (sender, recv) = crossbeam_channel::bounded(2); @@ -513,21 +511,8 @@ impl Synchronizer { .run(); }); } - _ => { - for peer in peers { - if let Some(fetch) = self.get_blocks_to_fetch(peer, ibd) { - for item in fetch { - self.send_getblocks(item, nc, peer); - } - } - } - } }, - IBDState::Out => { - if let Some(sender) = self.fetch_channel.take() { - sender.send(FetchCMD::Shutdown).unwrap(); - } - + _ => { for peer in peers { if let Some(fetch) = self.get_blocks_to_fetch(peer, ibd) { for item in fetch { @@ -537,41 +522,6 @@ impl Synchronizer { } } } - // match nc.p2p_control() { - // Some(raw) if ibd.into() => match self.fetch_channel { - // Some(ref sender) => sender.send(FetchCMD::Fetch(peers)).unwrap(), - // None => { - // let p2p_control = raw.clone(); - // let sync = self.clone(); - // let can_fetch_block = Arc::clone(&self.can_fetch_block); - // let (sender, recv) = crossbeam_channel::bounded(2); - // sender.send(FetchCMD::Fetch(peers)).unwrap(); - // self.fetch_channel = Some(sender); - // ::std::thread::spawn(move || { - // BlockFetchCMD { - // sync, - // p2p_control, - // recv, - // can_fetch_block, - // } - // .run(); - // }); - // } - // }, - // _ => { - // if let Some(sender) = self.fetch_channel.take() { - // sender.send(FetchCMD::Shutdown).unwrap(); - // } - // - // for peer in peers { - // if let Some(fetch) = self.get_blocks_to_fetch(peer, ibd) { - // for item in fetch { - // self.send_getblocks(item, nc, peer); - // } - // } - // } - // } - // } } fn send_getblocks( @@ -713,11 +663,7 @@ impl CKBProtocolHandler for Synchronizer { self.find_blocks_to_fetch(nc.as_ref(), IBDState::In); } else { { - let adjustment = - &mut self.shared.state().write_inflight_blocks().adjustment; - if *adjustment { - *adjustment = false; - } + self.shared.state().write_inflight_blocks().adjustment = false; } self.shared.state().peers().clear_unknown_list(); if nc.remove_notify(IBD_BLOCK_FETCH_TOKEN).is_err() {