Skip to content

Commit

Permalink
chore: Give the compact block a deadline
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Apr 21, 2020
1 parent 958e90a commit 0768283
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 17 deletions.
63 changes: 57 additions & 6 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ impl Synchronizer {
}
}

if !self.can_fetch_block.load(Ordering::Acquire) {
if ibd.into() && !self.can_fetch_block.load(Ordering::Acquire) {
return;
}

Expand Down Expand Up @@ -493,11 +493,11 @@ impl Synchronizer {
trace!("poll find_blocks_to_fetch select peers");
// fetch use a lot of cpu time, especially in ibd state
// so in ibd, the fetch function use another thread
match nc.p2p_control() {
Some(raw) if ibd.into() => match self.fetch_channel {
match ibd {
IBDState::In => match self.fetch_channel {
Some(ref sender) => sender.send(FetchCMD::Fetch(peers)).unwrap(),
None => {
let p2p_control = raw.clone();
None if nc.p2p_control().is_some() => {
let p2p_control = nc.p2p_control().unwrap().clone();
let sync = self.clone();
let can_fetch_block = Arc::clone(&self.can_fetch_block);
let (sender, recv) = crossbeam_channel::bounded(2);
Expand All @@ -513,8 +513,17 @@ impl Synchronizer {
.run();
});
}
_ => {
for peer in peers {
if let Some(fetch) = self.get_blocks_to_fetch(peer, ibd) {
for item in fetch {
self.send_getblocks(item, nc, peer);
}
}
}
}
},
_ => {
IBDState::Out => {
if let Some(sender) = self.fetch_channel.take() {
sender.send(FetchCMD::Shutdown).unwrap();
}
Expand All @@ -528,6 +537,41 @@ impl Synchronizer {
}
}
}
// match nc.p2p_control() {
// Some(raw) if ibd.into() => match self.fetch_channel {
// Some(ref sender) => sender.send(FetchCMD::Fetch(peers)).unwrap(),
// None => {
// let p2p_control = raw.clone();
// let sync = self.clone();
// let can_fetch_block = Arc::clone(&self.can_fetch_block);
// let (sender, recv) = crossbeam_channel::bounded(2);
// sender.send(FetchCMD::Fetch(peers)).unwrap();
// self.fetch_channel = Some(sender);
// ::std::thread::spawn(move || {
// BlockFetchCMD {
// sync,
// p2p_control,
// recv,
// can_fetch_block,
// }
// .run();
// });
// }
// },
// _ => {
// if let Some(sender) = self.fetch_channel.take() {
// sender.send(FetchCMD::Shutdown).unwrap();
// }
//
// for peer in peers {
// if let Some(fetch) = self.get_blocks_to_fetch(peer, ibd) {
// for item in fetch {
// self.send_getblocks(item, nc, peer);
// }
// }
// }
// }
// }
}

fn send_getblocks(
Expand Down Expand Up @@ -668,6 +712,13 @@ impl CKBProtocolHandler for Synchronizer {
if self.shared.active_chain().is_initial_block_download() {
self.find_blocks_to_fetch(nc.as_ref(), IBDState::In);
} else {
{
let adjustment =
&mut self.shared.state().write_inflight_blocks().adjustment;
if *adjustment {
*adjustment = false;
}
}
self.shared.state().peers().clear_unknown_list();
if nc.remove_notify(IBD_BLOCK_FETCH_TOKEN).is_err() {
trace!("remove ibd block fetch fail");
Expand Down
36 changes: 31 additions & 5 deletions sync/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ pub struct InflightBlocks {
pub(crate) trace_number: HashMap<BlockNumberAndHash, u64>,
compact_reconstruct_inflight: HashMap<Byte32, HashSet<PeerIndex>>,
pub(crate) restart_number: BlockNumber,
pub(crate) adjustment: bool,
}

impl Default for InflightBlocks {
Expand All @@ -387,6 +388,7 @@ impl Default for InflightBlocks {
trace_number: HashMap::default(),
compact_reconstruct_inflight: HashMap::default(),
restart_number: 0,
adjustment: true,
}
}
}
Expand Down Expand Up @@ -464,6 +466,7 @@ impl InflightBlocks {
self.compact_reconstruct_inflight
.get_mut(&hash)
.map(|peers| peers.remove(&peer));
self.trace_number.retain(|k, _| &k.hash != hash)
}

pub fn inflight_compact_by_block(&self, hash: &Byte32) -> Option<&HashSet<PeerIndex>> {
Expand All @@ -488,6 +491,7 @@ impl InflightBlocks {
let trace = &mut self.trace_number;
let download_schedulers = &mut self.download_schedulers;
let states = &mut self.inflight_states;
let compact_inflight = &mut self.compact_reconstruct_inflight;

let mut remove_key = Vec::new();
// Since this is a btreemap, with the data already sorted,
Expand Down Expand Up @@ -545,6 +549,12 @@ impl InflightBlocks {
d.punish();
d.hashes.remove(key);
};
} else if let Some(v) = compact_inflight.remove(&key.hash) {
for peer in v {
if let Some(d) = download_schedulers.get_mut(&peer) {
d.punish();
}
}
}
if key.number > *restart_number {
*restart_number = key.number;
Expand All @@ -570,6 +580,15 @@ impl InflightBlocks {
}

pub fn insert(&mut self, peer: PeerIndex, block: BlockNumberAndHash) -> bool {
if !self.compact_reconstruct_inflight.is_empty()
&& self.compact_reconstruct_inflight.contains_key(&block.hash)
{
// Give the compact block a deadline of 1.5 seconds
self.trace_number
.entry(block)
.or_insert(unix_time_as_millis() + 500);
return false;
}
let state = self.inflight_states.entry(block.clone());
match state {
Entry::Occupied(_entry) => return false,
Expand Down Expand Up @@ -599,9 +618,11 @@ impl InflightBlocks {
.remove(&peer)
.map(|blocks| {
for block in blocks.hashes {
compact
.get_mut(&block.hash)
.map(|peers| peers.remove(&peer));
if !compact.is_empty() {
compact
.get_mut(&block.hash)
.map(|peers| peers.remove(&peer));
}
state.remove(&block);
if !trace.is_empty() {
trace.remove(&block);
Expand All @@ -616,13 +637,18 @@ impl InflightBlocks {
let trace = &mut self.trace_number;
let compact = &mut self.compact_reconstruct_inflight;
let len = download_schedulers.len() as u64;
let adjustment = self.adjustment;
self.inflight_states
.remove(&block)
.map(|state| {
if let Some(set) = download_schedulers.get_mut(&state.peer) {
set.hashes.remove(&block);
compact.remove(&block.hash);
set.adjust(state.timestamp, len);
if !compact.is_empty() {
compact.remove(&block.hash);
}
if adjustment {
set.adjust(state.timestamp, len);
}
if !trace.is_empty() {
trace.remove(&block);
}
Expand Down
12 changes: 6 additions & 6 deletions test/src/specs/relay/compact_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,6 @@ impl Spec for CompactBlockRelayParentOfOrphanBlock {
fn run(&self, net: &mut Net) {
let node = &net.nodes[0];
net.exit_ibd_mode();
net.connect(node);
let (peer_id, _, _) = net.receive();

node.generate_blocks((DEFAULT_TX_PROPOSAL_WINDOW.1 + 2) as usize);
// Proposal a tx, and grow up into proposal window
Expand Down Expand Up @@ -432,13 +430,14 @@ impl Spec for CompactBlockRelayParentOfOrphanBlock {
.build();
let old_tip = node.get_tip_block().header().number();

net.connect(node);
let (peer_id, _, _) = net.receive();

net.send(
NetworkProtocol::RELAY.into(),
peer_id,
build_compact_block(&parent),
);
// pending for GetBlockTransactions
clear_messages(&net);

net.send(
NetworkProtocol::SYNC.into(),
Expand All @@ -450,15 +449,16 @@ impl Spec for CompactBlockRelayParentOfOrphanBlock {
peer_id,
build_header(&block.header()),
);
clear_messages(&net);

net.send(NetworkProtocol::SYNC.into(), peer_id, build_block(&block));
net.send(
NetworkProtocol::RELAY.into(),
peer_id,
build_block_transactions(&parent),
);

clear_messages(&net);
net.send(NetworkProtocol::SYNC.into(), peer_id, build_block(&block));

let ret = wait_until(20, move || {
node.get_tip_block().header().number() == old_tip + 2
});
Expand Down

0 comments on commit 0768283

Please sign in to comment.