diff --git a/lading/src/observer/linux/cgroup.rs b/lading/src/observer/linux/cgroup.rs index 89eee56d6..e913b162c 100644 --- a/lading/src/observer/linux/cgroup.rs +++ b/lading/src/observer/linux/cgroup.rs @@ -1,11 +1,11 @@ /// Code to read cgroup information. pub(crate) mod v2; -use std::{collections::VecDeque, io}; +use std::{collections::VecDeque, io, path::PathBuf}; use nix::errno::Errno; use procfs::process::Process; -use rustc_hash::FxHashSet; +use rustc_hash::{FxHashMap, FxHashSet}; use tracing::{debug, error, trace}; #[derive(thiserror::Error, Debug)] @@ -26,17 +26,28 @@ pub enum Error { CGroup(#[from] v2::Error), } +#[derive(Debug)] +struct CgroupInfo { + cpu_sampler: v2::cpu::Sampler, +} + #[derive(Debug)] pub(crate) struct Sampler { parent: Process, + cgroup_info: FxHashMap, labels: Vec<(String, String)>, } impl Sampler { pub(crate) fn new(parent_pid: i32, labels: Vec<(String, String)>) -> Result { let parent = Process::new(parent_pid)?; + let cgroup_info = FxHashMap::default(); - Ok(Self { parent, labels }) + Ok(Self { + parent, + cgroup_info, + labels, + }) } #[allow(clippy::cast_possible_wrap)] @@ -108,17 +119,22 @@ impl Sampler { // Now iterate the cgroups and collect samples. for cgroup_path in cgroups { - debug!( - "Polling cgroup metrics for {path}", - path = cgroup_path.to_string_lossy() - ); + // If we haven't seen this cgroup before, initialize its CgroupInfo. + let cinfo = self + .cgroup_info + .entry(cgroup_path.clone()) + .or_insert_with(|| CgroupInfo { + cpu_sampler: v2::cpu::Sampler::new(), + }); + if let Err(err) = v2::poll(&cgroup_path, &self.labels).await { error!( "Unable to poll cgroup memory metrics for {path}: {err}", path = cgroup_path.to_string_lossy() ); } - if let Err(err) = v2::cpu::poll(&cgroup_path, &self.labels).await { + + if let Err(err) = cinfo.cpu_sampler.poll(&cgroup_path, &self.labels).await { error!( "Unable to poll cgroup CPU metrics for {path}: {err}", path = cgroup_path.to_string_lossy() diff --git a/lading/src/observer/linux/cgroup/v2/cpu.rs b/lading/src/observer/linux/cgroup/v2/cpu.rs index 91837fbb3..ef6605e4c 100644 --- a/lading/src/observer/linux/cgroup/v2/cpu.rs +++ b/lading/src/observer/linux/cgroup/v2/cpu.rs @@ -1,7 +1,5 @@ use metrics::gauge; -use once_cell::sync::OnceCell; use std::path::Path; -use std::sync::Mutex; use std::time::Instant; use tokio::fs; @@ -17,108 +15,114 @@ pub(crate) enum Error { ParseInt(#[from] std::num::ParseIntError), } -struct PrevStats { +#[derive(Debug)] +struct Stats { usage_usec: u64, user_usec: u64, system_usec: u64, last_instant: Instant, } -static PREV: OnceCell> = OnceCell::new(); +#[derive(Debug)] +pub(crate) struct Sampler { + prev: Stats, +} + +impl Sampler { + pub(crate) fn new() -> Self { + Self { + prev: Stats { + usage_usec: 0, + user_usec: 0, + system_usec: 0, + last_instant: Instant::now(), + }, + } + } -// Read cgroup CPU data and calculate a percentage of usage. -pub(crate) async fn poll(group_prefix: &Path, labels: &[(String, String)]) -> Result<(), Error> { - // Read cpu.max (cgroup v2) - let cpu_max = fs::read_to_string(group_prefix.join("cpu.max")).await?; - let parts: Vec<&str> = cpu_max.split_whitespace().collect(); - let (max_str, period_str) = (parts[0], parts[1]); - let allowed_cores = if max_str == "max" { - // If the target cgroup has no CPU limit we assume it has access to all - // physical cores. - num_cpus::get_physical() as f64 - } else { - let max_val = max_str.parse::()?; - let period_val = period_str.parse::()?; - max_val / period_val - }; - let limit_millicores = allowed_cores * 1000.0; + // Read cgroup CPU data and calculate a percentage of usage. + pub(crate) async fn poll( + &mut self, + group_prefix: &Path, + labels: &[(String, String)], + ) -> Result<(), Error> { + // Read cpu.max (cgroup v2) + let cpu_max = fs::read_to_string(group_prefix.join("cpu.max")).await?; + let parts: Vec<&str> = cpu_max.split_whitespace().collect(); + let (max_str, period_str) = (parts[0], parts[1]); + let allowed_cores = if max_str == "max" { + // If the target cgroup has no CPU limit we assume it has access to all + // physical cores. + num_cpus::get_physical() as f64 + } else { + let max_val = max_str.parse::()?; + let period_val = period_str.parse::()?; + max_val / period_val + }; + let limit_millicores = allowed_cores * 1000.0; - // Read cpu.stat - let cpu_stat = fs::read_to_string(group_prefix.join("cpu.stat")).await?; - let mut usage_usec = 0u64; - let mut user_usec = 0u64; - let mut system_usec = 0u64; + // Read cpu.stat + let cpu_stat = fs::read_to_string(group_prefix.join("cpu.stat")).await?; + let mut usage_usec = 0u64; + let mut user_usec = 0u64; + let mut system_usec = 0u64; - for line in cpu_stat.lines() { - let mut parts = line.split_whitespace(); - let key = parts.next().expect("no key"); - let value: u64 = parts.next().expect("no value").parse()?; - match key { - "usage_usec" => usage_usec = value, - "user_usec" => user_usec = value, - "system_usec" => system_usec = value, - _ => {} + for line in cpu_stat.lines() { + let mut parts = line.split_whitespace(); + let key = parts.next().expect("no key"); + let value: u64 = parts.next().expect("no value").parse()?; + match key { + "usage_usec" => usage_usec = value, + "user_usec" => user_usec = value, + "system_usec" => system_usec = value, + _ => {} + } } - } - // Get or initialize the previous stats. Note that the first time this is - // initialized we intentionally set last_instance to now to avoid scheduling - // shenanigans. - let now = Instant::now(); - let mut prev = PREV - .get_or_init(|| { - Mutex::new(PrevStats { - usage_usec, - user_usec, - system_usec, - last_instant: now, - }) - }) - .lock() - .expect("could not lock stats, poisoned by my constituents"); + let now = Instant::now(); + let delta_time = now.duration_since(self.prev.last_instant).as_micros(); + let delta_usage = usage_usec.saturating_sub(self.prev.usage_usec); + let delta_user = user_usec.saturating_sub(self.prev.user_usec); + let delta_system = system_usec.saturating_sub(self.prev.system_usec); - let delta_time = now.duration_since(prev.last_instant).as_micros(); - let delta_usage = usage_usec.saturating_sub(prev.usage_usec); - let delta_user = user_usec.saturating_sub(prev.user_usec); - let delta_system = system_usec.saturating_sub(prev.system_usec); + // Update previous stats and if there's a time delta calculate the CPU + // usage. + self.prev.usage_usec = usage_usec; + self.prev.user_usec = user_usec; + self.prev.system_usec = system_usec; + self.prev.last_instant = now; + if delta_time > 0 { + let delta_time = delta_time as f64; - // Update previous stats and if there's a time delta calculate the CPU - // usage. - prev.usage_usec = usage_usec; - prev.user_usec = user_usec; - prev.system_usec = system_usec; - prev.last_instant = now; - if delta_time > 0 { - let delta_time = delta_time as f64; + // Compute CPU usage as a fraction of a single CPU + let usage_fraction = delta_usage as f64 / delta_time; + let user_fraction = delta_user as f64 / delta_time; + let system_fraction = delta_system as f64 / delta_time; - // Compute CPU usage as a fraction of a single CPU - let usage_fraction = delta_usage as f64 / delta_time; - let user_fraction = delta_user as f64 / delta_time; - let system_fraction = delta_system as f64 / delta_time; + // NOTE these metric names are paired with names in procfs/stat.rs and + // must remain consistent. If you change these, change those. - // NOTE these metric names are paired with names in procfs/stat.rs and - // must remain consistent. If you change these, change those. + // Convert usage to a percentage of the cores granted to the target. + let total_cpu = (usage_fraction / allowed_cores) * 100.0; + let user_cpu = (user_fraction / allowed_cores) * 100.0; + let system_cpu = (system_fraction / allowed_cores) * 100.0; + gauge!("total_cpu_percentage", labels).set(total_cpu); + gauge!("cpu_percentage", labels).set(total_cpu); // backward compatibility + gauge!("user_cpu_percentage", labels).set(user_cpu); + gauge!("kernel_cpu_percentage", labels).set(system_cpu); // kernel is a misnomer, keeping for compatibility + gauge!("system_cpu_percentage", labels).set(system_cpu); - // Convert usage to a percentage of the cores granted to the target. - let total_cpu = (usage_fraction / allowed_cores) * 100.0; - let user_cpu = (user_fraction / allowed_cores) * 100.0; - let system_cpu = (system_fraction / allowed_cores) * 100.0; - gauge!("total_cpu_percentage", labels).set(total_cpu); - gauge!("cpu_percentage", labels).set(total_cpu); // backward compatibility - gauge!("user_cpu_percentage", labels).set(user_cpu); - gauge!("kernel_cpu_percentage", labels).set(system_cpu); // kernel is a misnomer, keeping for compatibility - gauge!("system_cpu_percentage", labels).set(system_cpu); + // Convert usage to kubernetes style millicores. + let total_millicores = usage_fraction * 1000.0; + let user_millicores = user_fraction * 1000.0; + let system_millicores = system_fraction * 1000.0; + gauge!("total_cpu_usage_millicores", labels).set(total_millicores); + gauge!("user_cpu_usage_millicores", labels).set(user_millicores); + gauge!("kernel_cpu_usage_millicores", labels).set(system_millicores); // kernel is a misnomer, keeping for compatibility + gauge!("system_cpu_usage_millicores", labels).set(system_millicores); + gauge!("cpu_limit_millicores", labels).set(limit_millicores); + } - // Convert usage to kubernetes style millicores. - let total_millicores = usage_fraction * 1000.0; - let user_millicores = user_fraction * 1000.0; - let system_millicores = system_fraction * 1000.0; - gauge!("total_cpu_usage_millicores", labels).set(total_millicores); - gauge!("user_cpu_usage_millicores", labels).set(user_millicores); - gauge!("kernel_cpu_usage_millicores", labels).set(system_millicores); // kernel is a misnomer, keeping for compatibility - gauge!("system_cpu_usage_millicores", labels).set(system_millicores); - gauge!("cpu_limit_millicores", labels).set(limit_millicores); + Ok(()) } - - Ok(()) }