Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metric to measure the time it takes to gather enough assignments #4587

Merged
merged 9 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions polkadot/node/core/approval-voting/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,7 @@ pub(crate) mod tests {
clock: Box::new(MockClock::default()),
assignment_criteria: Box::new(MockAssignmentCriteria::default()),
spans: HashMap::new(),
per_block_assignments_gathering_times: Default::default(),
}
}

Expand Down
153 changes: 149 additions & 4 deletions polkadot/node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ use sc_keystore::LocalKeystore;
use sp_application_crypto::Pair;
use sp_consensus::SyncOracle;
use sp_consensus_slots::Slot;
use std::time::Instant;

// The maximum block we keep track of assignments gathering times.
const MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS: u32 = 100;
alexggh marked this conversation as resolved.
Show resolved Hide resolved

use futures::{
channel::oneshot,
Expand Down Expand Up @@ -182,6 +186,8 @@ struct MetricsInner {
time_recover_and_approve: prometheus::Histogram,
candidate_signatures_requests_total: prometheus::Counter<prometheus::U64>,
unapproved_candidates_in_unfinalized_chain: prometheus::Gauge<prometheus::U64>,
// The time it takes in each stage to gather enough assignments.
assignments_gathering_time_by_stage: prometheus::HistogramVec,
alexggh marked this conversation as resolved.
Show resolved Hide resolved
}

/// Approval Voting metrics.
Expand Down Expand Up @@ -302,6 +308,16 @@ impl Metrics {
metrics.unapproved_candidates_in_unfinalized_chain.set(count as u64);
}
}

pub fn observe_assignment_gathering_time(&self, stage: usize, elapsed_as_millis: usize) {
if let Some(metrics) = &self.0 {
let stage_string = stage.to_string();
metrics
.assignments_gathering_time_by_stage
.with_label_values(&[if stage < 10 { stage_string.as_str() } else { "inf" }])
alexggh marked this conversation as resolved.
Show resolved Hide resolved
.observe(elapsed_as_millis as f64);
}
}
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -431,6 +447,17 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
assignments_gathering_time_by_stage: prometheus::register(
prometheus::HistogramVec::new(
prometheus::HistogramOpts::new(
"polkadot_parachain_assignments_gather_time_by_stage_ms",
"The time it takes in ms for each stage to gather enough assignments needed for approval",
alexggh marked this conversation as resolved.
Show resolved Hide resolved
)
.buckets(vec![0.0, 250.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0]),
&["stage"],
)?,
registry,
)?,
};

Ok(Metrics(Some(metrics)))
Expand Down Expand Up @@ -788,6 +815,28 @@ struct State {
clock: Box<dyn Clock + Send + Sync>,
assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
spans: HashMap<Hash, jaeger::PerLeafSpan>,
// Per block, candidate records about how long we take until we gather enough
// assignments, this is relevant because it gives us a good idea about how many
// tranches we trigger and why.
per_block_assignments_gathering_times:
HashMap<BlockNumber, HashMap<(Hash, CandidateHash), AssignmentGatheringRecord>>,
alexggh marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct AssignmentGatheringRecord {
// The stage we are in.
// Candidate assignment gathering goes in stages, first we wait for needed_approvals(stage 0)
// Then if we have no-shows, we move into stage 1 and wait for enough tranches to cover all
// no-shows.
stage: usize,
// The time we started the stage.
stage_start: Option<Instant>,
}

impl Default for AssignmentGatheringRecord {
fn default() -> Self {
AssignmentGatheringRecord { stage: 0, stage_start: Some(Instant::now()) }
}
}

#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
Expand Down Expand Up @@ -893,6 +942,93 @@ impl State {
},
}
}

