coalesce entries in recv_slot_entries to target byte count (backport #27321) #27407

Merged 1 commit on Aug 25, 2022
84 changes: 52 additions & 32 deletions core/src/broadcast_stage/broadcast_utils.rs
@@ -1,7 +1,9 @@
use {
crate::result::Result,
bincode::serialized_size,
crossbeam_channel::Receiver,
solana_entry::entry::Entry,
solana_ledger::shred::ShredData,
solana_poh::poh_recorder::WorkingBankEntry,
solana_runtime::bank::Bank,
solana_sdk::clock::Slot,
@@ -11,6 +13,8 @@ use {
},
};

const ENTRY_COALESCE_DURATION: Duration = Duration::from_millis(50);

pub(super) struct ReceiveResults {
pub entries: Vec<Entry>,
pub time_elapsed: Duration,
@@ -26,45 +30,61 @@ pub struct UnfinishedSlotInfo {
pub parent: Slot,
}

/// This parameter tunes how many entries are received in one iteration of recv loop
/// This will prevent broadcast stage from consuming more entries, that could have led
/// to delays in shredding, and broadcasting shreds to peer validators
const RECEIVE_ENTRY_COUNT_THRESHOLD: usize = 8;

pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result<ReceiveResults> {
let target_serialized_batch_byte_count: u64 =
32 * ShredData::capacity(/*merkle_proof_size*/ None).unwrap() as u64;
let timer = Duration::new(1, 0);
let recv_start = Instant::now();

let (mut bank, (entry, mut last_tick_height)) = receiver.recv_timeout(timer)?;

let mut entries = vec![entry];
let mut slot = bank.slot();
let mut max_tick_height = bank.max_tick_height();

assert!(last_tick_height <= max_tick_height);

if last_tick_height != max_tick_height {
while let Ok((try_bank, (entry, tick_height))) = receiver.try_recv() {
// If the bank changed, that implies the previous slot was interrupted and we do not have to
// broadcast its entries.
if try_bank.slot() != slot {
warn!("Broadcast for slot: {} interrupted", bank.slot());
entries.clear();
bank = try_bank;
slot = bank.slot();
max_tick_height = bank.max_tick_height();
}
last_tick_height = tick_height;
entries.push(entry);

if entries.len() >= RECEIVE_ENTRY_COUNT_THRESHOLD {
break;
}

assert!(last_tick_height <= max_tick_height);
if last_tick_height == max_tick_height {
break;
}

assert!(last_tick_height <= bank.max_tick_height());

// Drain channel
while last_tick_height != bank.max_tick_height() {
let (try_bank, (entry, tick_height)) = match receiver.try_recv() {
Ok(working_bank_entry) => working_bank_entry,
Err(_) => break,
};
// If the bank changed, that implies the previous slot was interrupted and we do not have to
// broadcast its entries.
if try_bank.slot() != bank.slot() {
warn!("Broadcast for slot: {} interrupted", bank.slot());
entries.clear();
bank = try_bank;
}
last_tick_height = tick_height;
entries.push(entry);
assert!(last_tick_height <= bank.max_tick_height());
}

let mut serialized_batch_byte_count = serialized_size(&entries)?;

// Wait up to `ENTRY_COALESCE_DURATION` to try to coalesce entries into a 32 shred batch
let mut coalesce_deadline = Instant::now() + ENTRY_COALESCE_DURATION;
while last_tick_height != bank.max_tick_height()
&& serialized_batch_byte_count < target_serialized_batch_byte_count
{
let (try_bank, (entry, tick_height)) = match receiver.recv_deadline(coalesce_deadline) {
Ok(working_bank_entry) => working_bank_entry,
Err(_) => break,
};
// If the bank changed, that implies the previous slot was interrupted and we do not have to
// broadcast its entries.
if try_bank.slot() != bank.slot() {
warn!("Broadcast for slot: {} interrupted", bank.slot());
entries.clear();
serialized_batch_byte_count = 8; // Vec len
bank = try_bank;
coalesce_deadline = Instant::now() + ENTRY_COALESCE_DURATION;
}
last_tick_height = tick_height;
let entry_bytes = serialized_size(&entry)?;
serialized_batch_byte_count += entry_bytes;
entries.push(entry);
assert!(last_tick_height <= bank.max_tick_height());
}

let time_elapsed = recv_start.elapsed();
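The core of the change is visible above: instead of capping a batch at a fixed entry count (the removed RECEIVE_ENTRY_COUNT_THRESHOLD), recv_slot_entries drains the channel and then keeps coalescing with recv_deadline until the batch holds roughly 32 data shreds' worth of bytes or ENTRY_COALESCE_DURATION expires. Below is a minimal, self-contained sketch of that pattern; the Vec<u8> payload, the constants, and the function names are illustrative stand-ins, not the Solana types.

// Sketch only: deadline-bounded, byte-budgeted coalescing over a crossbeam channel.
use crossbeam_channel::{unbounded, Receiver, Sender};
use std::{
    thread,
    time::{Duration, Instant},
};

const COALESCE_DURATION: Duration = Duration::from_millis(50);
// Stand-in for `32 * ShredData::capacity(None)` in the real code.
const TARGET_BATCH_BYTES: usize = 32 * 1_000;

fn recv_batch(receiver: &Receiver<Vec<u8>>) -> Vec<Vec<u8>> {
    let mut batch: Vec<Vec<u8>> = Vec::new();
    let mut batch_bytes = 0usize;
    // One deadline per batch: coalescing never waits longer than the window.
    let deadline = Instant::now() + COALESCE_DURATION;

    while batch_bytes < TARGET_BATCH_BYTES {
        match receiver.recv_deadline(deadline) {
            Ok(entry) => {
                batch_bytes += entry.len();
                batch.push(entry);
            }
            // Deadline hit or sender disconnected: ship whatever was collected.
            Err(_) => break,
        }
    }
    batch
}

fn main() {
    let (tx, rx): (Sender<Vec<u8>>, Receiver<Vec<u8>>) = unbounded();
    thread::spawn(move || {
        // Producer: 512-byte entries arriving every millisecond.
        for i in 0..100u8 {
            let _ = tx.send(vec![i; 512]);
            thread::sleep(Duration::from_millis(1));
        }
    });
    let batch = recv_batch(&rx);
    println!(
        "coalesced {} entries ({} bytes)",
        batch.len(),
        batch.iter().map(Vec::len).sum::<usize>()
    );
}

Because the deadline is computed once per batch (and, in the real code, reset only when a slot is interrupted), a slow producer can delay shipping a batch by at most the coalesce window.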
12 changes: 7 additions & 5 deletions ledger/src/shred.rs
@@ -51,11 +51,6 @@

#[cfg(test)]
pub(crate) use shred_code::MAX_CODE_SHREDS_PER_SLOT;
pub(crate) use shred_data::ShredData;
pub use {
self::stats::{ProcessShredsStats, ShredFetchStats},
crate::shredder::Shredder,
};
use {
self::{shred_code::ShredCode, traits::Shred as _},
crate::blockstore::{self, MAX_DATA_SHREDS_PER_SLOT},
@@ -75,6 +70,13 @@ use {
std::fmt::Debug,
thiserror::Error,
};
pub use {
self::{
shred_data::ShredData,
stats::{ProcessShredsStats, ShredFetchStats},
},
crate::shredder::Shredder,
};

mod common;
mod legacy;
2 changes: 1 addition & 1 deletion ledger/src/shred/shred_data.rs
@@ -99,7 +99,7 @@ impl ShredData {
// Maximum size of ledger data that can be embedded in a data-shred.
// merkle_proof_size is the number of proof entries in the merkle tree
// branch. None indicates a legacy data-shred.
pub(crate) fn capacity(merkle_proof_size: Option<u8>) -> Result<usize, Error> {
pub fn capacity(merkle_proof_size: Option<u8>) -> Result<usize, Error> {
match merkle_proof_size {
None => Ok(legacy::ShredData::CAPACITY),
Some(proof_size) => merkle::ShredData::capacity(proof_size),
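This visibility bump (pub(crate) to pub) is what lets broadcast_utils.rs in the core crate size its batches: the target is 32 data shreds' worth of payload. A hypothetical caller outside the ledger crate, loosely restating the computation from the first diff, would look like:

use solana_ledger::shred::ShredData;

// `None` selects the legacy (non-Merkle) data-shred layout, matching the call
// added in broadcast_utils.rs; 32 shreds' capacity becomes the batch byte target.
fn target_serialized_batch_byte_count() -> u64 {
    32 * ShredData::capacity(/*merkle_proof_size:*/ None).unwrap() as u64
}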