Skip to content

Commit

Permalink
feat(metrics): add CC metrics (#2390)
Browse files Browse the repository at this point in the history
  • Loading branch information
kmd-fl authored Oct 2, 2024
1 parent eda76b3 commit 6cfa3a8
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 4 deletions.
39 changes: 36 additions & 3 deletions crates/chain-listener/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,18 @@ impl ChainListener {
self.set_current_epoch(init_params.current_epoch).await;
self.set_global_nonce(init_params.global_nonce).await;

self.observe(|m| {
m.observe_epoch_settings(
truncate_to_u64(&self.init_timestamp) as i64,
truncate_to_u64(&self.epoch_duration) as i64,
);

m.observe_allowed_proofs_settings(
truncate_to_u64(&self.max_proofs_per_epoch) as i64,
truncate_to_u64(&self.min_proofs_per_epoch) as i64,
)
});

Ok(())
}

Expand Down Expand Up @@ -556,8 +568,12 @@ impl ChainListener {
async fn refresh_compute_units(&mut self) -> eyre::Result<()> {
let mut units = self.chain_connector.get_compute_units().await?;

self.observe(|m| m.observe_cus_total(units.len() as i64));

let in_deal: Vec<_> = units.extract_if(|cu| !cu.deal.is_zero()).collect();

self.observe(|m| m.observe_cus_in_deals(in_deal.len() as i64));

let current_units: Vec<CUID> = units.iter().map(|unit| CUID::new(unit.id.0)).collect();
self.core_distributor
.cleanup_cache(current_units.as_slice());
Expand Down Expand Up @@ -850,19 +866,22 @@ impl ChainListener {
deal_event.deal
);

let cu_ids = deal_event
let cu_ids: Vec<_> = deal_event
.cuIds
.into_iter()
.map(|cu| CUID::new(cu.0))
.collect();

let cu_ids_len = cu_ids.len();
self.active_deals.insert(
deal_event.deal.to_string().into(),
OnChainWorker {
id: deal_event.onchainWorkerId,
cu_ids,
},
);

self.observe(|m| m.observe_cus_in_deals_added(cu_ids_len as i64));

Ok(())
}

Expand Down Expand Up @@ -1082,6 +1101,9 @@ impl ChainListener {
self.active_deals.clear();
self.current_commitment = None;
self.stop_commitment().await?;

self.observe(|m| m.observe_cus_in_deals(0));

Ok(())
}

Expand Down Expand Up @@ -1342,7 +1364,12 @@ impl ChainListener {
})
.await?;

self.active_deals.remove(deal_id);
let removed_deal = self.active_deals.remove(deal_id);

if let Some(removed_deal) = removed_deal {
self.observe(|m| m.observe_cus_in_deals_removed(removed_deal.cu_ids.len() as i64));
}

Ok(())
}

Expand Down Expand Up @@ -1417,6 +1444,8 @@ impl ChainListener {
}

async fn set_current_epoch(&mut self, epoch_number: U256) {
self.observe(|m| m.observe_current_epoch(truncate_to_u64(&epoch_number) as i64));

self.current_epoch = epoch_number;
self.proof_tracker.set_current_epoch(epoch_number).await;
}
Expand Down Expand Up @@ -1485,3 +1514,7 @@ where
.inspect(|m| m.observe_ccp_reply(elapsed.as_millis() as f64));
result
}

fn truncate_to_u64(value: &U256) -> u64 {
value.as_limbs()[0]
}
107 changes: 106 additions & 1 deletion crates/peer-metrics/src/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

use crate::{execution_time_buckets, register};
use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::exemplar::CounterWithExemplar;
Expand All @@ -26,6 +25,8 @@ use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::Histogram;
use prometheus_client::registry::Registry;

use crate::{execution_time_buckets, register};