fn mark_begining_of_gathering_assignments(
&mut self,
block_number: BlockNumber,
block_hash: Hash,
candidate: CandidateHash,
) {
let record = self
.per_block_assignments_gathering_times
.entry(block_number)
.or_default()
.entry((block_hash, candidate))
.or_default();

if record.stage_start.is_none() {
record.stage += 1;
gum::debug!(
target: LOG_TARGET,
stage = ?record.stage,
?block_hash,
?candidate,
"Started a new assignment gathering stage",
);
record.stage_start = Some(Instant::now());
}

// Make sure we always cleanup if we have too many records.
if self.per_block_assignments_gathering_times.len() >
MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS as usize &&
block_number >= MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS
{
self.cleanup_assignments_gathering_timestamp(
block_number - MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
)
}
}

fn mark_gathered_enough_assignments(
&mut self,
block_number: BlockNumber,
block_hash: Hash,
candidate: CandidateHash,
) -> AssignmentGatheringRecord {
let record = self
.per_block_assignments_gathering_times
.get_mut(&block_number)
.and_then(|entry| entry.get_mut(&(block_hash, candidate)));
let stage = record.as_ref().map(|record| record.stage).unwrap_or_default();
AssignmentGatheringRecord {
stage,
stage_start: record.and_then(|record| record.stage_start.take()),
}
}

fn cleanup_assignments_gathering_timestamp(&mut self, keep_greater_than: BlockNumber) {
self.per_block_assignments_gathering_times
.retain(|block_number, _| *block_number > keep_greater_than)
alexggh marked this conversation as resolved.
Show resolved Hide resolved
}

fn observe_assignment_gathering_status(
alexggh marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
metrics: &Metrics,
required_tranches: &RequiredTranches,
block_hash: Hash,
block_number: BlockNumber,
candidate_hash: CandidateHash,
) {
match required_tranches {
RequiredTranches::All | RequiredTranches::Pending { .. } => {
self.mark_begining_of_gathering_assignments(
block_number,
block_hash,
candidate_hash,
);
},
RequiredTranches::Exact { .. } => {
let time_to_gather =
self.mark_gathered_enough_assignments(block_number, block_hash, candidate_hash);
if let Some(gathering_started) = time_to_gather.stage_start {
metrics.observe_assignment_gathering_time(
time_to_gather.stage,
gathering_started.elapsed().as_millis() as usize,
)
}
},
}
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -941,6 +1077,7 @@ where
clock: subsystem.clock,
assignment_criteria,
spans: HashMap::new(),
per_block_assignments_gathering_times: Default::default(),
};

// `None` on start-up. Gets initialized/updated on leaf update
Expand Down Expand Up @@ -972,7 +1109,7 @@ where
subsystem.metrics.on_wakeup();
process_wakeup(
&mut ctx,
&state,
&mut state,
&mut overlayed_db,
&mut session_info_provider,
woken_block,
Expand Down Expand Up @@ -1628,6 +1765,7 @@ async fn handle_from_overseer<Context>(
// `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans
// accordingly.
wakeups.prune_finalized_wakeups(block_number, &mut state.spans);
state.cleanup_assignments_gathering_timestamp(block_number);

// // `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans
// accordingly. let hash_set =
Expand Down Expand Up @@ -2474,7 +2612,7 @@ where

async fn check_and_import_approval<T, Sender>(
sender: &mut Sender,
state: &State,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut RuntimeInfo,
metrics: &Metrics,
Expand Down Expand Up @@ -2706,7 +2844,7 @@ impl ApprovalStateTransition {
// as necessary and schedules any further wakeups.
async fn advance_approval_state<Sender>(
sender: &mut Sender,
state: &State,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut RuntimeInfo,
metrics: &Metrics,
Expand Down Expand Up @@ -2757,6 +2895,13 @@ where
approval_entry,
status.required_tranches.clone(),
);
state.observe_assignment_gathering_status(
&metrics,
&status.required_tranches,
block_hash,
block_entry.block_number(),
candidate_hash,
);

// Check whether this is approved, while allowing a maximum
// assignment tick of `now - APPROVAL_DELAY` - that is, that
Expand Down Expand Up @@ -2937,7 +3082,7 @@ fn should_trigger_assignment(
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn process_wakeup<Context>(
ctx: &mut Context,
state: &State,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut RuntimeInfo,
relay_block: Hash,
Expand Down
Loading
Loading