Skip to content

Commit

Permalink
moves slot updates notifications after shreds retransmit (#26094)
Browse files Browse the repository at this point in the history
RetransmitSlotStats can already be utilized to track when the first
shred for a slot was received; therefore
    first_shreds_received: &Mutex<BTreeSet<Slot>>

is redundant. Sending update notifications after shreds retransmit will
also bypass the need for a mutex.
  • Loading branch information
behzadnouri authored Jun 21, 2022
1 parent 61946a4 commit 7542552
Showing 1 changed file with 18 additions and 39 deletions.
57 changes: 18 additions & 39 deletions core/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use {
solana_sdk::{clock::Slot, epoch_schedule::EpochSchedule, pubkey::Pubkey, timing::timestamp},
solana_streamer::sendmmsg::{multi_target_send, SendPktsError},
std::{
collections::{BTreeSet, HashMap, HashSet},
collections::{HashMap, HashSet},
net::UdpSocket,
ops::AddAssign,
sync::{
Expand Down Expand Up @@ -154,29 +154,6 @@ fn should_skip_retransmit(
}
}

// Returns true if this is the first time receiving a shred for `shred_slot`.
fn check_if_first_shred_received(
shred_slot: Slot,
first_shreds_received: &Mutex<BTreeSet<Slot>>,
root_bank: &Bank,
) -> bool {
if shred_slot <= root_bank.slot() {
return false;
}

let mut first_shreds_received_locked = first_shreds_received.lock().unwrap();
if first_shreds_received_locked.insert(shred_slot) {
datapoint_info!("retransmit-first-shred", ("slot", shred_slot, i64));
if first_shreds_received_locked.len() > 100 {
*first_shreds_received_locked =
first_shreds_received_locked.split_off(&(root_bank.slot() + 1));
}
true
} else {
false
}
}

fn maybe_reset_shreds_received_cache(
shreds_received: &Mutex<ShredFilter>,
packet_hasher: &mut PacketHasher,
Expand Down Expand Up @@ -204,7 +181,6 @@ fn retransmit(
shreds_received: &Mutex<ShredFilter>,
packet_hasher: &mut PacketHasher,
max_slots: &MaxSlots,
first_shreds_received: &Mutex<BTreeSet<Slot>>,
rpc_subscriptions: Option<&RpcSubscriptions>,
) -> Result<(), RecvTimeoutError> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
Expand Down Expand Up @@ -238,15 +214,6 @@ fn retransmit(
.retransmit
.fetch_max(shred_slot, Ordering::Relaxed);

if let Some(rpc_subscriptions) = rpc_subscriptions {
if check_if_first_shred_received(shred_slot, first_shreds_received, &root_bank) {
rpc_subscriptions.notify_slot_update(SlotUpdate::FirstShredReceived {
slot: shred_slot,
timestamp: timestamp(),
});
}
}

let mut compute_turbine_peers = Measure::start("turbine_start");
// TODO: consider using root-bank here for leader lookup!
// Shreds' signatures should be verified before they reach here, and if
Expand Down Expand Up @@ -318,7 +285,7 @@ fn retransmit(
)
.reduce(HashMap::new, RetransmitSlotStats::merge)
});
stats.upsert_slot_stats(slot_stats);
stats.upsert_slot_stats(slot_stats, root_bank.slot(), rpc_subscriptions);
timer_start.stop();
stats.total_time += timer_start.as_us();
stats.maybe_submit(&root_bank, &working_bank, cluster_info, cluster_nodes_cache);
Expand Down Expand Up @@ -350,7 +317,6 @@ pub fn retransmitter(
let mut stats = RetransmitStats::new(Instant::now());
let shreds_received = Mutex::new(LruCache::new(DEFAULT_LRU_SIZE));
let mut packet_hasher = PacketHasher::default();
let first_shreds_received = Mutex::<BTreeSet<Slot>>::default();
let num_threads = get_thread_count().min(8).max(sockets.len());
let thread_pool = ThreadPoolBuilder::new()
.num_threads(num_threads)
Expand All @@ -375,7 +341,6 @@ pub fn retransmitter(
&shreds_received,
&mut packet_hasher,
&max_slots,
&first_shreds_received,
rpc_subscriptions.as_deref(),
) {
Ok(()) => (),
Expand Down Expand Up @@ -538,13 +503,27 @@ impl RetransmitStats {
}
}

fn upsert_slot_stats<I>(&mut self, feed: I)
where
fn upsert_slot_stats<I>(
&mut self,
feed: I,
root: Slot,
rpc_subscriptions: Option<&RpcSubscriptions>,
) where
I: IntoIterator<Item = (Slot, RetransmitSlotStats)>,
{
for (slot, slot_stats) in feed {
match self.slot_stats.get_mut(&slot) {
None => {
if let Some(rpc_subscriptions) = rpc_subscriptions {
if slot > root {
let slot_update = SlotUpdate::FirstShredReceived {
slot,
timestamp: slot_stats.outset,
};
rpc_subscriptions.notify_slot_update(slot_update);
datapoint_info!("retransmit-first-shred", ("slot", slot, i64));
}
}
self.slot_stats.put(slot, slot_stats);
}
Some(entry) => {
Expand Down

0 comments on commit 7542552

Please sign in to comment.