Skip to content

Commit

Permalink
jwk ob counters (#12048)
Browse files Browse the repository at this point in the history
  • Loading branch information
zjma authored Feb 15, 2024
1 parent 04d078f commit 6390aae
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 3 deletions.
11 changes: 10 additions & 1 deletion crates/aptos-jwk-consensus/src/counters.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright © Aptos Foundation

use aptos_metrics_core::{register_int_gauge, IntGauge};
use aptos_metrics_core::{register_histogram_vec, register_int_gauge, HistogramVec, IntGauge};
use once_cell::sync::Lazy;

/// Count of the pending messages sent to itself in the channel
Expand All @@ -11,3 +11,12 @@ pub static PENDING_SELF_MESSAGES: Lazy<IntGauge> = Lazy::new(|| {
)
.unwrap()
});

pub static OBSERVATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"aptos_jwk_observation_seconds",
"JWK observation seconds by issuer and result.",
&["issuer", "result"]
)
.unwrap()
});
12 changes: 10 additions & 2 deletions crates/aptos-jwk-consensus/src/jwk_observer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
// Copyright © Aptos Foundation

use crate::counters::OBSERVATION_SECONDS;
use anyhow::Result;
use aptos_channels::aptos_channel;
use aptos_logger::{debug, info};
use aptos_types::jwks::{jwk::JWK, Issuer};
use futures::{FutureExt, StreamExt};
use move_core_types::account_address::AccountAddress;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::{sync::oneshot, task::JoinHandle, time::MissedTickBehavior};

#[derive(Serialize, Deserialize)]
Expand Down Expand Up @@ -93,17 +94,24 @@ impl JWKObserver {
observation_tx: aptos_channel::Sender<(), (Issuer, Vec<JWK>)>,
close_rx: oneshot::Receiver<()>,
) {
let issuer_str =
String::from_utf8(issuer.clone()).unwrap_or_else(|_e| "UNKNOWN_ISSUER".to_string());
let mut interval = tokio::time::interval(fetch_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
let mut close_rx = close_rx.into_stream();
loop {
tokio::select! {
_ = interval.tick().fuse() => {
let timer = Instant::now();
let result = fetch_jwks(my_addr, open_id_config_url.clone()).await;
debug!("observe_result={:?}", result);
let secs = timer.elapsed().as_secs_f64();
debug!(issuer = issuer_str, "observe_result={:?}", result);
if let Ok(mut jwks) = result {
OBSERVATION_SECONDS.with_label_values(&[&issuer_str, "ok"]).observe(secs);
jwks.sort();
let _ = observation_tx.push((), (issuer.clone(), jwks));
} else {
OBSERVATION_SECONDS.with_label_values(&[&issuer_str, "err"]).observe(secs);
}
},
_ = close_rx.select_next_some() => {
Expand Down

0 comments on commit 6390aae

Please sign in to comment.