Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand metrics for errors and warnings #218

Closed
wants to merge 16 commits into from
Closed
4 changes: 4 additions & 0 deletions src/handler/active_requests.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::metrics::{NO_MATCHING_NONCE, NO_MATCHING_REQ_CALL};

use super::*;
use delay_map::HashMapDelay;
use more_asserts::debug_unreachable;
Expand Down Expand Up @@ -39,6 +41,7 @@ impl ActiveRequests {
None => {
debug_unreachable!("A matching request call doesn't exist");
error!("A matching request call doesn't exist");
let _ = &METRICS.error(NO_MATCHING_REQ_CALL);
None
}
},
Expand All @@ -58,6 +61,7 @@ impl ActiveRequests {
None => {
debug_unreachable!("A matching nonce mapping doesn't exist");
error!("A matching nonce mapping doesn't exist");
let _ = &METRICS.error(NO_MATCHING_NONCE);
None
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{
config::Config,
discv5::PERMIT_BAN_LIST,
error::{Error, RequestError},
metrics::SESS_GENERATE_FAIL,
packet::{ChallengeData, IdNonce, MessageNonce, Packet, PacketKind, ProtocolIdentity},
rpc::{Message, Request, RequestBody, RequestId, Response, ResponseBody},
socket,
Expand Down Expand Up @@ -471,7 +472,7 @@ impl Handler {
trace!("Request queued for node: {}", node_address);
self.pending_requests
.entry(node_address)
.or_insert_with(Vec::new)
.or_default()
.push(PendingRequest {
contact,
request_id,
Expand Down Expand Up @@ -668,6 +669,7 @@ impl Handler {
Ok(v) => v,
Err(e) => {
error!("Could not generate a session. Error: {:?}", e);
let _ = &METRICS.error(SESS_GENERATE_FAIL);
self.fail_request(request_call, RequestError::InvalidRemotePacket, true)
.await;
return;
Expand Down
3 changes: 3 additions & 0 deletions src/kbucket/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

#![allow(dead_code)]

use crate::metrics::{KBUCKET_NOT_FULL, METRICS};

use super::*;
use tracing::{debug, error};

Expand Down Expand Up @@ -354,6 +356,7 @@ where
}
InsertResult::Full => unreachable!("Bucket cannot be full"),
InsertResult::Pending { .. } | InsertResult::NodeExists => {
let _ = &METRICS.error(KBUCKET_NOT_FULL);
error!("Bucket is not full or double node")
}
InsertResult::FailedFilter => debug!("Pending node failed filter"),
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
//! });
//! ```

mod config;
pub mod config;
mod discv5;
mod error;
mod executor;
Expand Down
85 changes: 84 additions & 1 deletion src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{
collections::HashMap,
sync::atomic::{AtomicUsize, Ordering},
};

use crate::service;

lazy_static! {
/// Process-wide metrics instance, built from `InternalMetrics::default()`.
///
/// Declared through `lazy_static!`, so the value is constructed on first
/// access rather than at program start.
pub static ref METRICS: InternalMetrics = InternalMetrics::default();
}

/// Represents the severity of a failure within Discv5.
///
/// Passed to `InternalMetrics::log_failure` and `InternalMetrics::failures`
/// to select which failure counter is updated or read.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FailureSeverity {
    /// Selects the `total_criticals` counter.
    Critical,
    /// Selects the `total_errors` counter.
    Error,
    /// Selects the `total_warnings` counter.
    Warning,
}

/// A collection of metrics used throughout the server.
pub struct InternalMetrics {
/// The number of active UDP sessions that are currently established.
Expand All @@ -16,6 +28,12 @@ pub struct InternalMetrics {
pub bytes_sent: AtomicUsize,
/// The number of bytes received.
pub bytes_recv: AtomicUsize,
/// Total number of critical failures that have occurred
pub total_criticals: AtomicUsize,
/// Total number of errors that have occurred
pub total_errors: AtomicUsize,
/// Total number of warnings that have occurred
pub total_warnings: AtomicUsize,
}

impl Default for InternalMetrics {
Expand All @@ -26,6 +44,9 @@ impl Default for InternalMetrics {
unsolicited_requests_per_window: AtomicUsize::new(0),
bytes_sent: AtomicUsize::new(0),
bytes_recv: AtomicUsize::new(0),
total_criticals: AtomicUsize::default(),
total_errors: AtomicUsize::default(),
total_warnings: AtomicUsize::default(),
}
}
}
Expand All @@ -42,6 +63,59 @@ impl InternalMetrics {
self.bytes_sent
.store(current_bytes_sent.saturating_add(bytes), Ordering::Relaxed);
}

/// Logs a failure with the discovery service
///
/// # Parameters #
///
/// - `msg`, the severity of the failure
///
/// This (atomically) increments the appropriate internal counter for the failure type
pub fn log_failure(&self, failure: FailureSeverity) {
match failure {
FailureSeverity::Critical => Self::increment_field(&self.total_criticals),
FailureSeverity::Error => Self::increment_field(&self.total_errors),
FailureSeverity::Warning => Self::increment_field(&self.total_warnings),
}
}

pub fn failures(&self, severity: FailureSeverity) -> usize {
match severity {
FailureSeverity::Critical => Self::read_field(&self.total_criticals),
FailureSeverity::Error => Self::read_field(&self.total_errors),
FailureSeverity::Warning => Self::read_field(&self.total_warnings),
}
}

/// Returns the total number of critical failures that have occurred
pub fn criticals(&self) -> usize {
self.failures(FailureSeverity::Critical)
}

/// Returns the total number of errors that have occurred
pub fn errors(&self) -> usize {
self.failures(FailureSeverity::Error)
}

/// Returns the total number of warnings that have occurred
pub fn warnings(&self) -> usize {
self.failures(FailureSeverity::Warning)
}

/// Reads the current value of the counter behind `field`.
///
/// The load uses `Ordering::Relaxed`.
fn read_field(field: &AtomicUsize) -> usize {
    let current = field.load(Ordering::Relaxed);
    current
}

/// Increments the counter behind `field` by one, saturating at `usize::MAX`.
///
/// The update is performed as a single atomic read-modify-write using
/// `fetch_update`, so increments issued concurrently from several threads
/// are not lost (a separate `load` followed by `store` would allow two
/// threads to read the same value and both write back `value + 1`).
///
/// Uses the `Relaxed` memory ordering for both the successful update and
/// the retry loads.
fn increment_field(field: &AtomicUsize) {
    // The closure always returns `Some`, so `fetch_update` cannot return
    // `Err`; the previous value it yields is not needed here.
    let _ = field.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
        Some(current.saturating_add(1))
    });
}
}

#[derive(Clone, Debug)]
Expand All @@ -55,6 +129,12 @@ pub struct Metrics {
pub bytes_sent: usize,
/// The number of bytes received.
pub bytes_recv: usize,
/// Total number of critical failures that have occurred
pub total_criticals: usize,
/// Total number of errors that have occurred
pub total_errors: usize,
/// Total number of warnings that have occurred
pub total_warnings: usize,
}

impl From<&METRICS> for Metrics {
Expand All @@ -67,6 +147,9 @@ impl From<&METRICS> for Metrics {
/ internal_metrics.moving_window as f64,
bytes_sent: internal_metrics.bytes_sent.load(Ordering::Relaxed),
bytes_recv: internal_metrics.bytes_recv.load(Ordering::Relaxed),
total_criticals: internal_metrics.criticals(),
total_errors: internal_metrics.errors(),
total_warnings: internal_metrics.warnings(),
}
}
}
Loading