
removes the nested for loop from retransmit-stage
The code can be simplified by just flattening the vector of packets.
behzadnouri committed May 21, 2021
1 parent 71de021 commit ff0e623
Showing 1 changed file with 86 additions and 88 deletions.
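
The pattern behind the change: instead of iterating batches and then packets within each batch, flatten once with flat_map and iterate packets directly. A minimal, self-contained sketch of that pattern, using stand-in Packet/Packets types (simplified placeholders, not the real structs from this crate):

struct Packet {
    discard: bool,
}

struct Packets {
    packets: Vec<Packet>,
}

fn count_kept(batches: &[Packets]) -> usize {
    // Before: a nested loop, which forces an extra indentation level
    // onto the entire (roughly 90-line) body in retransmit().
    let mut kept = 0;
    for batch in batches {
        for packet in batch.packets.iter() {
            if packet.discard {
                continue;
            }
            kept += 1;
        }
    }

    // After: flat_map flattens the batches so one loop sees every packet.
    let kept_flat = batches
        .iter()
        .flat_map(|batch| batch.packets.iter())
        .filter(|packet| !packet.discard)
        .count();

    assert_eq!(kept, kept_flat);
    kept_flat
}

Most of the diff below is this re-indentation of the loop body; the logic inside is unchanged except for two small cleanups (or_insert(0) becomes or_default(), and a debug_assert_eq! is added).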
174 changes: 86 additions & 88 deletions core/src/retransmit_stage.rs
@@ -330,10 +330,10 @@ fn retransmit(
     let packets = r_lock.recv_timeout(RECV_TIMEOUT)?;
     let mut timer_start = Measure::start("retransmit");
     let mut total_packets = packets.packets.len();
-    let mut packet_v = vec![packets];
+    let mut packets = vec![packets];
     while let Ok(nq) = r_lock.try_recv() {
         total_packets += nq.packets.len();
-        packet_v.push(nq);
+        packets.push(nq);
         if total_packets >= MAX_PACKET_BATCH_SIZE {
             break;
         }
@@ -382,95 +382,93 @@ fn retransmit(
     let mut packets_by_slot: HashMap<Slot, usize> = HashMap::new();
     let mut packets_by_source: HashMap<String, usize> = HashMap::new();
     let mut max_slot = 0;
-    for packets in packet_v {
-        for packet in packets.packets.iter() {
-            // skip discarded packets and repair packets
-            if packet.meta.discard {
-                total_packets -= 1;
-                discard_total += 1;
-                continue;
-            }
-            if packet.meta.repair {
-                total_packets -= 1;
-                repair_total += 1;
-                continue;
-            }
-            let shred_slot = match check_if_already_received(packet, shreds_received) {
-                Some(slot) => slot,
-                None => continue,
-            };
-            max_slot = max_slot.max(shred_slot);
-
-            if let Some(rpc_subscriptions) = rpc_subscriptions {
-                if check_if_first_shred_received(shred_slot, first_shreds_received, &root_bank) {
-                    rpc_subscriptions.notify_slot_update(SlotUpdate::FirstShredReceived {
-                        slot: shred_slot,
-                        timestamp: timestamp(),
-                    });
-                }
-            }
-
-            let mut compute_turbine_peers = Measure::start("turbine_start");
-            let stakes_and_index = get_retransmit_peers(
-                my_id,
-                shred_slot,
-                leader_schedule_cache,
-                r_bank.deref(),
-                r_epoch_stakes_cache.deref(),
-            );
-            let (my_index, shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
-                &my_id,
-                &r_epoch_stakes_cache.peers,
-                &stakes_and_index,
-                packet.meta.seed,
-            );
-            // If the node is on the critical path (i.e. the first node in each
-            // neighborhood), then we expect that the packet arrives at tvu
-            // socket as opposed to tvu-forwards. If this is not the case, then
-            // the turbine broadcast/retransmit tree mismatch across nodes.
-            if packet.meta.forward == (my_index % DATA_PLANE_FANOUT == 0) {
-                retransmit_tree_mismatch += 1;
-            }
-            peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
-            // split off the indexes, we don't need the stakes anymore
-            let indexes: Vec<_> = shuffled_stakes_and_index
-                .into_iter()
-                .map(|(_, index)| index)
-                .collect();
-
-            let (neighbors, children) =
-                compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, &indexes);
-            let neighbors: Vec<_> = neighbors
-                .into_iter()
-                .filter_map(|index| {
-                    let peer = &r_epoch_stakes_cache.peers[index];
-                    if peer.id == my_id {
-                        None
-                    } else {
-                        Some(peer)
-                    }
-                })
-                .collect();
-            let children: Vec<_> = children
-                .into_iter()
-                .map(|index| &r_epoch_stakes_cache.peers[index])
-                .collect();
-            compute_turbine_peers.stop();
-            compute_turbine_peers_total += compute_turbine_peers.as_us();
-
-            *packets_by_slot.entry(packet.meta.slot).or_insert(0) += 1;
-            *packets_by_source
-                .entry(packet.meta.addr().to_string())
-                .or_insert(0) += 1;
-
-            let mut retransmit_time = Measure::start("retransmit_to");
-            if !packet.meta.forward {
-                ClusterInfo::retransmit_to(&neighbors, packet, sock, true)?;
-            }
-            ClusterInfo::retransmit_to(&children, packet, sock, packet.meta.forward)?;
-            retransmit_time.stop();
-            retransmit_total += retransmit_time.as_us();
-        }
-    }
+    for packet in packets.iter().flat_map(|p| p.packets.iter()) {
+        // skip discarded packets and repair packets
+        if packet.meta.discard {
+            total_packets -= 1;
+            discard_total += 1;
+            continue;
+        }
+        if packet.meta.repair {
+            total_packets -= 1;
+            repair_total += 1;
+            continue;
+        }
+        let shred_slot = match check_if_already_received(packet, shreds_received) {
+            Some(slot) => slot,
+            None => continue,
+        };
+        max_slot = max_slot.max(shred_slot);
+
+        if let Some(rpc_subscriptions) = rpc_subscriptions {
+            if check_if_first_shred_received(shred_slot, first_shreds_received, &root_bank) {
+                rpc_subscriptions.notify_slot_update(SlotUpdate::FirstShredReceived {
+                    slot: shred_slot,
+                    timestamp: timestamp(),
+                });
+            }
+        }
+
+        let mut compute_turbine_peers = Measure::start("turbine_start");
+        let stakes_and_index = get_retransmit_peers(
+            my_id,
+            shred_slot,
+            leader_schedule_cache,
+            r_bank.deref(),
+            r_epoch_stakes_cache.deref(),
+        );
+        let (my_index, shuffled_stakes_and_index) = ClusterInfo::shuffle_peers_and_index(
+            &my_id,
+            &r_epoch_stakes_cache.peers,
+            &stakes_and_index,
+            packet.meta.seed,
+        );
+        // If the node is on the critical path (i.e. the first node in each
+        // neighborhood), then we expect that the packet arrives at tvu socket
+        // as opposed to tvu-forwards. If this is not the case, then the
+        // turbine broadcast/retransmit tree is mismatched across nodes.
+        if packet.meta.forward == (my_index % DATA_PLANE_FANOUT == 0) {
+            retransmit_tree_mismatch += 1;
+        }
+        peers_len = cmp::max(peers_len, shuffled_stakes_and_index.len());
+        // split off the indexes, we don't need the stakes anymore
+        let indexes: Vec<_> = shuffled_stakes_and_index
+            .into_iter()
+            .map(|(_, index)| index)
+            .collect();
+        debug_assert_eq!(my_id, r_epoch_stakes_cache.peers[indexes[my_index]].id);
+
+        let (neighbors, children) = compute_retransmit_peers(DATA_PLANE_FANOUT, my_index, &indexes);
+        let neighbors: Vec<_> = neighbors
+            .into_iter()
+            .filter_map(|index| {
+                let peer = &r_epoch_stakes_cache.peers[index];
+                if peer.id == my_id {
+                    None
+                } else {
+                    Some(peer)
+                }
+            })
+            .collect();
+        let children: Vec<_> = children
+            .into_iter()
+            .map(|index| &r_epoch_stakes_cache.peers[index])
+            .collect();
+        compute_turbine_peers.stop();
+        compute_turbine_peers_total += compute_turbine_peers.as_us();
+
+        *packets_by_slot.entry(packet.meta.slot).or_default() += 1;
+        *packets_by_source
+            .entry(packet.meta.addr().to_string())
+            .or_default() += 1;
+
+        let mut retransmit_time = Measure::start("retransmit_to");
+        if !packet.meta.forward {
+            ClusterInfo::retransmit_to(&neighbors, packet, sock, true)?;
+        }
+        ClusterInfo::retransmit_to(&children, packet, sock, packet.meta.forward)?;
+        retransmit_time.stop();
+        retransmit_total += retransmit_time.as_us();
+    }
     max_slots.retransmit.fetch_max(max_slot, Ordering::Relaxed);
     timer_start.stop();
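
For context on the retransmit_tree_mismatch counter in the hunk above: in turbine, the first node of each neighborhood (shuffled index divisible by the data-plane fanout) is expected to receive shreds on the tvu socket (forward == false), while every other node should see them on tvu-forwards (forward == true). A standalone sketch of that heuristic; the fanout constant is hard-coded here as an assumption, since the real DATA_PLANE_FANOUT is defined elsewhere in the crate:

// Assumed value for illustration only.
const DATA_PLANE_FANOUT: usize = 200;

/// Returns true when a shred arrived on the "wrong" socket for this
/// node's position in the retransmit tree -- the condition counted as
/// retransmit_tree_mismatch in the diff above.
fn is_tree_mismatch(my_index: usize, forward: bool) -> bool {
    forward == (my_index % DATA_PLANE_FANOUT == 0)
}

fn main() {
    // Neighborhood head receiving a forwarded shred: mismatch.
    assert!(is_tree_mismatch(0, true));
    // Non-head node receiving a non-forwarded shred: mismatch.
    assert!(is_tree_mismatch(7, false));
    // Expected deliveries are not flagged.
    assert!(!is_tree_mismatch(0, false));
    assert!(!is_tree_mismatch(7, true));
}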
