From f2fd668d03a8aafef31ab46addd30919146eff2d Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Thu, 25 Aug 2022 23:01:45 +0000
Subject: [PATCH] coalesce entries in recv_slot_entries to target byte count
 (backport #27321) (#27407)

coalesce entries in recv_slot_entries to target byte count (#27321)

(cherry picked from commit d1522fc7903d893308ead8947abb146d9a93570f)

Co-authored-by: Jeff Biseda <jbiseda@gmail.com>
---
 core/src/broadcast_stage/broadcast_utils.rs | 84 +++++++++++++--------
 ledger/src/shred.rs                         | 12 +--
 ledger/src/shred/shred_data.rs              |  2 +-
 3 files changed, 60 insertions(+), 38 deletions(-)

diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs
index a54b0fa79f79c5..8079d396ddab7a 100644
--- a/core/src/broadcast_stage/broadcast_utils.rs
+++ b/core/src/broadcast_stage/broadcast_utils.rs
@@ -1,7 +1,9 @@
 use {
     crate::result::Result,
+    bincode::serialized_size,
     crossbeam_channel::Receiver,
     solana_entry::entry::Entry,
+    solana_ledger::shred::ShredData,
     solana_poh::poh_recorder::WorkingBankEntry,
     solana_runtime::bank::Bank,
     solana_sdk::clock::Slot,
@@ -11,6 +13,8 @@ use {
     },
 };
 
+const ENTRY_COALESCE_DURATION: Duration = Duration::from_millis(50);
+
 pub(super) struct ReceiveResults {
     pub entries: Vec<Entry>,
     pub time_elapsed: Duration,
@@ -26,45 +30,61 @@ pub struct UnfinishedSlotInfo {
     pub parent: Slot,
 }
 
-/// This parameter tunes how many entries are received in one iteration of recv loop
-/// This will prevent broadcast stage from consuming more entries, that could have led
-/// to delays in shredding, and broadcasting shreds to peer validators
-const RECEIVE_ENTRY_COUNT_THRESHOLD: usize = 8;
-
 pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result<ReceiveResults> {
+    let target_serialized_batch_byte_count: u64 =
+        32 * ShredData::capacity(/*merkle_proof_size*/ None).unwrap() as u64;
     let timer = Duration::new(1, 0);
     let recv_start = Instant::now();
+
     let (mut bank, (entry, mut last_tick_height)) = receiver.recv_timeout(timer)?;
     let mut entries = vec![entry];
-    let mut slot = bank.slot();
-    let mut max_tick_height = bank.max_tick_height();
-
-    assert!(last_tick_height <= max_tick_height);
-
-    if last_tick_height != max_tick_height {
-        while let Ok((try_bank, (entry, tick_height))) = receiver.try_recv() {
-            // If the bank changed, that implies the previous slot was interrupted and we do not have to
-            // broadcast its entries.
-            if try_bank.slot() != slot {
-                warn!("Broadcast for slot: {} interrupted", bank.slot());
-                entries.clear();
-                bank = try_bank;
-                slot = bank.slot();
-                max_tick_height = bank.max_tick_height();
-            }
-            last_tick_height = tick_height;
-            entries.push(entry);
-
-            if entries.len() >= RECEIVE_ENTRY_COUNT_THRESHOLD {
-                break;
-            }
-
-            assert!(last_tick_height <= max_tick_height);
-            if last_tick_height == max_tick_height {
-                break;
-            }
+
+    assert!(last_tick_height <= bank.max_tick_height());
+
+    // Drain channel
+    while last_tick_height != bank.max_tick_height() {
+        let (try_bank, (entry, tick_height)) = match receiver.try_recv() {
+            Ok(working_bank_entry) => working_bank_entry,
+            Err(_) => break,
+        };
+        // If the bank changed, that implies the previous slot was interrupted and we do not have to
+        // broadcast its entries.
+        if try_bank.slot() != bank.slot() {
+            warn!("Broadcast for slot: {} interrupted", bank.slot());
+            entries.clear();
+            bank = try_bank;
+        }
+        last_tick_height = tick_height;
+        entries.push(entry);
+        assert!(last_tick_height <= bank.max_tick_height());
+    }
+
+    let mut serialized_batch_byte_count = serialized_size(&entries)?;
+
+    // Wait up to `ENTRY_COALESCE_DURATION` to try to coalesce entries into a 32 shred batch
+    let mut coalesce_deadline = Instant::now() + ENTRY_COALESCE_DURATION;
+    while last_tick_height != bank.max_tick_height()
+        && serialized_batch_byte_count < target_serialized_batch_byte_count
+    {
+        let (try_bank, (entry, tick_height)) = match receiver.recv_deadline(coalesce_deadline) {
+            Ok(working_bank_entry) => working_bank_entry,
+            Err(_) => break,
+        };
+        // If the bank changed, that implies the previous slot was interrupted and we do not have to
+        // broadcast its entries.
+        if try_bank.slot() != bank.slot() {
+            warn!("Broadcast for slot: {} interrupted", bank.slot());
+            entries.clear();
+            serialized_batch_byte_count = 8; // Vec len
+            bank = try_bank;
+            coalesce_deadline = Instant::now() + ENTRY_COALESCE_DURATION;
         }
+        last_tick_height = tick_height;
+        let entry_bytes = serialized_size(&entry)?;
+        serialized_batch_byte_count += entry_bytes;
+        entries.push(entry);
+        assert!(last_tick_height <= bank.max_tick_height());
     }
 
     let time_elapsed = recv_start.elapsed();
diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs
index cee63cb45df57d..997b7bd3b6861b 100644
--- a/ledger/src/shred.rs
+++ b/ledger/src/shred.rs
@@ -51,11 +51,6 @@
 #[cfg(test)]
 pub(crate) use shred_code::MAX_CODE_SHREDS_PER_SLOT;
-pub(crate) use shred_data::ShredData;
-pub use {
-    self::stats::{ProcessShredsStats, ShredFetchStats},
-    crate::shredder::Shredder,
-};
 use {
     self::{shred_code::ShredCode, traits::Shred as _},
     crate::blockstore::{self, MAX_DATA_SHREDS_PER_SLOT},
@@ -75,6 +70,13 @@ use {
     std::fmt::Debug,
     thiserror::Error,
 };
+pub use {
+    self::{
+        shred_data::ShredData,
+        stats::{ProcessShredsStats, ShredFetchStats},
+    },
+    crate::shredder::Shredder,
+};
 
 mod common;
 mod legacy;
diff --git a/ledger/src/shred/shred_data.rs b/ledger/src/shred/shred_data.rs
index 777ef6feffb8cb..d6976514e22188 100644
--- a/ledger/src/shred/shred_data.rs
+++ b/ledger/src/shred/shred_data.rs
@@ -99,7 +99,7 @@ impl ShredData {
     // Maximum size of ledger data that can be embedded in a data-shred.
     // merkle_proof_size is the number of proof entries in the merkle tree
     // branch. None indicates a legacy data-shred.
-    pub(crate) fn capacity(merkle_proof_size: Option<u8>) -> Result<usize, Error> {
+    pub fn capacity(merkle_proof_size: Option<u8>) -> Result<usize, Error> {
         match merkle_proof_size {
             None => Ok(legacy::ShredData::CAPACITY),
             Some(proof_size) => merkle::ShredData::capacity(proof_size),