Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
Separate out interrupted slots broadcast metrics (#20537)
Browse files Browse the repository at this point in the history
  • Loading branch information
carllin authored Oct 9, 2021
1 parent db9336c commit 838ff3b
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 71 deletions.
1 change: 1 addition & 0 deletions core/src/broadcast_stage/broadcast_fake_shreds_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
slot,
num_expected_batches: None,
slot_start_ts: Instant::now(),
was_interrupted: false,
};
// 3) Start broadcast step
//some indicates fake shreds
Expand Down
117 changes: 75 additions & 42 deletions core/src/broadcast_stage/broadcast_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use super::*;

pub(crate) trait BroadcastStats {
fn update(&mut self, new_stats: &Self);
fn report_stats(&mut self, slot: Slot, slot_start: Instant);
fn report_stats(&mut self, slot: Slot, slot_start: Instant, was_interrupted: bool);
}

#[derive(Clone)]
pub(crate) struct BroadcastShredBatchInfo {
pub(crate) slot: Slot,
pub(crate) num_expected_batches: Option<usize>,
pub(crate) slot_start_ts: Instant,
pub(crate) was_interrupted: bool,
}

#[derive(Default, Clone)]
Expand All @@ -33,25 +34,39 @@ impl BroadcastStats for TransmitShredsStats {
self.total_packets += new_stats.total_packets;
self.dropped_packets += new_stats.dropped_packets;
}
fn report_stats(&mut self, slot: Slot, slot_start: Instant) {
datapoint_info!(
"broadcast-transmit-shreds-stats",
("slot", slot as i64, i64),
(
"end_to_end_elapsed",
// `slot_start` signals when the first batch of shreds was
// received, used to measure duration of broadcast
slot_start.elapsed().as_micros() as i64,
i64
),
("transmit_elapsed", self.transmit_elapsed as i64, i64),
("send_mmsg_elapsed", self.send_mmsg_elapsed as i64, i64),
("get_peers_elapsed", self.get_peers_elapsed as i64, i64),
("num_shreds", self.num_shreds as i64, i64),
("shred_select", self.shred_select as i64, i64),
("total_packets", self.total_packets as i64, i64),
("dropped_packets", self.dropped_packets as i64, i64),
);
fn report_stats(&mut self, slot: Slot, slot_start: Instant, was_interrupted: bool) {
if was_interrupted {
datapoint_info!(
"broadcast-transmit-shreds-interrupted-stats",
("slot", slot as i64, i64),
("transmit_elapsed", self.transmit_elapsed as i64, i64),
("send_mmsg_elapsed", self.send_mmsg_elapsed as i64, i64),
("get_peers_elapsed", self.get_peers_elapsed as i64, i64),
("num_shreds", self.num_shreds as i64, i64),
("shred_select", self.shred_select as i64, i64),
("total_packets", self.total_packets as i64, i64),
("dropped_packets", self.dropped_packets as i64, i64),
);
} else {
datapoint_info!(
"broadcast-transmit-shreds-stats",
("slot", slot as i64, i64),
(
"end_to_end_elapsed",
// `slot_start` signals when the first batch of shreds was
// received, used to measure duration of broadcast
slot_start.elapsed().as_micros() as i64,
i64
),
("transmit_elapsed", self.transmit_elapsed as i64, i64),
("send_mmsg_elapsed", self.send_mmsg_elapsed as i64, i64),
("get_peers_elapsed", self.get_peers_elapsed as i64, i64),
("num_shreds", self.num_shreds as i64, i64),
("shred_select", self.shred_select as i64, i64),
("total_packets", self.total_packets as i64, i64),
("dropped_packets", self.dropped_packets as i64, i64),
);
}
}
}

Expand All @@ -65,24 +80,37 @@ impl BroadcastStats for InsertShredsStats {
self.insert_shreds_elapsed += new_stats.insert_shreds_elapsed;
self.num_shreds += new_stats.num_shreds;
}
fn report_stats(&mut self, slot: Slot, slot_start: Instant) {
datapoint_info!(
"broadcast-insert-shreds-stats",
("slot", slot as i64, i64),
(
"end_to_end_elapsed",
// `slot_start` signals when the first batch of shreds was
// received, used to measure duration of broadcast
slot_start.elapsed().as_micros() as i64,
i64
),
(
"insert_shreds_elapsed",
self.insert_shreds_elapsed as i64,
i64
),
("num_shreds", self.num_shreds as i64, i64),
);
fn report_stats(&mut self, slot: Slot, slot_start: Instant, was_interrupted: bool) {
if was_interrupted {
datapoint_info!(
"broadcast-insert-shreds-interrupted-stats",
("slot", slot as i64, i64),
(
"insert_shreds_elapsed",
self.insert_shreds_elapsed as i64,
i64
),
("num_shreds", self.num_shreds as i64, i64),
);
} else {
datapoint_info!(
"broadcast-insert-shreds-stats",
("slot", slot as i64, i64),
(
"end_to_end_elapsed",
// `slot_start` signals when the first batch of shreds was
// received, used to measure duration of broadcast
slot_start.elapsed().as_micros() as i64,
i64
),
(
"insert_shreds_elapsed",
self.insert_shreds_elapsed as i64,
i64
),
("num_shreds", self.num_shreds as i64, i64),
);
}
}
}

