Skip to content

Commit

Permalink
AverageIntCounter for backpressure states (#6963)
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-aptos authored Mar 8, 2023
1 parent 4831cf6 commit b65b876
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 23 deletions.
23 changes: 15 additions & 8 deletions consensus/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
use aptos_metrics_core::{
exponential_buckets, op_counters::DurationHistogram, register_counter, register_gauge,
register_gauge_vec, register_histogram, register_histogram_vec, register_int_counter,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, Counter, Gauge, GaugeVec,
Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, AverageIntCounter,
Counter, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge,
IntGaugeVec,
};
use once_cell::sync::Lazy;

Expand Down Expand Up @@ -185,14 +186,20 @@ pub static LEADER_REPUTATION_ROUND_HISTORY_SIZE: Lazy<IntGauge> = Lazy::new(|| {
.unwrap()
});

/// Number of rounds we were collecting votes for proposer
/// (similar to PROPOSALS_COUNT, but can be larger, if we failed in creating/sending of the proposal)
pub static CHAIN_HEALTH_BACKOFF_TRIGGERED: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
/// Counts when chain_health backoff is triggered
pub static CHAIN_HEALTH_BACKOFF_TRIGGERED: Lazy<AverageIntCounter> = Lazy::new(|| {
AverageIntCounter::register(
"aptos_chain_health_backoff_triggered",
"Total voting power of all votes collected for the round this node was proposer",
"Counts when chain_health backoff is triggered",
)
});

/// Counts when waiting for full blocks is triggered
pub static WAIT_FOR_FULL_BLOCKS_TRIGGERED: Lazy<AverageIntCounter> = Lazy::new(|| {
AverageIntCounter::register(
"aptos_wait_for_full_blocks_triggered",
"Counts when waiting for full blocks is triggered",
)
.unwrap()
});

/// How many pending blocks are there, when we make a proposal
Expand Down
3 changes: 2 additions & 1 deletion consensus/src/liveness/proposal_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,15 @@ impl ProposalGenerator {
.max_block_bytes
.min(value.max_sending_block_bytes_override);

CHAIN_HEALTH_BACKOFF_TRIGGERED.inc();
CHAIN_HEALTH_BACKOFF_TRIGGERED.observe(1);
warn!(
"Generating proposal reducing limits to {} txns and {} bytes, due to chain health backoff",
max_block_txns,
max_block_bytes,
);
(max_block_txns, max_block_bytes)
} else {
CHAIN_HEALTH_BACKOFF_TRIGGERED.observe(0);
(self.max_block_txns, self.max_block_bytes)
};

Expand Down
7 changes: 6 additions & 1 deletion consensus/src/payload_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{error::QuorumStoreError, monitor, state_replication::PayloadClient};
use crate::{
counters::WAIT_FOR_FULL_BLOCKS_TRIGGERED, error::QuorumStoreError, monitor,
state_replication::PayloadClient,
};
use anyhow::Result;
use aptos_consensus_types::{
common::{Payload, PayloadFilter, Round},
Expand Down Expand Up @@ -105,6 +108,8 @@ impl PayloadClient for QuorumStoreClient {
&& pending_uncommitted_blocks < self.wait_for_full_blocks_above_pending_blocks;
let return_empty = pending_ordering && return_non_full;

WAIT_FOR_FULL_BLOCKS_TRIGGERED.observe(u64::from(!return_non_full));

fail_point!("consensus::pull_payload", |_| {
Err(anyhow::anyhow!("Injected error in pull_payload").into())
});
Expand Down
8 changes: 4 additions & 4 deletions consensus/src/quorum_store/batch_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ impl BatchGenerator {
);
debug!("QS: dynamic_max_pull_count: {}", dynamic_max_pull_count);
}
counters::QS_BACKPRESSURE.set(1);
counters::QS_BACKPRESSURE_DYNAMIC_MAX.set(dynamic_max_pull_count as i64);
counters::QS_BACKPRESSURE.observe(1);
counters::QS_BACKPRESSURE_DYNAMIC_MAX.observe(dynamic_max_pull_count);
} else {
// additive increase, every second
if back_pressure_increase_latest.elapsed() >= back_pressure_increase_duration {
Expand All @@ -288,8 +288,8 @@ impl BatchGenerator {
);
debug!("QS: dynamic_max_pull_count: {}", dynamic_max_pull_count);
}
counters::QS_BACKPRESSURE.set(0);
counters::QS_BACKPRESSURE_DYNAMIC_MAX.set(dynamic_max_pull_count as i64);
counters::QS_BACKPRESSURE.observe(0);
counters::QS_BACKPRESSURE_DYNAMIC_MAX.observe(dynamic_max_pull_count);
}
if let Some(proof_rx) = self.handle_scheduled_pull(dynamic_max_pull_count).await {
proofs_in_progress.push(Box::pin(proof_rx));
Expand Down
16 changes: 7 additions & 9 deletions consensus/src/quorum_store/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

use aptos_metrics_core::{
exponential_buckets, op_counters::DurationHistogram, register_histogram,
register_histogram_vec, register_int_counter, register_int_gauge, Histogram, HistogramVec,
IntCounter, IntGauge,
register_histogram_vec, register_int_counter, AverageIntCounter, Histogram, HistogramVec,
IntCounter,
};
use once_cell::sync::Lazy;
use std::time::Duration;
Expand Down Expand Up @@ -400,20 +400,18 @@ pub static RECEIVED_BATCH_COUNT: Lazy<IntCounter> = Lazy::new(|| {
.unwrap()
});

pub static QS_BACKPRESSURE: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
pub static QS_BACKPRESSURE: Lazy<AverageIntCounter> = Lazy::new(|| {
AverageIntCounter::register(
"quorum_store_backpressure",
"Indicator of whether Quorum Store is backpressured. QS should be backpressured when (1) number of batches exceeds the threshold, or (2) consensus is backpressured."
)
.unwrap()
});

pub static QS_BACKPRESSURE_DYNAMIC_MAX: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
pub static QS_BACKPRESSURE_DYNAMIC_MAX: Lazy<AverageIntCounter> = Lazy::new(|| {
AverageIntCounter::register(
"quorum_store_backpressure_dynamic_max",
"What the dynamic max is set to"
"What the dynamic max is set to",
)
.unwrap()
});

/// Latencies
Expand Down
61 changes: 61 additions & 0 deletions crates/aptos-metrics-core/src/avg_counter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright © Aptos Foundation

use prometheus::{register_counter, register_int_counter, Counter, IntCounter};

pub struct AverageCounter {
sum: Counter,
count: IntCounter,
}

impl AverageCounter {
pub fn register(name: &str, desc: &str) -> AverageCounter {
AverageCounter {
sum: register_counter!(
format!("{}_sum", name),
format!("{}. Sum part of the counter", desc),
)
.unwrap(),
count: register_int_counter!(
format!("{}_count", name),
format!("{}. Count part of the counter", desc),
)
.unwrap(),
}
}

pub fn observe(&self, value: f64) {
if value != 0.0 {
self.sum.inc_by(value);
}
self.count.inc();
}
}

pub struct AverageIntCounter {
sum: IntCounter,
count: IntCounter,
}

impl AverageIntCounter {
pub fn register(name: &str, desc: &str) -> AverageIntCounter {
AverageIntCounter {
sum: register_int_counter!(
format!("{}_sum", name),
format!("{}. Sum part of the counter", desc),
)
.unwrap(),
count: register_int_counter!(
format!("{}_count", name),
format!("{}. Count part of the counter", desc),
)
.unwrap(),
}
}

pub fn observe(&self, value: u64) {
if value != 0 {
self.sum.inc_by(value);
}
self.count.inc();
}
}
2 changes: 2 additions & 0 deletions crates/aptos-metrics-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ pub use prometheus::{
IntGaugeVec, TextEncoder,
};

mod avg_counter;
pub use avg_counter::{AverageCounter, AverageIntCounter};
pub mod const_metric;
pub mod op_counters;

0 comments on commit b65b876

Please sign in to comment.