#[derive(EncodeLabelSet, Hash, Clone, Eq, PartialEq, Debug)]
struct TxLabel {
tx_hash: String,
Expand Down Expand Up @@ -53,14 +54,32 @@ pub struct ChainListenerMetrics {
ccp_proofs_tx_success: Counter,
// how many proofs transaction are failed
ccp_proofs_tx_failed: CounterWithExemplar<TxLabel>,
// max amount of proofs are allowed
ccp_proofs_per_epoch_allowed_max: Gauge,
// min amount of proofs are allowed
ccp_proofs_per_epoch_allowed_min: Gauge,

// How many blocks we have received from the newHead subscription
blocks_seen: Counter,
last_seen_block: Gauge,
// How many block we manage to process while processing the block
blocks_processed: Counter,
// The number of the latest block
last_process_block: Gauge,

// Current commitment status
current_commitment_status: Gauge,
// Current commitment id
current_commitment: Family<CommitmentLabel, Gauge>,

// CUs metrics
cus_total: Gauge,
cus_in_deals: Gauge,

// Epoch Metrics
current_epoch: Gauge,
current_epoch_start_timestamp_sec: Gauge,
current_epoch_duration_sec: Gauge,
}

impl ChainListenerMetrics {
Expand Down Expand Up @@ -157,6 +176,55 @@ impl ChainListenerMetrics {
"Current commitment",
);

let cus_total = register(
sub_registry,
Gauge::default(),
"cus_total",
"Total number of CUs",
);

let cus_in_deals = register(
sub_registry,
Gauge::default(),
"cus_in_deals",
"Total number of CUs in deals",
);

let current_epoch = register(
sub_registry,
Gauge::default(),
"current_epoch",
"Current epoch",
);

let current_epoch_start_timestamp_sec = register(
sub_registry,
Gauge::default(),
"current_epoch_start_timestamp_sec",
"Current epoch start timestamp",
);

let current_epoch_duration_sec = register(
sub_registry,
Gauge::default(),
"current_epoch_duration_sec",
"Current epoch duration",
);

let ccp_proofs_per_epoch_allowed_max = register(
sub_registry,
Gauge::default(),
"ccp_proofs_per_epoch_allowed_max",
"Max amount of proofs are allowed per epoch",
);

let ccp_proofs_per_epoch_allowed_min = register(
sub_registry,
Gauge::default(),
"ccp_proofs_per_epoch_allowed_min",
"Min amount of proofs are allowed per epoch",
);

Self {
ccp_requests_total,
ccp_replies_total,
Expand All @@ -165,12 +233,19 @@ impl ChainListenerMetrics {
ccp_proofs_submit_failed,
ccp_proofs_tx_success,
ccp_proofs_tx_failed,
ccp_proofs_per_epoch_allowed_max,
ccp_proofs_per_epoch_allowed_min,
blocks_seen,
last_seen_block,
blocks_processed,
last_process_block,
cus_total,
cus_in_deals,
current_commitment_status,
current_commitment,
current_epoch,
current_epoch_start_timestamp_sec,
current_epoch_duration_sec,
}
}

Expand Down Expand Up @@ -225,4 +300,34 @@ impl ChainListenerMetrics {
.get_or_create(&CommitmentLabel { commitment_id })
.set(0);
}

pub fn observe_allowed_proofs_settings(&self, max_allowed: i64, min_allowed: i64) {
self.ccp_proofs_per_epoch_allowed_max.set(max_allowed);
self.ccp_proofs_per_epoch_allowed_min.set(min_allowed);
}

pub fn observe_current_epoch(&self, epoch: i64) {
self.current_epoch.set(epoch);
}

pub fn observe_epoch_settings(&self, start_timestamp: i64, duration: i64) {
self.current_epoch_start_timestamp_sec.set(start_timestamp);
self.current_epoch_duration_sec.set(duration);
}

pub fn observe_cus_total(&self, n: i64) {
self.cus_total.set(n);
}

pub fn observe_cus_in_deals(&self, n: i64) {
self.cus_in_deals.set(n);
}

pub fn observe_cus_in_deals_added(&self, n: i64) {
self.cus_in_deals.inc_by(n);
}

pub fn observe_cus_in_deals_removed(&self, n: i64) {
self.cus_in_deals.dec_by(n);
}
}

0 comments on commit 6cfa3a8

Please sign in to comment.