From d31285a1562318959a7b21dbfec95c2fd6f06d7a Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Fri, 12 Jul 2024 12:43:48 +0200 Subject: [PATCH] [statement-distribution] Add metrics for distributed statements in V2 (#4554) Part of https://github.com/paritytech/polkadot-sdk/issues/4334 --- .../network/statement-distribution/src/lib.rs | 30 +++++- .../statement-distribution/src/metrics.rs | 9 +- .../statement-distribution/src/v2/mod.rs | 102 +++++++++++++----- 3 files changed, 109 insertions(+), 32 deletions(-) diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 4d56c795f13b..33431eb1edce 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -284,7 +284,14 @@ impl StatementDistributionSubsystem { ); }, MuxedMessage::Response(result) => { - v2::handle_response(&mut ctx, &mut state, result, &mut self.reputation).await; + v2::handle_response( + &mut ctx, + &mut state, + result, + &mut self.reputation, + &self.metrics, + ) + .await; }, MuxedMessage::RetryRequest(()) => { // A pending request is ready to retry. This is only a signal to call @@ -320,7 +327,8 @@ impl StatementDistributionSubsystem { let mode = prospective_parachains_mode(ctx.sender(), activated.hash).await?; if let ProspectiveParachainsMode::Enabled { .. } = mode { let res = - v2::handle_active_leaves_update(ctx, state, activated, mode).await; + v2::handle_active_leaves_update(ctx, state, activated, mode, &metrics) + .await; // Regardless of the result of leaf activation, we always prune before // handling it to avoid leaks. v2::handle_deactivate_leaves(state, &deactivated); @@ -370,6 +378,7 @@ impl StatementDistributionSubsystem { relay_parent, statement, &mut self.reputation, + &self.metrics, ) .await?; } @@ -428,11 +437,24 @@ impl StatementDistributionSubsystem { if target.targets_current() { // pass to v2. - v2::handle_network_update(ctx, state, event, &mut self.reputation).await; + v2::handle_network_update( + ctx, + state, + event, + &mut self.reputation, + &self.metrics, + ) + .await; } }, StatementDistributionMessage::Backed(candidate_hash) => { - crate::v2::handle_backed_candidate_message(ctx, state, candidate_hash).await; + crate::v2::handle_backed_candidate_message( + ctx, + state, + candidate_hash, + &self.metrics, + ) + .await; }, }, } diff --git a/polkadot/node/network/statement-distribution/src/metrics.rs b/polkadot/node/network/statement-distribution/src/metrics.rs index 1bc994174263..e21fff1e6421 100644 --- a/polkadot/node/network/statement-distribution/src/metrics.rs +++ b/polkadot/node/network/statement-distribution/src/metrics.rs @@ -25,13 +25,13 @@ const HISTOGRAM_LATENCY_BUCKETS: &[f64] = &[ #[derive(Clone)] struct MetricsInner { // V1 - statements_distributed: prometheus::Counter, sent_requests: prometheus::Counter, received_responses: prometheus::CounterVec, network_bridge_update: prometheus::HistogramVec, statements_unexpected: prometheus::CounterVec, created_message_size: prometheus::Gauge, // V1+ + statements_distributed: prometheus::Counter, active_leaves_update: prometheus::Histogram, share: prometheus::Histogram, // V2+ @@ -51,6 +51,13 @@ impl Metrics { } } + /// Update statements distributed counter by an amount + pub fn on_statements_distributed(&self, n: usize) { + if let Some(metrics) = &self.0 { + metrics.statements_distributed.inc_by(n as u64); + } + } + /// Update sent requests counter /// This counter is updated merely for the statements sent via request/response method, /// meaning that it counts large statements only diff --git a/polkadot/node/network/statement-distribution/src/v2/mod.rs b/polkadot/node/network/statement-distribution/src/v2/mod.rs index 2bb9c82c6a6f..47d350849b20 100644 --- a/polkadot/node/network/statement-distribution/src/v2/mod.rs +++ b/polkadot/node/network/statement-distribution/src/v2/mod.rs @@ -400,6 +400,7 @@ pub(crate) async fn handle_network_update( state: &mut State, update: NetworkBridgeEvent, reputation: &mut ReputationAggregator, + metrics: &Metrics, ) { match update { NetworkBridgeEvent::PeerConnected(peer_id, role, protocol_version, mut authority_ids) => { @@ -483,23 +484,33 @@ pub(crate) async fn handle_network_update( net_protocol::StatementDistributionMessage::V3( protocol_v3::StatementDistributionMessage::Statement(relay_parent, statement), ) => - handle_incoming_statement(ctx, state, peer_id, relay_parent, statement, reputation) - .await, + handle_incoming_statement( + ctx, + state, + peer_id, + relay_parent, + statement, + reputation, + metrics, + ) + .await, net_protocol::StatementDistributionMessage::V2( protocol_v2::StatementDistributionMessage::BackedCandidateManifest(inner), ) | net_protocol::StatementDistributionMessage::V3( protocol_v3::StatementDistributionMessage::BackedCandidateManifest(inner), - ) => handle_incoming_manifest(ctx, state, peer_id, inner, reputation).await, + ) => handle_incoming_manifest(ctx, state, peer_id, inner, reputation, metrics).await, net_protocol::StatementDistributionMessage::V2( protocol_v2::StatementDistributionMessage::BackedCandidateKnown(inner), ) | net_protocol::StatementDistributionMessage::V3( protocol_v3::StatementDistributionMessage::BackedCandidateKnown(inner), - ) => handle_incoming_acknowledgement(ctx, state, peer_id, inner, reputation).await, + ) => + handle_incoming_acknowledgement(ctx, state, peer_id, inner, reputation, metrics) + .await, }, NetworkBridgeEvent::PeerViewChange(peer_id, view) => - handle_peer_view_update(ctx, state, peer_id, view).await, + handle_peer_view_update(ctx, state, peer_id, view, metrics).await, NetworkBridgeEvent::OurViewChange(_view) => { // handled by `handle_activated_leaf` }, @@ -539,6 +550,7 @@ pub(crate) async fn handle_active_leaves_update( state: &mut State, activated: &ActivatedLeaf, leaf_mode: ProspectiveParachainsMode, + metrics: &Metrics, ) -> JfyiErrorResult<()> { let max_candidate_depth = match leaf_mode { ProspectiveParachainsMode::Disabled => return Ok(()), @@ -714,7 +726,8 @@ pub(crate) async fn handle_active_leaves_update( for (peer, fresh) in update_peers { for fresh_relay_parent in fresh { - send_peer_messages_for_relay_parent(ctx, state, peer, fresh_relay_parent).await; + send_peer_messages_for_relay_parent(ctx, state, peer, fresh_relay_parent, metrics) + .await; } } } @@ -815,6 +828,7 @@ async fn handle_peer_view_update( state: &mut State, peer: PeerId, new_view: View, + metrics: &Metrics, ) { let fresh_implicit = { let peer_data = match state.peers.get_mut(&peer) { @@ -826,7 +840,7 @@ async fn handle_peer_view_update( }; for new_relay_parent in fresh_implicit { - send_peer_messages_for_relay_parent(ctx, state, peer, new_relay_parent).await; + send_peer_messages_for_relay_parent(ctx, state, peer, new_relay_parent, metrics).await; } } @@ -857,6 +871,7 @@ async fn send_peer_messages_for_relay_parent( state: &mut State, peer: PeerId, relay_parent: Hash, + metrics: &Metrics, ) { let peer_data = match state.peers.get_mut(&peer) { None => return, @@ -889,6 +904,7 @@ async fn send_peer_messages_for_relay_parent( &mut active.cluster_tracker, &state.candidates, &relay_parent_state.statement_store, + metrics, ) .await; } @@ -901,6 +917,7 @@ async fn send_peer_messages_for_relay_parent( &per_session_state.groups, relay_parent_state, &state.candidates, + metrics, ) .await; } @@ -949,6 +966,7 @@ async fn send_pending_cluster_statements( cluster_tracker: &mut ClusterTracker, candidates: &Candidates, statement_store: &StatementStore, + metrics: &Metrics, ) { let pending_statements = cluster_tracker.pending_statements_for(peer_validator_id); let network_messages = pending_statements @@ -974,12 +992,12 @@ async fn send_pending_cluster_statements( }) .collect::>(); - if network_messages.is_empty() { - return + if !network_messages.is_empty() { + let count = network_messages.len(); + ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(network_messages)) + .await; + metrics.on_statements_distributed(count); } - - ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(network_messages)) - .await; } /// Send a peer all pending grid messages / acknowledgements / follow up statements @@ -993,6 +1011,7 @@ async fn send_pending_grid_messages( groups: &Groups, relay_parent_state: &mut PerRelayParentState, candidates: &Candidates, + metrics: &Metrics, ) { let pending_manifests = { let local_validator = match relay_parent_state.local_validator.as_mut() { @@ -1005,6 +1024,7 @@ async fn send_pending_grid_messages( }; let mut messages: Vec<(Vec, net_protocol::VersionedValidationProtocol)> = Vec::new(); + let mut statements_count = 0; for (candidate_hash, kind) in pending_manifests { let confirmed_candidate = match candidates.get_confirmed(&candidate_hash) { None => continue, // sanity @@ -1079,7 +1099,7 @@ async fn send_pending_grid_messages( }; }, grid::ManifestKind::Acknowledgement => { - messages.extend(acknowledgement_and_statement_messages( + let (m, c) = acknowledgement_and_statement_messages( peer_id, peer_validator_id, groups, @@ -1088,7 +1108,9 @@ async fn send_pending_grid_messages( group_index, candidate_hash, local_knowledge, - )); + ); + messages.extend(m); + statements_count += c; }, } } @@ -1107,8 +1129,9 @@ async fn send_pending_grid_messages( let pending_statements = grid_tracker.all_pending_statements_for(peer_validator_id); - let extra_statements = - pending_statements.into_iter().filter_map(|(originator, compact)| { + let extra_statements = pending_statements + .into_iter() + .filter_map(|(originator, compact)| { let res = pending_statement_network_message( &relay_parent_state.statement_store, relay_parent, @@ -1128,15 +1151,17 @@ async fn send_pending_grid_messages( } res - }); + }) + .collect::>(); + statements_count += extra_statements.len(); messages.extend(extra_statements); } - if messages.is_empty() { - return + if !messages.is_empty() { + ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(messages)).await; + metrics.on_statements_distributed(statements_count); } - ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(messages)).await; } // Imports a locally originating statement and distributes it to peers. @@ -1147,6 +1172,7 @@ pub(crate) async fn share_local_statement( relay_parent: Hash, statement: SignedFullStatementWithPVD, reputation: &mut ReputationAggregator, + metrics: &Metrics, ) -> JfyiErrorResult<()> { let per_relay_parent = match state.per_relay_parent.get_mut(&relay_parent) { None => return Err(JfyiError::InvalidShare), @@ -1269,11 +1295,12 @@ pub(crate) async fn share_local_statement( &state.authorities, &state.peers, compact_statement, + metrics, ) .await; if let Some(post_confirmation) = post_confirmation { - apply_post_confirmation(ctx, state, post_confirmation, reputation).await; + apply_post_confirmation(ctx, state, post_confirmation, reputation, metrics).await; } Ok(()) @@ -1310,6 +1337,7 @@ async fn circulate_statement( authorities: &HashMap, peers: &HashMap, statement: SignedStatement, + metrics: &Metrics, ) { let session_info = &per_session.session_info; @@ -1446,6 +1474,7 @@ async fn circulate_statement( .into(), )) .await; + metrics.on_statement_distributed(); } if !statement_to_v3_peers.is_empty() { @@ -1465,6 +1494,7 @@ async fn circulate_statement( .into(), )) .await; + metrics.on_statement_distributed(); } } /// Check a statement signature under this parent hash. @@ -1511,6 +1541,7 @@ async fn handle_incoming_statement( relay_parent: Hash, statement: UncheckedSignedStatement, reputation: &mut ReputationAggregator, + metrics: &Metrics, ) { let peer_state = match state.peers.get(&peer) { None => { @@ -1787,6 +1818,7 @@ async fn handle_incoming_statement( &state.authorities, &state.peers, checked_statement, + metrics, ) .await; } else { @@ -1944,6 +1976,7 @@ async fn provide_candidate_to_grid( per_session: &PerSessionState, authorities: &HashMap, peers: &HashMap, + metrics: &Metrics, ) { let local_validator = match relay_parent_state.local_validator { Some(ref mut v) => v, @@ -2131,8 +2164,10 @@ async fn provide_candidate_to_grid( .await; } if !post_statements.is_empty() { + let count = post_statements.len(); ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(post_statements)) .await; + metrics.on_statements_distributed(count); } } @@ -2532,6 +2567,7 @@ async fn handle_incoming_manifest( peer: PeerId, manifest: net_protocol::v2::BackedCandidateManifest, reputation: &mut ReputationAggregator, + metrics: &Metrics, ) { gum::debug!( target: LOG_TARGET, @@ -2588,7 +2624,7 @@ async fn handle_incoming_manifest( ) }; - let messages = acknowledgement_and_statement_messages( + let (messages, statements_count) = acknowledgement_and_statement_messages( &( peer, state @@ -2609,6 +2645,7 @@ async fn handle_incoming_manifest( if !messages.is_empty() { ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages(messages)).await; + metrics.on_statements_distributed(statements_count); } } else if !state.candidates.is_confirmed(&manifest.candidate_hash) { // 5. if unconfirmed, add request entry @@ -2636,9 +2673,9 @@ fn acknowledgement_and_statement_messages( group_index: GroupIndex, candidate_hash: CandidateHash, local_knowledge: StatementFilter, -) -> Vec<(Vec, net_protocol::VersionedValidationProtocol)> { +) -> (Vec<(Vec, net_protocol::VersionedValidationProtocol)>, usize) { let local_validator = match relay_parent_state.local_validator.as_mut() { - None => return Vec::new(), + None => return (Vec::new(), 0), Some(l) => l, }; @@ -2666,7 +2703,7 @@ fn acknowledgement_and_statement_messages( "Bug ValidationVersion::V1 should not be used in statement-distribution v2, legacy should have handled this" ); - return Vec::new() + return (Vec::new(), 0) }, }; @@ -2687,10 +2724,11 @@ fn acknowledgement_and_statement_messages( candidate_hash, peer, ); + let statements_count = statement_messages.len(); messages.extend(statement_messages.into_iter().map(|m| (vec![peer.0], m))); - messages + (messages, statements_count) } #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] @@ -2700,6 +2738,7 @@ async fn handle_incoming_acknowledgement( peer: PeerId, acknowledgement: net_protocol::v2::BackedCandidateAcknowledgement, reputation: &mut ReputationAggregator, + metrics: &Metrics, ) { // The key difference between acknowledgments and full manifests is that only // the candidate hash is included alongside the bitfields, so the candidate @@ -2780,10 +2819,12 @@ async fn handle_incoming_acknowledgement( ); if !messages.is_empty() { + let count = messages.len(); ctx.send_message(NetworkBridgeTxMessage::SendValidationMessages( messages.into_iter().map(|m| (vec![peer], m)).collect(), )) .await; + metrics.on_statements_distributed(count); } } @@ -2793,6 +2834,7 @@ pub(crate) async fn handle_backed_candidate_message( ctx: &mut Context, state: &mut State, candidate_hash: CandidateHash, + metrics: &Metrics, ) { // If the candidate is unknown or unconfirmed, it's a race (pruned before receiving message) // or a bug. Ignore if so @@ -2834,6 +2876,7 @@ pub(crate) async fn handle_backed_candidate_message( per_session, &state.authorities, &state.peers, + metrics, ) .await; @@ -2855,6 +2898,7 @@ async fn send_cluster_candidate_statements( state: &mut State, candidate_hash: CandidateHash, relay_parent: Hash, + metrics: &Metrics, ) { let relay_parent_state = match state.per_relay_parent.get_mut(&relay_parent) { None => return, @@ -2897,6 +2941,7 @@ async fn send_cluster_candidate_statements( &state.authorities, &state.peers, statement, + metrics, ) .await; } @@ -2914,6 +2959,7 @@ async fn apply_post_confirmation( state: &mut State, post_confirmation: PostConfirmation, reputation: &mut ReputationAggregator, + metrics: &Metrics, ) { for peer in post_confirmation.reckoning.incorrect { modify_reputation(reputation, ctx.sender(), peer, COST_INACCURATE_ADVERTISEMENT).await; @@ -2927,6 +2973,7 @@ async fn apply_post_confirmation( state, candidate_hash, post_confirmation.hypothetical.relay_parent(), + metrics, ) .await; new_confirmed_candidate_fragment_chain_updates(ctx, state, post_confirmation.hypothetical) @@ -3052,6 +3099,7 @@ pub(crate) async fn handle_response( state: &mut State, response: UnhandledResponse, reputation: &mut ReputationAggregator, + metrics: &Metrics, ) { let &requests::CandidateIdentifier { relay_parent, candidate_hash, group_index } = response.candidate_identifier(); @@ -3151,7 +3199,7 @@ pub(crate) async fn handle_response( }; // Note that this implicitly circulates all statements via the cluster. - apply_post_confirmation(ctx, state, post_confirmation, reputation).await; + apply_post_confirmation(ctx, state, post_confirmation, reputation, metrics).await; let confirmed = state.candidates.get_confirmed(&candidate_hash).expect("just confirmed; qed");