Skip to content

Commit

Permalink
chore: disable adjust on no ibd state
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Apr 21, 2020
1 parent 4caf0d0 commit faf983a
Showing 1 changed file with 10 additions and 62 deletions.
72 changes: 10 additions & 62 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200);

enum FetchCMD {
Fetch(Vec<PeerIndex>),
Shutdown,
}

struct BlockFetchCMD {
Expand All @@ -70,7 +69,6 @@ impl BlockFetchCMD {
}
self.can_fetch_block.store(true, Ordering::Release)
}
FetchCMD::Shutdown => break,
}
}
}
Expand Down Expand Up @@ -492,12 +490,14 @@ impl Synchronizer {

trace!("poll find_blocks_to_fetch select peers");
// fetch use a lot of cpu time, especially in ibd state
// so in ibd, the fetch function use another thread
match ibd {
IBDState::In => match self.fetch_channel {
Some(ref sender) => sender.send(FetchCMD::Fetch(peers)).unwrap(),
None if nc.p2p_control().is_some() => {
let p2p_control = nc.p2p_control().unwrap().clone();
// so, the fetch function use another thread
match nc.p2p_control() {
Some(raw) => match self.fetch_channel {
Some(ref sender) => {
let _ = sender.try_send(FetchCMD::Fetch(peers));
}
None => {
let p2p_control = raw.clone();
let sync = self.clone();
let can_fetch_block = Arc::clone(&self.can_fetch_block);
let (sender, recv) = crossbeam_channel::bounded(2);
Expand All @@ -513,21 +513,8 @@ impl Synchronizer {
.run();
});
}
_ => {
for peer in peers {
if let Some(fetch) = self.get_blocks_to_fetch(peer, ibd) {
for item in fetch {
self.send_getblocks(item, nc, peer);
}
}
}
}
},
IBDState::Out => {
if let Some(sender) = self.fetch_channel.take() {
sender.send(FetchCMD::Shutdown).unwrap();
}

_ => {
for peer in peers {
if let Some(fetch) = self.get_blocks_to_fetch(peer, ibd) {
for item in fetch {
Expand All @@ -537,41 +524,6 @@ impl Synchronizer {
}
}
}
// match nc.p2p_control() {
// Some(raw) if ibd.into() => match self.fetch_channel {
// Some(ref sender) => sender.send(FetchCMD::Fetch(peers)).unwrap(),
// None => {
// let p2p_control = raw.clone();
// let sync = self.clone();
// let can_fetch_block = Arc::clone(&self.can_fetch_block);
// let (sender, recv) = crossbeam_channel::bounded(2);
// sender.send(FetchCMD::Fetch(peers)).unwrap();
// self.fetch_channel = Some(sender);
// ::std::thread::spawn(move || {
// BlockFetchCMD {
// sync,
// p2p_control,
// recv,
// can_fetch_block,
// }
// .run();
// });
// }
// },
// _ => {
// if let Some(sender) = self.fetch_channel.take() {
// sender.send(FetchCMD::Shutdown).unwrap();
// }
//
// for peer in peers {
// if let Some(fetch) = self.get_blocks_to_fetch(peer, ibd) {
// for item in fetch {
// self.send_getblocks(item, nc, peer);
// }
// }
// }
// }
// }
}

fn send_getblocks(
Expand Down Expand Up @@ -713,11 +665,7 @@ impl CKBProtocolHandler for Synchronizer {
self.find_blocks_to_fetch(nc.as_ref(), IBDState::In);
} else {
{
let adjustment =
&mut self.shared.state().write_inflight_blocks().adjustment;
if *adjustment {
*adjustment = false;
}
self.shared.state().write_inflight_blocks().adjustment = false;
}
self.shared.state().peers().clear_unknown_list();
if nc.remove_notify(IBD_BLOCK_FETCH_TOKEN).is_err() {
Expand Down

0 comments on commit faf983a

Please sign in to comment.