Skip to content

Commit

Permalink
coalesce entries in recv_slot_entries to target byte count (#27321)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbiseda authored Aug 25, 2022
1 parent 1a836ab commit d1522fc
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 38 deletions.
84 changes: 52 additions & 32 deletions core/src/broadcast_stage/broadcast_utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use {
crate::result::Result,
bincode::serialized_size,
crossbeam_channel::Receiver,
solana_entry::entry::Entry,
solana_ledger::shred::ShredData,
solana_poh::poh_recorder::WorkingBankEntry,
solana_runtime::bank::Bank,
solana_sdk::clock::Slot,
Expand All @@ -11,6 +13,8 @@ use {
},
};

const ENTRY_COALESCE_DURATION: Duration = Duration::from_millis(50);

pub(super) struct ReceiveResults {
pub entries: Vec<Entry>,
pub time_elapsed: Duration,
Expand All @@ -26,45 +30,61 @@ pub struct UnfinishedSlotInfo {
pub parent: Slot,
}

/// This parameter tunes how many entries are received in one iteration of recv loop
/// This will prevent broadcast stage from consuming more entries, that could have led
/// to delays in shredding, and broadcasting shreds to peer validators
const RECEIVE_ENTRY_COUNT_THRESHOLD: usize = 8;

pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result<ReceiveResults> {
let target_serialized_batch_byte_count: u64 =
32 * ShredData::capacity(/*merkle_proof_size*/ None).unwrap() as u64;
let timer = Duration::new(1, 0);
let recv_start = Instant::now();

let (mut bank, (entry, mut last_tick_height)) = receiver.recv_timeout(timer)?;

let mut entries = vec![entry];
let mut slot = bank.slot();
let mut max_tick_height = bank.max_tick_height();

assert!(last_tick_height <= max_tick_height);

if last_tick_height != max_tick_height {
while let Ok((try_bank, (entry, tick_height))) = receiver.try_recv() {
// If the bank changed, that implies the previous slot was interrupted and we do not have to
// broadcast its entries.
if try_bank.slot() != slot {
warn!("Broadcast for slot: {} interrupted", bank.slot());
entries.clear();
bank = try_bank;
slot = bank.slot();
max_tick_height = bank.max_tick_height();
}
last_tick_height = tick_height;
entries.push(entry);

if entries.len() >= RECEIVE_ENTRY_COUNT_THRESHOLD {
break;
}

assert!(last_tick_height <= max_tick_height);
if last_tick_height == max_tick_height {
break;
}

assert!(last_tick_height <= bank.max_tick_height());

// Drain channel
while last_tick_height != bank.max_tick_height() {
let (try_bank, (entry, tick_height)) = match receiver.try_recv() {
Ok(working_bank_entry) => working_bank_entry,
Err(_) => break,
};
// If the bank changed, that implies the previous slot was interrupted and we do not have to
// broadcast its entries.
if try_bank.slot() != bank.slot() {
warn!("Broadcast for slot: {} interrupted", bank.slot());
entries.clear();
bank = try_bank;
}
last_tick_height = tick_height;
entries.push(entry);
assert!(last_tick_height <= bank.max_tick_height());
}

let mut serialized_batch_byte_count = serialized_size(&entries)?;

// Wait up to `ENTRY_COALESCE_DURATION` to try to coalesce entries into a 32 shred batch
let mut coalesce_deadline = Instant::now() + ENTRY_COALESCE_DURATION;
while last_tick_height != bank.max_tick_height()
&& serialized_batch_byte_count < target_serialized_batch_byte_count
{
let (try_bank, (entry, tick_height)) = match receiver.recv_deadline(coalesce_deadline) {
Ok(working_bank_entry) => working_bank_entry,
Err(_) => break,
};
// If the bank changed, that implies the previous slot was interrupted and we do not have to
// broadcast its entries.
if try_bank.slot() != bank.slot() {
warn!("Broadcast for slot: {} interrupted", bank.slot());
entries.clear();
serialized_batch_byte_count = 8; // Vec len
bank = try_bank;
coalesce_deadline = Instant::now() + ENTRY_COALESCE_DURATION;
}
last_tick_height = tick_height;
let entry_bytes = serialized_size(&entry)?;
serialized_batch_byte_count += entry_bytes;
entries.push(entry);
assert!(last_tick_height <= bank.max_tick_height());
}

let time_elapsed = recv_start.elapsed();
Expand Down
12 changes: 7 additions & 5 deletions ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@
#[cfg(test)]
pub(crate) use shred_code::MAX_CODE_SHREDS_PER_SLOT;
pub(crate) use shred_data::ShredData;
pub use {
self::stats::{ProcessShredsStats, ShredFetchStats},
crate::shredder::Shredder,
};
use {
self::{shred_code::ShredCode, traits::Shred as _},
crate::blockstore::{self, MAX_DATA_SHREDS_PER_SLOT},
Expand All @@ -75,6 +70,13 @@ use {
std::fmt::Debug,
thiserror::Error,
};
pub use {
self::{
shred_data::ShredData,
stats::{ProcessShredsStats, ShredFetchStats},
},
crate::shredder::Shredder,
};

mod common;
mod legacy;
Expand Down
2 changes: 1 addition & 1 deletion ledger/src/shred/shred_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl ShredData {
// Maximum size of ledger data that can be embedded in a data-shred.
// merkle_proof_size is the number of proof entries in the merkle tree
// branch. None indicates a legacy data-shred.
pub(crate) fn capacity(merkle_proof_size: Option<u8>) -> Result<usize, Error> {
pub fn capacity(merkle_proof_size: Option<u8>) -> Result<usize, Error> {
match merkle_proof_size {
None => Ok(legacy::ShredData::CAPACITY),
Some(proof_size) => merkle::ShredData::capacity(proof_size),
Expand Down

0 comments on commit d1522fc

Please sign in to comment.