Skip to content

Commit

Permalink
reworks max number of outgoing push messages (solana-labs#3016)
Browse files Browse the repository at this point in the history
max_bytes for outgoing push messages is pretty outdated and does not
allow gossip to function properly with current testnet cluster size.

In particular it does not allow to clear out queue of pending push
messages unless the new_push_messages function is called very frequently
which involves repeatedly locking/unlocking CRDS table.

Additionally leaving gossip entries in the queue for the next round will
add delay to propagating push messages which can compound as messages go
through several hops.
  • Loading branch information
behzadnouri authored Sep 30, 2024
1 parent 0b44eb6 commit 489f483
Showing 1 changed file with 5 additions and 13 deletions.
18 changes: 5 additions & 13 deletions gossip/src/crds_gossip_push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ use {
push_active_set::PushActiveSet,
received_cache::ReceivedCache,
},
bincode::serialized_size,
itertools::Itertools,
solana_sdk::{
packet::PACKET_DATA_SIZE,
pubkey::Pubkey,
signature::{Keypair, Signer},
timing::timestamp,
Expand Down Expand Up @@ -53,8 +51,6 @@ const CRDS_GOSSIP_PRUNE_MIN_INGRESS_NODES: usize = 2;
const CRDS_GOSSIP_PUSH_ACTIVE_SET_SIZE: usize = CRDS_GOSSIP_PUSH_FANOUT + 3;

pub struct CrdsGossipPush {
/// Max bytes per message
max_bytes: usize,
/// Active set of validators for push
active_set: RwLock<PushActiveSet>,
/// Cursor into the crds table for values to push.
Expand All @@ -74,8 +70,6 @@ pub struct CrdsGossipPush {
impl Default for CrdsGossipPush {
fn default() -> Self {
Self {
// Allow upto 64 Crds Values per PUSH
max_bytes: PACKET_DATA_SIZE * 64,
active_set: RwLock::default(),
crds_cursor: Mutex::default(),
received_cache: Mutex::new(ReceivedCache::new(2 * CRDS_UNIQUE_PUBKEY_CAPACITY)),
Expand Down Expand Up @@ -180,10 +174,10 @@ impl CrdsGossipPush {
usize, // number of values
usize, // number of push messages
) {
const MAX_NUM_PUSHES: usize = 1 << 12;
let active_set = self.active_set.read().unwrap();
let mut num_pushes = 0;
let mut num_values = 0;
let mut total_bytes: usize = 0;
let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new();
let wallclock_window = self.wallclock_window(now);
let mut crds_cursor = self.crds_cursor.lock().unwrap();
Expand All @@ -193,12 +187,7 @@ impl CrdsGossipPush {
.get_entries(crds_cursor.deref_mut())
.map(|entry| &entry.value)
.filter(|value| wallclock_window.contains(&value.wallclock()));
for value in entries {
let serialized_size = serialized_size(&value).unwrap();
total_bytes = total_bytes.saturating_add(serialized_size as usize);
if total_bytes > self.max_bytes {
break;
}
'outer: for value in entries {
num_values += 1;
let origin = value.pubkey();
let nodes = active_set.get_nodes(
Expand All @@ -210,6 +199,9 @@ impl CrdsGossipPush {
for node in nodes.take(self.push_fanout) {
push_messages.entry(*node).or_default().push(value.clone());
num_pushes += 1;
if num_pushes >= MAX_NUM_PUSHES {
break 'outer;
}
}
}
drop(crds);
Expand Down

0 comments on commit 489f483

Please sign in to comment.