From da1e2e018dad8a29e00d0d193df0013f26171d21 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Sun, 26 May 2024 14:58:47 +0300 Subject: [PATCH 1/9] Add metric to measure the time it takes to gather enough assignments Signed-off-by: Alexandru Gheorghe --- .../node/core/approval-voting/src/import.rs | 1 + polkadot/node/core/approval-voting/src/lib.rs | 146 +++++++++++- .../node/core/approval-voting/src/tests.rs | 212 +++++++++++++++++- 3 files changed, 354 insertions(+), 5 deletions(-) diff --git a/polkadot/node/core/approval-voting/src/import.rs b/polkadot/node/core/approval-voting/src/import.rs index f4be42a48450..fca7d16b8166 100644 --- a/polkadot/node/core/approval-voting/src/import.rs +++ b/polkadot/node/core/approval-voting/src/import.rs @@ -658,6 +658,7 @@ pub(crate) mod tests { clock: Box::new(MockClock::default()), assignment_criteria: Box::new(MockAssignmentCriteria::default()), spans: HashMap::new(), + time_started_gathering_assignments: Default::default(), } } diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index b5ed92fa39c8..05b21c01fa5e 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -63,6 +63,10 @@ use sc_keystore::LocalKeystore; use sp_application_crypto::Pair; use sp_consensus::SyncOracle; use sp_consensus_slots::Slot; +use std::time::Instant; + +// The maximum time we keep track of assignments gathering times times. +const MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS: u32 = 100; use futures::{ channel::oneshot, @@ -182,6 +186,8 @@ struct MetricsInner { time_recover_and_approve: prometheus::Histogram, candidate_signatures_requests_total: prometheus::Counter, unapproved_candidates_in_unfinalized_chain: prometheus::Gauge, + // The time it takes in each stage to gather enough assignments. + assignments_gathering_time_by_stage: prometheus::HistogramVec, } /// Approval Voting metrics. 
@@ -302,6 +308,16 @@ impl Metrics { metrics.unapproved_candidates_in_unfinalized_chain.set(count as u64); } } + + pub fn observe_assignment_gathering_time(&self, stage: usize, elapsed_as_millis: usize) { + if let Some(metrics) = &self.0 { + let stage_string = stage.to_string(); + metrics + .assignments_gathering_time_by_stage + .with_label_values(&[if stage < 10 { stage_string.as_str() } else { "inf" }]) + .observe(elapsed_as_millis as f64); + } + } } impl metrics::Metrics for Metrics { @@ -431,6 +447,17 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + assignments_gathering_time_by_stage: prometheus::register( + prometheus::HistogramVec::new( + prometheus::HistogramOpts::new( + "polkadot_parachain_assignments_gather_time_by_stage", + "The time it takes for each stage to gather enough assignments needed for approval", + ) + .buckets(vec![0.0, 250.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0]), + &["stage"], + )?, + registry, + )?, }; Ok(Metrics(Some(metrics))) @@ -788,6 +815,28 @@ struct State { clock: Box, assignment_criteria: Box, spans: HashMap, + // Per block, candidate records about how long we take until we gather enough + // assignments, this is relevant because it gives us a good idea about how many + // tranches we trigger and why. + time_started_gathering_assignments: + HashMap>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct AssignmentGatheringRecord { + // The stage we are in. + // Candidate assignment gathering goes in stages, first we wait for needed_approvals(stage 0) + // Then if we have no-shows, we move into stage 1 and wait for enough tranches to cover all + // no-shows. + stage: usize, + // The time we started the stage. 
+ stage_start: Option, +} + +impl Default for AssignmentGatheringRecord { + fn default() -> Self { + AssignmentGatheringRecord { stage: 0, stage_start: Some(Instant::now()) } + } } #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] @@ -893,6 +942,86 @@ impl State { }, } } + + fn mark_begining_of_gathering_assignments( + &mut self, + block_number: BlockNumber, + block_hash: Hash, + candidate: CandidateHash, + ) { + let record = self + .time_started_gathering_assignments + .entry(block_number) + .or_default() + .entry((block_hash, candidate)) + .or_default(); + + if record.stage_start.is_none() { + record.stage += 1; + record.stage_start = Some(Instant::now()); + } + + // Make sure we always cleanup if we have too many records. + if self.time_started_gathering_assignments.len() > + MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS as usize && + block_number >= MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS + { + self.cleanup_assignments_gathering_timestamp( + block_number - MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS, + ) + } + } + + fn mark_gathered_enough_assignments( + &mut self, + block_number: BlockNumber, + block_hash: Hash, + candidate: CandidateHash, + ) -> AssignmentGatheringRecord { + let record = self + .time_started_gathering_assignments + .get_mut(&block_number) + .and_then(|entry| entry.get_mut(&(block_hash, candidate))); + let stage = record.as_ref().map(|record| record.stage).unwrap_or_default(); + AssignmentGatheringRecord { + stage, + stage_start: record.and_then(|record| record.stage_start.take()), + } + } + + fn cleanup_assignments_gathering_timestamp(&mut self, keep_greater_than: BlockNumber) { + self.time_started_gathering_assignments + .retain(|block_number, _| *block_number > keep_greater_than) + } + + fn observe_assignment_gathering_status( + &mut self, + metrics: &Metrics, + required_tranches: &RequiredTranches, + block_hash: Hash, + block_number: BlockNumber, + candidate_hash: CandidateHash, + ) { + match required_tranches { + RequiredTranches::All | 
RequiredTranches::Pending { .. } => { + self.mark_begining_of_gathering_assignments( + block_number, + block_hash, + candidate_hash, + ); + }, + RequiredTranches::Exact { .. } => { + let time_to_gather = + self.mark_gathered_enough_assignments(block_number, block_hash, candidate_hash); + if let Some(gathering_started) = time_to_gather.stage_start { + metrics.observe_assignment_gathering_time( + time_to_gather.stage, + gathering_started.elapsed().as_millis() as usize, + ) + } + }, + } + } } #[derive(Debug, Clone)] @@ -941,6 +1070,7 @@ where clock: subsystem.clock, assignment_criteria, spans: HashMap::new(), + time_started_gathering_assignments: Default::default(), }; // `None` on start-up. Gets initialized/updated on leaf update @@ -972,7 +1102,7 @@ where subsystem.metrics.on_wakeup(); process_wakeup( &mut ctx, - &state, + &mut state, &mut overlayed_db, &mut session_info_provider, woken_block, @@ -1628,6 +1758,7 @@ async fn handle_from_overseer( // `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans // accordingly. wakeups.prune_finalized_wakeups(block_number, &mut state.spans); + state.cleanup_assignments_gathering_timestamp(block_number); // // `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans // accordingly. let hash_set = @@ -2474,7 +2605,7 @@ where async fn check_and_import_approval( sender: &mut Sender, - state: &State, + state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, metrics: &Metrics, @@ -2706,7 +2837,7 @@ impl ApprovalStateTransition { // as necessary and schedules any further wakeups. 
async fn advance_approval_state( sender: &mut Sender, - state: &State, + state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, metrics: &Metrics, @@ -2757,6 +2888,13 @@ where approval_entry, status.required_tranches.clone(), ); + state.observe_assignment_gathering_status( + &metrics, + &status.required_tranches, + block_hash, + block_entry.block_number(), + candidate_hash, + ); // Check whether this is approved, while allowing a maximum // assignment tick of `now - APPROVAL_DELAY` - that is, that @@ -2937,7 +3075,7 @@ fn should_trigger_assignment( #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] async fn process_wakeup( ctx: &mut Context, - state: &State, + state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, relay_block: Hash, diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs index 312d805bbefb..aea2222fd135 100644 --- a/polkadot/node/core/approval-voting/src/tests.rs +++ b/polkadot/node/core/approval-voting/src/tests.rs @@ -17,6 +17,10 @@ use self::test_helpers::mock::new_leaf; use super::*; use crate::backend::V1ReadBackend; +use overseer::prometheus::{ + prometheus::{IntCounter, IntCounterVec}, + Histogram, HistogramOpts, HistogramVec, Opts, +}; use polkadot_node_primitives::{ approval::{ v1::{ @@ -40,7 +44,7 @@ use polkadot_primitives::{ ApprovalVote, CandidateCommitments, CandidateEvent, CoreIndex, GroupIndex, Header, Id as ParaId, IndexedVec, NodeFeatures, ValidationCode, ValidatorSignature, }; -use std::time::Duration; +use std::{cmp::max, time::Duration}; use assert_matches::assert_matches; use async_trait::async_trait; @@ -5049,3 +5053,209 @@ fn subsystem_sends_pending_approvals_on_approval_restart() { virtual_overseer }); } + +// Test we correctly update the timer when we mark the beginning of gathering assignments. 
+#[test] +fn test_gathering_assignments_statements() { + let mut state = State { + keystore: Arc::new(LocalKeystore::in_memory()), + slot_duration_millis: 6_000, + clock: Box::new(MockClock::default()), + assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|_| Ok(0))), + spans: HashMap::new(), + time_started_gathering_assignments: Default::default(), + }; + + for i in 0..200i32 { + state.mark_begining_of_gathering_assignments( + i as u32, + Hash::repeat_byte(i as u8), + CandidateHash(Hash::repeat_byte(i as u8)), + ); + assert!( + state.time_started_gathering_assignments.len() <= + MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS as usize + ); + + assert_eq!( + state.time_started_gathering_assignments.keys().min(), + Some(max(0, i - MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS as i32 + 1) as u32).as_ref() + ) + } + assert_eq!( + state.time_started_gathering_assignments.len(), + MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS as usize + ); + + let nothing_changes = state.time_started_gathering_assignments.clone(); + + for i in 150..200i32 { + state.mark_begining_of_gathering_assignments( + i as u32, + Hash::repeat_byte(i as u8), + CandidateHash(Hash::repeat_byte(i as u8)), + ); + assert_eq!(nothing_changes, state.time_started_gathering_assignments); + } + + for i in 110..120 { + let block_hash = Hash::repeat_byte(i as u8); + let candidate_hash = CandidateHash(Hash::repeat_byte(i as u8)); + + state.mark_gathered_enough_assignments(i as u32, block_hash, candidate_hash); + + assert!(state + .time_started_gathering_assignments + .get(&i) + .unwrap() + .get(&(block_hash, candidate_hash)) + .unwrap() + .stage_start + .is_none()); + state.mark_begining_of_gathering_assignments(i as u32, block_hash, candidate_hash); + let record = state + .time_started_gathering_assignments + .get(&i) + .unwrap() + .get(&(block_hash, candidate_hash)) + .unwrap(); + + assert!(record.stage_start.is_some()); + assert_eq!(record.stage, 1); + } +} + +// Test we note the time we took to transition 
RequiredTranche from Pending to Exact and +// that we increase the stage when we transition from Exact to Pending. +#[test] +fn test_observe_assignment_gathering_status() { + let mut state = State { + keystore: Arc::new(LocalKeystore::in_memory()), + slot_duration_millis: 6_000, + clock: Box::new(MockClock::default()), + assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|_| Ok(0))), + spans: HashMap::new(), + time_started_gathering_assignments: Default::default(), + }; + + let metrics_inner = MetricsInner { + imported_candidates_total: IntCounter::new("dummy", "dummy").unwrap(), + assignments_produced: Histogram::with_opts(HistogramOpts::new("dummy", "dummy")).unwrap(), + approvals_produced_total: IntCounterVec::new(Opts::new("dummy", "dummy"), &["dummy"]) + .unwrap(), + no_shows_total: IntCounter::new("dummy", "dummy").unwrap(), + observed_no_shows: IntCounter::new("dummy", "dummy").unwrap(), + approved_by_one_third: IntCounter::new("dummy", "dummy").unwrap(), + wakeups_triggered_total: IntCounter::new("dummy", "dummy").unwrap(), + coalesced_approvals_buckets: Histogram::with_opts(HistogramOpts::new("dummy", "dummy")) + .unwrap(), + coalesced_approvals_delay: Histogram::with_opts(HistogramOpts::new("dummy", "dummy")) + .unwrap(), + candidate_approval_time_ticks: Histogram::with_opts(HistogramOpts::new("dummy", "dummy")) + .unwrap(), + block_approval_time_ticks: Histogram::with_opts(HistogramOpts::new("dummy", "dummy")) + .unwrap(), + time_db_transaction: Histogram::with_opts(HistogramOpts::new("dummy", "dummy")).unwrap(), + time_recover_and_approve: Histogram::with_opts(HistogramOpts::new("dummy", "dummy")) + .unwrap(), + candidate_signatures_requests_total: IntCounter::new("dummy", "dummy").unwrap(), + unapproved_candidates_in_unfinalized_chain: prometheus::Gauge::::new( + "dummy", "dummy", + ) + .unwrap(), + assignments_gathering_time_by_stage: HistogramVec::new( + HistogramOpts::new("test", "test"), + &["stage"], + ) + .unwrap(), + }; + + let 
metrics = Metrics(Some(metrics_inner)); + let block_hash = Hash::repeat_byte(1); + let candidate_hash = CandidateHash(Hash::repeat_byte(1)); + let block_number = 1; + + // Transition from Pending to Exact and check stage 0 time is recorded. + state.observe_assignment_gathering_status( + &metrics, + &RequiredTranches::Pending { + considered: 0, + next_no_show: None, + maximum_broadcast: 0, + clock_drift: 0, + }, + block_hash, + block_number, + candidate_hash, + ); + + state.observe_assignment_gathering_status( + &metrics, + &RequiredTranches::Exact { + needed: 2, + tolerated_missing: 2, + next_no_show: None, + last_assignment_tick: None, + }, + block_hash, + block_number, + candidate_hash, + ); + + let value = metrics + .0 + .as_ref() + .unwrap() + .assignments_gathering_time_by_stage + .get_metric_with_label_values(&["0"]) + .unwrap(); + + assert_eq!(value.get_sample_count(), 1); + + // Transition from Exact to Pending to Exact and check stage 1 time is recorded. + state.observe_assignment_gathering_status( + &metrics, + &RequiredTranches::Pending { + considered: 0, + next_no_show: None, + maximum_broadcast: 0, + clock_drift: 0, + }, + block_hash, + block_number, + candidate_hash, + ); + + state.observe_assignment_gathering_status( + &metrics, + &RequiredTranches::Exact { + needed: 2, + tolerated_missing: 2, + next_no_show: None, + last_assignment_tick: None, + }, + block_hash, + block_number, + candidate_hash, + ); + + let value = metrics + .0 + .as_ref() + .unwrap() + .assignments_gathering_time_by_stage + .get_metric_with_label_values(&["0"]) + .unwrap(); + + assert_eq!(value.get_sample_count(), 1); + + let value = metrics + .0 + .as_ref() + .unwrap() + .assignments_gathering_time_by_stage + .get_metric_with_label_values(&["1"]) + .unwrap(); + + assert_eq!(value.get_sample_count(), 1); +} From 8ccd444b805c56a4fbeca0be5ce001b6456342c4 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Sun, 26 May 2024 15:18:18 +0300 Subject: [PATCH 2/9] Add log message 
Signed-off-by: Alexandru Gheorghe --- polkadot/node/core/approval-voting/src/lib.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 05b21c01fa5e..84b58ae80248 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -958,6 +958,13 @@ impl State { if record.stage_start.is_none() { record.stage += 1; + gum::debug!( + target: LOG_TARGET, + "Started a new assignment gathering stage", + stage = ?record.stage, + ?block_hash, + ?candidate, + ); record.stage_start = Some(Instant::now()); } From ea004daff2951ff62a10af590287932dc978d938 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Mon, 27 May 2024 08:28:34 +0300 Subject: [PATCH 3/9] Fix clippy Signed-off-by: Alexandru Gheorghe --- polkadot/node/core/approval-voting/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 84b58ae80248..8c56a1f5142c 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -960,10 +960,10 @@ impl State { record.stage += 1; gum::debug!( target: LOG_TARGET, - "Started a new assignment gathering stage", stage = ?record.stage, ?block_hash, ?candidate, + "Started a new assignment gathering stage", ); record.stage_start = Some(Instant::now()); } From 6458aac875fce34ff0cae655b7d42ed3e1a4b91f Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Mon, 27 May 2024 08:54:51 +0300 Subject: [PATCH 4/9] Address review feedback Signed-off-by: Alexandru Gheorghe --- .../node/core/approval-voting/src/import.rs | 2 +- polkadot/node/core/approval-voting/src/lib.rs | 18 +++++++++--------- .../node/core/approval-voting/src/tests.rs | 18 +++++++++--------- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/polkadot/node/core/approval-voting/src/import.rs 
b/polkadot/node/core/approval-voting/src/import.rs index fca7d16b8166..77515efc4e45 100644 --- a/polkadot/node/core/approval-voting/src/import.rs +++ b/polkadot/node/core/approval-voting/src/import.rs @@ -658,7 +658,7 @@ pub(crate) mod tests { clock: Box::new(MockClock::default()), assignment_criteria: Box::new(MockAssignmentCriteria::default()), spans: HashMap::new(), - time_started_gathering_assignments: Default::default(), + per_block_assignments_gathering_times: Default::default(), } } diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 8c56a1f5142c..1312fcba226f 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -65,7 +65,7 @@ use sp_consensus::SyncOracle; use sp_consensus_slots::Slot; use std::time::Instant; -// The maximum time we keep track of assignments gathering times times. +// The maximum block we keep track of assignments gathering times. const MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS: u32 = 100; use futures::{ @@ -450,8 +450,8 @@ impl metrics::Metrics for Metrics { assignments_gathering_time_by_stage: prometheus::register( prometheus::HistogramVec::new( prometheus::HistogramOpts::new( - "polkadot_parachain_assignments_gather_time_by_stage", - "The time it takes for each stage to gather enough assignments needed for approval", + "polkadot_parachain_assignments_gather_time_by_stage_ms", + "The time it takes in ms for each stage to gather enough assignments needed for approval", ) .buckets(vec![0.0, 250.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0]), &["stage"], @@ -818,7 +818,7 @@ struct State { // Per block, candidate records about how long we take until we gather enough // assignments, this is relevant because it gives us a good idea about how many // tranches we trigger and why. 
- time_started_gathering_assignments: + per_block_assignments_gathering_times: HashMap>, } @@ -950,7 +950,7 @@ impl State { candidate: CandidateHash, ) { let record = self - .time_started_gathering_assignments + .per_block_assignments_gathering_times .entry(block_number) .or_default() .entry((block_hash, candidate)) @@ -969,7 +969,7 @@ impl State { } // Make sure we always cleanup if we have too many records. - if self.time_started_gathering_assignments.len() > + if self.per_block_assignments_gathering_times.len() > MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS as usize && block_number >= MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS { @@ -986,7 +986,7 @@ impl State { candidate: CandidateHash, ) -> AssignmentGatheringRecord { let record = self - .time_started_gathering_assignments + .per_block_assignments_gathering_times .get_mut(&block_number) .and_then(|entry| entry.get_mut(&(block_hash, candidate))); let stage = record.as_ref().map(|record| record.stage).unwrap_or_default(); @@ -997,7 +997,7 @@ impl State { } fn cleanup_assignments_gathering_timestamp(&mut self, keep_greater_than: BlockNumber) { - self.time_started_gathering_assignments + self.per_block_assignments_gathering_times .retain(|block_number, _| *block_number > keep_greater_than) } @@ -1077,7 +1077,7 @@ where clock: subsystem.clock, assignment_criteria, spans: HashMap::new(), - time_started_gathering_assignments: Default::default(), + per_block_assignments_gathering_times: Default::default(), }; // `None` on start-up. 
Gets initialized/updated on leaf update diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs index aea2222fd135..f522def5be15 100644 --- a/polkadot/node/core/approval-voting/src/tests.rs +++ b/polkadot/node/core/approval-voting/src/tests.rs @@ -5063,7 +5063,7 @@ fn test_gathering_assignments_statements() { clock: Box::new(MockClock::default()), assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|_| Ok(0))), spans: HashMap::new(), - time_started_gathering_assignments: Default::default(), + per_block_assignments_gathering_times: Default::default(), }; for i in 0..200i32 { @@ -5073,21 +5073,21 @@ fn test_gathering_assignments_statements() { CandidateHash(Hash::repeat_byte(i as u8)), ); assert!( - state.time_started_gathering_assignments.len() <= + state.per_block_assignments_gathering_times.len() <= MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS as usize ); assert_eq!( - state.time_started_gathering_assignments.keys().min(), + state.per_block_assignments_gathering_times.keys().min(), Some(max(0, i - MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS as i32 + 1) as u32).as_ref() ) } assert_eq!( - state.time_started_gathering_assignments.len(), + state.per_block_assignments_gathering_times.len(), MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS as usize ); - let nothing_changes = state.time_started_gathering_assignments.clone(); + let nothing_changes = state.per_block_assignments_gathering_times.clone(); for i in 150..200i32 { state.mark_begining_of_gathering_assignments( @@ -5095,7 +5095,7 @@ fn test_gathering_assignments_statements() { Hash::repeat_byte(i as u8), CandidateHash(Hash::repeat_byte(i as u8)), ); - assert_eq!(nothing_changes, state.time_started_gathering_assignments); + assert_eq!(nothing_changes, state.per_block_assignments_gathering_times); } for i in 110..120 { @@ -5105,7 +5105,7 @@ fn test_gathering_assignments_statements() { state.mark_gathered_enough_assignments(i as u32, block_hash, candidate_hash); 
assert!(state - .time_started_gathering_assignments + .per_block_assignments_gathering_times .get(&i) .unwrap() .get(&(block_hash, candidate_hash)) @@ -5114,7 +5114,7 @@ fn test_gathering_assignments_statements() { .is_none()); state.mark_begining_of_gathering_assignments(i as u32, block_hash, candidate_hash); let record = state - .time_started_gathering_assignments + .per_block_assignments_gathering_times .get(&i) .unwrap() .get(&(block_hash, candidate_hash)) @@ -5135,7 +5135,7 @@ fn test_observe_assignment_gathering_status() { clock: Box::new(MockClock::default()), assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|_| Ok(0))), spans: HashMap::new(), - time_started_gathering_assignments: Default::default(), + per_block_assignments_gathering_times: Default::default(), }; let metrics_inner = MetricsInner { From c0011125974a62a53f61b8b23528a7f07927154a Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Mon, 27 May 2024 16:30:28 +0300 Subject: [PATCH 5/9] Use LRU for keeping track of blocks Signed-off-by: Alexandru Gheorghe --- .../node/core/approval-voting/src/import.rs | 7 ++- polkadot/node/core/approval-voting/src/lib.rs | 61 +++++++++---------- .../node/core/approval-voting/src/tests.rs | 34 +++++++++-- 3 files changed, 64 insertions(+), 38 deletions(-) diff --git a/polkadot/node/core/approval-voting/src/import.rs b/polkadot/node/core/approval-voting/src/import.rs index 77515efc4e45..13b0b1bae1bc 100644 --- a/polkadot/node/core/approval-voting/src/import.rs +++ b/polkadot/node/core/approval-voting/src/import.rs @@ -607,7 +607,7 @@ pub(crate) mod tests { use super::*; use crate::{ approval_db::common::{load_block_entry, DbBackend}, - RuntimeInfo, RuntimeInfoConfig, + RuntimeInfo, RuntimeInfoConfig, MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS, }; use ::test_helpers::{dummy_candidate_receipt, dummy_hash}; use assert_matches::assert_matches; @@ -622,6 +622,7 @@ pub(crate) mod tests { node_features::FeatureIndex, ExecutorParams, Id as ParaId, IndexedVec, 
NodeFeatures, SessionInfo, ValidatorId, ValidatorIndex, }; + use schnellru::{ByLength, LruMap}; pub(crate) use sp_consensus_babe::{ digests::{CompatibleDigestItem, PreDigest, SecondaryVRFPreDigest}, AllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch, @@ -658,7 +659,9 @@ pub(crate) mod tests { clock: Box::new(MockClock::default()), assignment_criteria: Box::new(MockAssignmentCriteria::default()), spans: HashMap::new(), - per_block_assignments_gathering_times: Default::default(), + per_block_assignments_gathering_times: LruMap::new(ByLength::new( + MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS, + )), } } diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 1312fcba226f..8906d01b4c9f 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -65,7 +65,9 @@ use sp_consensus::SyncOracle; use sp_consensus_slots::Slot; use std::time::Instant; -// The maximum block we keep track of assignments gathering times. +// The maximum block we keep track of assignments gathering times, on normal operation +// this would never be reached because we prune the data on finalization, but we need +// to also ensure the data is not growing unecessarily large. const MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS: u32 = 100; use futures::{ @@ -819,7 +821,7 @@ struct State { // assignments, this is relevant because it gives us a good idea about how many // tranches we trigger and why. 
per_block_assignments_gathering_times: - HashMap>, + LruMap>, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -949,33 +951,22 @@ impl State { block_hash: Hash, candidate: CandidateHash, ) { - let record = self + if let Some(record) = self .per_block_assignments_gathering_times - .entry(block_number) - .or_default() - .entry((block_hash, candidate)) - .or_default(); - - if record.stage_start.is_none() { - record.stage += 1; - gum::debug!( - target: LOG_TARGET, - stage = ?record.stage, - ?block_hash, - ?candidate, - "Started a new assignment gathering stage", - ); - record.stage_start = Some(Instant::now()); - } - - // Make sure we always cleanup if we have too many records. - if self.per_block_assignments_gathering_times.len() > - MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS as usize && - block_number >= MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS + .get_or_insert(block_number, HashMap::new) + .and_then(|records| Some(records.entry((block_hash, candidate)).or_default())) { - self.cleanup_assignments_gathering_timestamp( - block_number - MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS, - ) + if record.stage_start.is_none() { + record.stage += 1; + gum::debug!( + target: LOG_TARGET, + stage = ?record.stage, + ?block_hash, + ?candidate, + "Started a new assignment gathering stage", + ); + record.stage_start = Some(Instant::now()); + } } } @@ -987,7 +978,7 @@ impl State { ) -> AssignmentGatheringRecord { let record = self .per_block_assignments_gathering_times - .get_mut(&block_number) + .get(&block_number) .and_then(|entry| entry.get_mut(&(block_hash, candidate))); let stage = record.as_ref().map(|record| record.stage).unwrap_or_default(); AssignmentGatheringRecord { @@ -997,8 +988,14 @@ impl State { } fn cleanup_assignments_gathering_timestamp(&mut self, keep_greater_than: BlockNumber) { - self.per_block_assignments_gathering_times - .retain(|block_number, _| *block_number > keep_greater_than) + while let Some((block_number, _)) = self.per_block_assignments_gathering_times.peek_oldest() + { + 
if *block_number < keep_greater_than { + self.per_block_assignments_gathering_times.pop_oldest(); + } else { + break + } + } } fn observe_assignment_gathering_status( @@ -1077,7 +1074,9 @@ where clock: subsystem.clock, assignment_criteria, spans: HashMap::new(), - per_block_assignments_gathering_times: Default::default(), + per_block_assignments_gathering_times: LruMap::new(ByLength::new( + MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS, + )), }; // `None` on start-up. Gets initialized/updated on leaf update diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs index f522def5be15..46908c3b7c00 100644 --- a/polkadot/node/core/approval-voting/src/tests.rs +++ b/polkadot/node/core/approval-voting/src/tests.rs @@ -5063,7 +5063,9 @@ fn test_gathering_assignments_statements() { clock: Box::new(MockClock::default()), assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|_| Ok(0))), spans: HashMap::new(), - per_block_assignments_gathering_times: Default::default(), + per_block_assignments_gathering_times: LruMap::new(ByLength::new( + MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS, + )), }; for i in 0..200i32 { @@ -5078,7 +5080,11 @@ fn test_gathering_assignments_statements() { ); assert_eq!( - state.per_block_assignments_gathering_times.keys().min(), + state + .per_block_assignments_gathering_times + .iter() + .map(|(block_number, _)| block_number) + .min(), Some(max(0, i - MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS as i32 + 1) as u32).as_ref() ) } @@ -5087,7 +5093,12 @@ fn test_gathering_assignments_statements() { MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS as usize ); - let nothing_changes = state.per_block_assignments_gathering_times.clone(); + let nothing_changes = state + .per_block_assignments_gathering_times + .iter() + .map(|(block_number, _)| *block_number) + .sorted() + .collect::>(); for i in 150..200i32 { state.mark_begining_of_gathering_assignments( @@ -5095,7 +5106,15 @@ fn test_gathering_assignments_statements() { 
Hash::repeat_byte(i as u8), CandidateHash(Hash::repeat_byte(i as u8)), ); - assert_eq!(nothing_changes, state.per_block_assignments_gathering_times); + assert_eq!( + nothing_changes, + state + .per_block_assignments_gathering_times + .iter() + .map(|(block_number, _)| *block_number) + .sorted() + .collect::>() + ); } for i in 110..120 { @@ -5123,6 +5142,9 @@ fn test_gathering_assignments_statements() { assert!(record.stage_start.is_some()); assert_eq!(record.stage, 1); } + + state.cleanup_assignments_gathering_timestamp(200); + assert_eq!(state.per_block_assignments_gathering_times.len(), 0); } // Test we note the time we took to transition RequiredTranche from Pending to Exact and @@ -5135,7 +5157,9 @@ fn test_observe_assignment_gathering_status() { clock: Box::new(MockClock::default()), assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|_| Ok(0))), spans: HashMap::new(), - per_block_assignments_gathering_times: Default::default(), + per_block_assignments_gathering_times: LruMap::new(ByLength::new( + MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS, + )), }; let metrics_inner = MetricsInner { From 3bcce008e70ccac28e725a37e943cec32aab0131 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Mon, 27 May 2024 16:36:49 +0300 Subject: [PATCH 6/9] Review feedback Signed-off-by: Alexandru Gheorghe --- polkadot/node/core/approval-voting/src/lib.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 8906d01b4c9f..0d709445521f 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -189,6 +189,12 @@ struct MetricsInner { candidate_signatures_requests_total: prometheus::Counter, unapproved_candidates_in_unfinalized_chain: prometheus::Gauge, // The time it takes in each stage to gather enough assignments. 
+ // We define a `stage` as being the entire process of gathering enough assignments to + // be able to approve a candidate: + // E.g.: + // - Stage 0: We wait for the needed_approvals assignments to be gathered. + // - Stage 1: We wait for enough tranches to cover all no-shows in stage 0. + // - Stage 2: We wait for enough tranches to cover all no-shows of stage 1. assignments_gathering_time_by_stage: prometheus::HistogramVec, } @@ -314,6 +320,10 @@ impl Metrics { pub fn observe_assignment_gathering_time(&self, stage: usize, elapsed_as_millis: usize) { if let Some(metrics) = &self.0 { let stage_string = stage.to_string(); + // We don't want to have too many metrics entries with this label to not put unnecessary + // pressure on the metrics infrastructure, so we cap the stage at 10, which is + // equivalent to already having a finalization lag of 10 * no_show_slots, so it should + // be more than enough. metrics .assignments_gathering_time_by_stage .with_label_values(&[if stage < 10 { stage_string.as_str() } else { "inf" }]) @@ -453,7 +463,7 @@ impl metrics::Metrics for Metrics { prometheus::HistogramVec::new( prometheus::HistogramOpts::new( "polkadot_parachain_assignments_gather_time_by_stage_ms", - "The time it takes in ms for each stage to gather enough assignments needed for approval", + "The time in ms it takes for each stage to gather enough assignments needed for approval", ) .buckets(vec![0.0, 250.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0]), &["stage"], From 23d61695afc66c821e086f4b4e04c6e782d83372 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com> Date: Mon, 27 May 2024 17:01:44 +0300 Subject: [PATCH 7/9] Update polkadot/node/core/approval-voting/src/lib.rs Co-authored-by: ordian --- polkadot/node/core/approval-voting/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index
0d709445521f..5573592bf22c 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -65,7 +65,7 @@ use sp_consensus::SyncOracle; use sp_consensus_slots::Slot; use std::time::Instant; -// The maximum block we keep track of assignments gathering times, on normal operation +// The max number of blocks we keep track of assignments gathering times. Normally, // this would never be reached because we prune the data on finalization, but we need // to also ensure the data is not growing unecessarily large. const MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS: u32 = 100; From 495939f20590c90c64f95e23da1131227aa0882d Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Mon, 27 May 2024 17:01:21 +0300 Subject: [PATCH 8/9] Remove lower than Signed-off-by: Alexandru Gheorghe --- polkadot/node/core/approval-voting/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 5573592bf22c..f9ea87d3956f 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -997,10 +997,10 @@ impl State { } } - fn cleanup_assignments_gathering_timestamp(&mut self, keep_greater_than: BlockNumber) { + fn cleanup_assignments_gathering_timestamp(&mut self, remove_lower_than: BlockNumber) { while let Some((block_number, _)) = self.per_block_assignments_gathering_times.peek_oldest() { - if *block_number < keep_greater_than { + if *block_number < remove_lower_than { self.per_block_assignments_gathering_times.pop_oldest(); } else { break From 33115d549648bf0001916bdff119e7a69c023d6d Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Tue, 28 May 2024 08:11:39 +0300 Subject: [PATCH 9/9] Add trace log Signed-off-by: Alexandru Gheorghe --- polkadot/node/core/approval-voting/src/lib.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git 
a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index f9ea87d3956f..d25f538dae1d 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -1028,6 +1028,14 @@ impl State { let time_to_gather = self.mark_gathered_enough_assignments(block_number, block_hash, candidate_hash); if let Some(gathering_started) = time_to_gather.stage_start { + if gathering_started.elapsed().as_millis() > 6000 { + gum::trace!( + target: LOG_TARGET, + ?block_hash, + ?candidate_hash, + "Long assignment gathering time", + ); + } metrics.observe_assignment_gathering_time( time_to_gather.stage, gathering_started.elapsed().as_millis() as usize,