Expand Down Expand Up @@ -128,9 +156,11 @@ impl<T: BroadcastStats + Default> SlotBroadcastStats<T> {
}
if let Some(num_expected_batches) = slot_batch_counter.num_expected_batches {
if slot_batch_counter.num_batches == num_expected_batches {
slot_batch_counter
.broadcast_shred_stats
.report_stats(batch_info.slot, batch_info.slot_start_ts);
slot_batch_counter.broadcast_shred_stats.report_stats(
batch_info.slot,
batch_info.slot_start_ts,
batch_info.was_interrupted,
);
should_delete = true;
}
}
Expand Down Expand Up @@ -159,7 +189,7 @@ mod test {
self.count += new_stats.count;
self.sender = new_stats.sender.clone();
}
fn report_stats(&mut self, slot: Slot, slot_start: Instant) {
fn report_stats(&mut self, slot: Slot, slot_start: Instant, _was_interrupted: bool) {
self.sender
.as_ref()
.unwrap()
Expand All @@ -186,6 +216,7 @@ mod test {
slot: 0,
num_expected_batches: Some(2),
slot_start_ts: start,
was_interrupted: false,
}),
);

Expand Down Expand Up @@ -242,6 +273,7 @@ mod test {
slot: 0,
num_expected_batches: None,
slot_start_ts: start,
was_interrupted: false,
}),
);

Expand All @@ -265,6 +297,7 @@ mod test {
slot,
num_expected_batches: None,
slot_start_ts: start,
was_interrupted: false,
};
if i == round % num_threads {
broadcast_batch_info.num_expected_batches = Some(num_threads);
Expand Down
84 changes: 55 additions & 29 deletions core/src/broadcast_stage/standard_broadcast_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl StandardBroadcastRun {
stats,
);
shreds.insert(0, shred);
self.report_and_reset_stats();
self.report_and_reset_stats(true);
self.unfinished_slot = None;
shreds
}
Expand Down Expand Up @@ -240,6 +240,7 @@ impl StandardBroadcastRun {
"Old broadcast start time for previous slot must exist if the previous slot
was interrupted",
),
was_interrupted: true,
});
let shreds = Arc::new(prev_slot_shreds);
debug_assert!(shreds.iter().all(|shred| shred.slot() == slot));
Expand All @@ -262,6 +263,7 @@ impl StandardBroadcastRun {
slot_start_ts: self
.slot_broadcast_start
.expect("Start timestamp must exist for a slot if we're broadcasting the slot"),
was_interrupted: false,
});
get_leader_schedule_time.stop();

Expand Down Expand Up @@ -297,7 +299,7 @@ impl StandardBroadcastRun {
self.process_shreds_stats.update(&process_stats);

if last_tick_height == bank.max_tick_height() {
self.report_and_reset_stats();
self.report_and_reset_stats(false);
self.unfinished_slot = None;
}

Expand Down Expand Up @@ -380,35 +382,59 @@ impl StandardBroadcastRun {
transmit_shreds_stats.update(new_transmit_shreds_stats, broadcast_shred_batch_info);
}

fn report_and_reset_stats(&mut self) {
fn report_and_reset_stats(&mut self, was_interrupted: bool) {
let stats = &self.process_shreds_stats;
let unfinished_slot = self.unfinished_slot.as_ref().unwrap();
datapoint_info!(
"broadcast-process-shreds-stats",
("slot", unfinished_slot.slot as i64, i64),
("shredding_time", stats.shredding_elapsed, i64),
("receive_time", stats.receive_elapsed, i64),
(
"num_data_shreds",
unfinished_slot.next_shred_index as i64,
i64
),
(
"slot_broadcast_time",
self.slot_broadcast_start.unwrap().elapsed().as_micros() as i64,
i64
),
(
"get_leader_schedule_time",
stats.get_leader_schedule_elapsed,
i64
),
("serialize_shreds_time", stats.serialize_elapsed, i64),
("gen_data_time", stats.gen_data_elapsed, i64),
("gen_coding_time", stats.gen_coding_elapsed, i64),
("sign_coding_time", stats.sign_coding_elapsed, i64),
("coding_send_time", stats.coding_send_elapsed, i64),
);
if was_interrupted {
datapoint_info!(
"broadcast-process-shreds-interrupted-stats",
("slot", unfinished_slot.slot as i64, i64),
("shredding_time", stats.shredding_elapsed, i64),
("receive_time", stats.receive_elapsed, i64),
(
"num_data_shreds",
unfinished_slot.next_shred_index as i64,
i64
),
(
"get_leader_schedule_time",
stats.get_leader_schedule_elapsed,
i64
),
("serialize_shreds_time", stats.serialize_elapsed, i64),
("gen_data_time", stats.gen_data_elapsed, i64),
("gen_coding_time", stats.gen_coding_elapsed, i64),
("sign_coding_time", stats.sign_coding_elapsed, i64),
("coding_send_time", stats.coding_send_elapsed, i64),
);
} else {
datapoint_info!(
"broadcast-process-shreds-stats",
("slot", unfinished_slot.slot as i64, i64),
("shredding_time", stats.shredding_elapsed, i64),
("receive_time", stats.receive_elapsed, i64),
(
"num_data_shreds",
unfinished_slot.next_shred_index as i64,
i64
),
(
"slot_broadcast_time",
self.slot_broadcast_start.unwrap().elapsed().as_micros() as i64,
i64
),
(
"get_leader_schedule_time",
stats.get_leader_schedule_elapsed,
i64
),
("serialize_shreds_time", stats.serialize_elapsed, i64),
("gen_data_time", stats.gen_data_elapsed, i64),
("gen_coding_time", stats.gen_coding_elapsed, i64),
("sign_coding_time", stats.sign_coding_elapsed, i64),
("coding_send_time", stats.coding_send_elapsed, i64),
);
}
self.process_shreds_stats.reset();
}
}
Expand Down

0 comments on commit 838ff3b

Please sign in to comment.