Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
track time to coalesce entries in recv_slot_entries (backport #27525) (
Browse files Browse the repository at this point in the history
…#27628)

track time to coalesce entries in recv_slot_entries (#27525)

(cherry picked from commit 269eb51)

Co-authored-by: Jeff Biseda <[email protected]>
  • Loading branch information
mergify[bot] and jbiseda authored Sep 7, 2022
1 parent 87af8b9 commit 2722cbc
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 9 deletions.
19 changes: 10 additions & 9 deletions core/src/broadcast_stage/broadcast_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const ENTRY_COALESCE_DURATION: Duration = Duration::from_millis(50);
pub(super) struct ReceiveResults {
pub entries: Vec<Entry>,
pub time_elapsed: Duration,
pub time_coalesced: Duration,
pub bank: Arc<Bank>,
pub last_tick_height: u64,
}
Expand All @@ -35,11 +36,8 @@ pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result
32 * ShredData::capacity(/*merkle_proof_size*/ None).unwrap() as u64;
let timer = Duration::new(1, 0);
let recv_start = Instant::now();

let (mut bank, (entry, mut last_tick_height)) = receiver.recv_timeout(timer)?;

let mut entries = vec![entry];

assert!(last_tick_height <= bank.max_tick_height());

// Drain channel
Expand All @@ -63,34 +61,37 @@ pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result
let mut serialized_batch_byte_count = serialized_size(&entries)?;

// Wait up to `ENTRY_COALESCE_DURATION` to try to coalesce entries into a 32 shred batch
let mut coalesce_deadline = Instant::now() + ENTRY_COALESCE_DURATION;
let mut coalesce_start = Instant::now();
while last_tick_height != bank.max_tick_height()
&& serialized_batch_byte_count < target_serialized_batch_byte_count
{
let (try_bank, (entry, tick_height)) = match receiver.recv_deadline(coalesce_deadline) {
Ok(working_bank_entry) => working_bank_entry,
Err(_) => break,
};
let (try_bank, (entry, tick_height)) =
match receiver.recv_deadline(coalesce_start + ENTRY_COALESCE_DURATION) {
Ok(working_bank_entry) => working_bank_entry,
Err(_) => break,
};
// If the bank changed, that implies the previous slot was interrupted and we do not have to
// broadcast its entries.
if try_bank.slot() != bank.slot() {
warn!("Broadcast for slot: {} interrupted", bank.slot());
entries.clear();
serialized_batch_byte_count = 8; // Vec len
bank = try_bank;
coalesce_deadline = Instant::now() + ENTRY_COALESCE_DURATION;
coalesce_start = Instant::now();
}
last_tick_height = tick_height;
let entry_bytes = serialized_size(&entry)?;
serialized_batch_byte_count += entry_bytes;
entries.push(entry);
assert!(last_tick_height <= bank.max_tick_height());
}
let time_coalesced = coalesce_start.elapsed();

let time_elapsed = recv_start.elapsed();
Ok(ReceiveResults {
entries,
time_elapsed,
time_coalesced,
bank,
last_tick_height,
})
Expand Down
7 changes: 7 additions & 0 deletions core/src/broadcast_stage/standard_broadcast_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ impl StandardBroadcastRun {
receive_results: ReceiveResults,
) -> Result<()> {
let mut receive_elapsed = receive_results.time_elapsed;
let mut coalesce_elapsed = receive_results.time_coalesced;
let num_entries = receive_results.entries.len();
let bank = receive_results.bank.clone();
let last_tick_height = receive_results.last_tick_height;
Expand All @@ -193,6 +194,7 @@ impl StandardBroadcastRun {

self.current_slot_and_parent = Some((slot, parent_slot));
receive_elapsed = Duration::new(0, 0);
coalesce_elapsed = Duration::new(0, 0);
}

let mut process_stats = ProcessShredsStats::default();
Expand Down Expand Up @@ -291,6 +293,7 @@ impl StandardBroadcastRun {
process_stats.shredding_elapsed = to_shreds_time.as_us();
process_stats.get_leader_schedule_elapsed = get_leader_schedule_time.as_us();
process_stats.receive_elapsed = duration_as_us(&receive_elapsed);
process_stats.coalesce_elapsed = duration_as_us(&coalesce_elapsed);
process_stats.coding_send_elapsed = coding_send_time.as_us();

self.process_shreds_stats += process_stats;
Expand Down Expand Up @@ -552,6 +555,7 @@ mod test {
let receive_results = ReceiveResults {
entries: ticks0.clone(),
time_elapsed: Duration::new(3, 0),
time_coalesced: Duration::new(2, 0),
bank: bank0.clone(),
last_tick_height: (ticks0.len() - 1) as u64,
};
Expand Down Expand Up @@ -620,6 +624,7 @@ mod test {
let receive_results = ReceiveResults {
entries: ticks1.clone(),
time_elapsed: Duration::new(2, 0),
time_coalesced: Duration::new(1, 0),
bank: bank2,
last_tick_height: (ticks1.len() - 1) as u64,
};
Expand Down Expand Up @@ -684,6 +689,7 @@ mod test {
let receive_results = ReceiveResults {
entries: ticks,
time_elapsed: Duration::new(1, 0),
time_coalesced: Duration::new(0, 0),
bank: bank.clone(),
last_tick_height,
};
Expand Down Expand Up @@ -727,6 +733,7 @@ mod test {
let receive_results = ReceiveResults {
entries: ticks.clone(),
time_elapsed: Duration::new(3, 0),
time_coalesced: Duration::new(2, 0),
bank: bank0,
last_tick_height: ticks.len() as u64,
};
Expand Down
4 changes: 4 additions & 0 deletions ledger/src/shred/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub struct ProcessShredsStats {
pub sign_coding_elapsed: u64,
pub coding_send_elapsed: u64,
pub get_leader_schedule_elapsed: u64,
pub coalesce_elapsed: u64,
// Histogram count of num_data_shreds obtained from serializing entries
// counted in 5 buckets.
num_data_shreds_hist: [usize; 5],
Expand Down Expand Up @@ -83,6 +84,7 @@ impl ProcessShredsStats {
("num_data_shreds_31", self.num_data_shreds_hist[2], i64),
("num_data_shreds_63", self.num_data_shreds_hist[3], i64),
("num_data_shreds_64", self.num_data_shreds_hist[4], i64),
("coalesce_elapsed", self.coalesce_elapsed, i64),
);
*self = Self::default();
}
Expand Down Expand Up @@ -134,6 +136,7 @@ impl AddAssign<ProcessShredsStats> for ProcessShredsStats {
sign_coding_elapsed,
coding_send_elapsed,
get_leader_schedule_elapsed,
coalesce_elapsed,
num_data_shreds_hist,
num_extant_slots,
data_buffer_residual,
Expand All @@ -146,6 +149,7 @@ impl AddAssign<ProcessShredsStats> for ProcessShredsStats {
self.sign_coding_elapsed += sign_coding_elapsed;
self.coding_send_elapsed += coding_send_elapsed;
self.get_leader_schedule_elapsed += get_leader_schedule_elapsed;
self.coalesce_elapsed += coalesce_elapsed;
self.num_extant_slots += num_extant_slots;
self.data_buffer_residual += data_buffer_residual;
for (i, bucket) in self.num_data_shreds_hist.iter_mut().enumerate() {
Expand Down

0 comments on commit 2722cbc

Please sign in to comment.