Skip to content

Commit

Permalink
Ensure cgroup CPU poller works with multiple cgroups (#1149)
Browse files Browse the repository at this point in the history
### What does this PR do?

This commit corrects a bug similar to the one that I corrected in
the stats reader, which was originally copied from this code. Specifically,
we now store per-cgroup state that holds the previous values needed
to calculate diffs over time.
  • Loading branch information
blt authored Dec 14, 2024
1 parent 9f09b9a commit 4191cbb
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 96 deletions.
32 changes: 24 additions & 8 deletions lading/src/observer/linux/cgroup.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
/// Code to read cgroup information.
pub(crate) mod v2;

use std::{collections::VecDeque, io};
use std::{collections::VecDeque, io, path::PathBuf};

use nix::errno::Errno;
use procfs::process::Process;
use rustc_hash::FxHashSet;
use rustc_hash::{FxHashMap, FxHashSet};
use tracing::{debug, error, trace};

#[derive(thiserror::Error, Debug)]
Expand All @@ -26,17 +26,28 @@ pub enum Error {
CGroup(#[from] v2::Error),
}

/// Per-cgroup state that [`Sampler`] keeps between polls.
#[derive(Debug)]
struct CgroupInfo {
/// CPU sampler dedicated to this one cgroup, so that its previous readings
/// are not shared with any other cgroup.
cpu_sampler: v2::cpu::Sampler,
}

/// Polls cgroup metrics, retaining per-cgroup state across polls.
#[derive(Debug)]
pub(crate) struct Sampler {
/// Process handle built from the `parent_pid` passed to `Sampler::new`.
parent: Process,
/// State for each cgroup, keyed by cgroup path. An entry is created the
/// first time a given path is polled.
cgroup_info: FxHashMap<PathBuf, CgroupInfo>,
/// Label pairs handed to every cgroup poll call.
labels: Vec<(String, String)>,
}

impl Sampler {
pub(crate) fn new(parent_pid: i32, labels: Vec<(String, String)>) -> Result<Self, Error> {
let parent = Process::new(parent_pid)?;
let cgroup_info = FxHashMap::default();

Ok(Self { parent, labels })
Ok(Self {
parent,
cgroup_info,
labels,
})
}

#[allow(clippy::cast_possible_wrap)]
Expand Down Expand Up @@ -108,17 +119,22 @@ impl Sampler {

// Now iterate the cgroups and collect samples.
for cgroup_path in cgroups {
debug!(
"Polling cgroup metrics for {path}",
path = cgroup_path.to_string_lossy()
);
// If we haven't seen this cgroup before, initialize its CgroupInfo.
let cinfo = self
.cgroup_info
.entry(cgroup_path.clone())
.or_insert_with(|| CgroupInfo {
cpu_sampler: v2::cpu::Sampler::new(),
});

if let Err(err) = v2::poll(&cgroup_path, &self.labels).await {
error!(
"Unable to poll cgroup memory metrics for {path}: {err}",
path = cgroup_path.to_string_lossy()
);
}
if let Err(err) = v2::cpu::poll(&cgroup_path, &self.labels).await {

if let Err(err) = cinfo.cpu_sampler.poll(&cgroup_path, &self.labels).await {
error!(
"Unable to poll cgroup CPU metrics for {path}: {err}",
path = cgroup_path.to_string_lossy()
Expand Down
180 changes: 92 additions & 88 deletions lading/src/observer/linux/cgroup/v2/cpu.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use metrics::gauge;
use once_cell::sync::OnceCell;
use std::path::Path;
use std::sync::Mutex;
use std::time::Instant;
use tokio::fs;

Expand All @@ -17,108 +15,114 @@ pub(crate) enum Error {
ParseInt(#[from] std::num::ParseIntError),
}

struct PrevStats {
/// CPU counters recorded at the previous poll, used as the baseline for the
/// next delta calculation.
#[derive(Debug)]
struct Stats {
/// `usage_usec` from `cpu.stat` at the previous poll (0 before the first poll).
usage_usec: u64,
/// `user_usec` from `cpu.stat` at the previous poll (0 before the first poll).
user_usec: u64,
/// `system_usec` from `cpu.stat` at the previous poll (0 before the first poll).
system_usec: u64,
/// When the values above were recorded.
last_instant: Instant,
}

static PREV: OnceCell<Mutex<PrevStats>> = OnceCell::new();
/// Computes CPU usage for a single cgroup from successive `cpu.stat` readings.
#[derive(Debug)]
pub(crate) struct Sampler {
/// Counters and timestamp from the previous poll; each poll diffs against
/// these and then overwrites them.
prev: Stats,
}

impl Sampler {
/// Creates a sampler with no prior reading.
///
/// The baseline counters start at zero and the baseline timestamp is the
/// moment of construction.
// NOTE(review): because the baseline is zero, the first `poll` diffs the
// counters read from `cpu.stat` against zero — confirm that the resulting
// first sample is acceptable.
pub(crate) fn new() -> Self {
    let baseline = Stats {
        last_instant: Instant::now(),
        usage_usec: 0,
        user_usec: 0,
        system_usec: 0,
    };
    Self { prev: baseline }
}

// Read cgroup CPU data and calculate a percentage of usage.
pub(crate) async fn poll(group_prefix: &Path, labels: &[(String, String)]) -> Result<(), Error> {
// Read cpu.max (cgroup v2)
let cpu_max = fs::read_to_string(group_prefix.join("cpu.max")).await?;
let parts: Vec<&str> = cpu_max.split_whitespace().collect();
let (max_str, period_str) = (parts[0], parts[1]);
let allowed_cores = if max_str == "max" {
// If the target cgroup has no CPU limit we assume it has access to all
// physical cores.
num_cpus::get_physical() as f64
} else {
let max_val = max_str.parse::<f64>()?;
let period_val = period_str.parse::<f64>()?;
max_val / period_val
};
let limit_millicores = allowed_cores * 1000.0;
// Read cgroup CPU data and calculate a percentage of usage.
pub(crate) async fn poll(
&mut self,
group_prefix: &Path,
labels: &[(String, String)],
) -> Result<(), Error> {
// Read cpu.max (cgroup v2)
let cpu_max = fs::read_to_string(group_prefix.join("cpu.max")).await?;
let parts: Vec<&str> = cpu_max.split_whitespace().collect();
let (max_str, period_str) = (parts[0], parts[1]);
let allowed_cores = if max_str == "max" {
// If the target cgroup has no CPU limit we assume it has access to all
// physical cores.
num_cpus::get_physical() as f64
} else {
let max_val = max_str.parse::<f64>()?;
let period_val = period_str.parse::<f64>()?;
max_val / period_val
};
let limit_millicores = allowed_cores * 1000.0;

// Read cpu.stat
let cpu_stat = fs::read_to_string(group_prefix.join("cpu.stat")).await?;
let mut usage_usec = 0u64;
let mut user_usec = 0u64;
let mut system_usec = 0u64;
// Read cpu.stat
let cpu_stat = fs::read_to_string(group_prefix.join("cpu.stat")).await?;
let mut usage_usec = 0u64;
let mut user_usec = 0u64;
let mut system_usec = 0u64;

for line in cpu_stat.lines() {
let mut parts = line.split_whitespace();
let key = parts.next().expect("no key");
let value: u64 = parts.next().expect("no value").parse()?;
match key {
"usage_usec" => usage_usec = value,
"user_usec" => user_usec = value,
"system_usec" => system_usec = value,
_ => {}
for line in cpu_stat.lines() {
let mut parts = line.split_whitespace();
let key = parts.next().expect("no key");
let value: u64 = parts.next().expect("no value").parse()?;
match key {
"usage_usec" => usage_usec = value,
"user_usec" => user_usec = value,
"system_usec" => system_usec = value,
_ => {}
}
}
}

// Get or initialize the previous stats. Note that the first time this is
// initialized we intentionally set last_instant to now to avoid scheduling
// shenanigans.
let now = Instant::now();
let mut prev = PREV
.get_or_init(|| {
Mutex::new(PrevStats {
usage_usec,
user_usec,
system_usec,
last_instant: now,
})
})
.lock()
.expect("could not lock stats, poisoned by my constituents");
let now = Instant::now();
let delta_time = now.duration_since(self.prev.last_instant).as_micros();
let delta_usage = usage_usec.saturating_sub(self.prev.usage_usec);
let delta_user = user_usec.saturating_sub(self.prev.user_usec);
let delta_system = system_usec.saturating_sub(self.prev.system_usec);

let delta_time = now.duration_since(prev.last_instant).as_micros();
let delta_usage = usage_usec.saturating_sub(prev.usage_usec);
let delta_user = user_usec.saturating_sub(prev.user_usec);
let delta_system = system_usec.saturating_sub(prev.system_usec);
// Update previous stats and if there's a time delta calculate the CPU
// usage.
self.prev.usage_usec = usage_usec;
self.prev.user_usec = user_usec;
self.prev.system_usec = system_usec;
self.prev.last_instant = now;
if delta_time > 0 {
let delta_time = delta_time as f64;

// Update previous stats and if there's a time delta calculate the CPU
// usage.
prev.usage_usec = usage_usec;
prev.user_usec = user_usec;
prev.system_usec = system_usec;
prev.last_instant = now;
if delta_time > 0 {
let delta_time = delta_time as f64;
// Compute CPU usage as a fraction of a single CPU
let usage_fraction = delta_usage as f64 / delta_time;
let user_fraction = delta_user as f64 / delta_time;
let system_fraction = delta_system as f64 / delta_time;

// Compute CPU usage as a fraction of a single CPU
let usage_fraction = delta_usage as f64 / delta_time;
let user_fraction = delta_user as f64 / delta_time;
let system_fraction = delta_system as f64 / delta_time;
// NOTE these metric names are paired with names in procfs/stat.rs and
// must remain consistent. If you change these, change those.

// NOTE these metric names are paired with names in procfs/stat.rs and
// must remain consistent. If you change these, change those.
// Convert usage to a percentage of the cores granted to the target.
let total_cpu = (usage_fraction / allowed_cores) * 100.0;
let user_cpu = (user_fraction / allowed_cores) * 100.0;
let system_cpu = (system_fraction / allowed_cores) * 100.0;
gauge!("total_cpu_percentage", labels).set(total_cpu);
gauge!("cpu_percentage", labels).set(total_cpu); // backward compatibility
gauge!("user_cpu_percentage", labels).set(user_cpu);
gauge!("kernel_cpu_percentage", labels).set(system_cpu); // kernel is a misnomer, keeping for compatibility
gauge!("system_cpu_percentage", labels).set(system_cpu);

// Convert usage to a percentage of the cores granted to the target.
let total_cpu = (usage_fraction / allowed_cores) * 100.0;
let user_cpu = (user_fraction / allowed_cores) * 100.0;
let system_cpu = (system_fraction / allowed_cores) * 100.0;
gauge!("total_cpu_percentage", labels).set(total_cpu);
gauge!("cpu_percentage", labels).set(total_cpu); // backward compatibility
gauge!("user_cpu_percentage", labels).set(user_cpu);
gauge!("kernel_cpu_percentage", labels).set(system_cpu); // kernel is a misnomer, keeping for compatibility
gauge!("system_cpu_percentage", labels).set(system_cpu);
// Convert usage to kubernetes style millicores.
let total_millicores = usage_fraction * 1000.0;
let user_millicores = user_fraction * 1000.0;
let system_millicores = system_fraction * 1000.0;
gauge!("total_cpu_usage_millicores", labels).set(total_millicores);
gauge!("user_cpu_usage_millicores", labels).set(user_millicores);
gauge!("kernel_cpu_usage_millicores", labels).set(system_millicores); // kernel is a misnomer, keeping for compatibility
gauge!("system_cpu_usage_millicores", labels).set(system_millicores);
gauge!("cpu_limit_millicores", labels).set(limit_millicores);
}

// Convert usage to kubernetes style millicores.
let total_millicores = usage_fraction * 1000.0;
let user_millicores = user_fraction * 1000.0;
let system_millicores = system_fraction * 1000.0;
gauge!("total_cpu_usage_millicores", labels).set(total_millicores);
gauge!("user_cpu_usage_millicores", labels).set(user_millicores);
gauge!("kernel_cpu_usage_millicores", labels).set(system_millicores); // kernel is a misnomer, keeping for compatibility
gauge!("system_cpu_usage_millicores", labels).set(system_millicores);
gauge!("cpu_limit_millicores", labels).set(limit_millicores);
Ok(())
}

Ok(())
}

0 comments on commit 4191cbb

Please sign in to comment.