Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feat] Implement CPU and DRAM monitoring for zeusd #137

Merged
merged 27 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
76207a5
Some initial structure
jaywonchung Jul 8, 2024
b95d990
Implement CPU and DRAM monitoring
wbjin Oct 27, 2024
908ab1e
Add cpu_device_tasks to tcp server
wbjin Oct 27, 2024
ad0369d
Update zeusd/src/devices/cpu/linux.rs for supporting zeusd in docker …
wbjin Nov 2, 2024
9bd41fd
Update zeusd/src/startup.rs
wbjin Nov 2, 2024
fab27e5
Use usize for index
wbjin Nov 2, 2024
e21f31e
Merge branch 'jw-zeusd-rapl' of https://github.com/ml-energy/zeus int…
wbjin Nov 2, 2024
e656f58
Update zeusd/src/devices/cpu/mod.rs
wbjin Nov 2, 2024
826a069
Add comments and change stop monitoring function
wbjin Nov 2, 2024
8448a87
Merge branch 'jw-zeusd-rapl' of https://github.com/ml-energy/zeus int…
wbjin Nov 2, 2024
aade41b
Update zeusd/src/error.rs
wbjin Nov 2, 2024
d3d819f
Merge branch 'jw-zeusd-rapl' of https://github.com/ml-energy/zeus int…
wbjin Nov 2, 2024
b9b93cd
Address PR feedback
wbjin Nov 5, 2024
75abcd7
Clarify assumption made in get_cpu_energy
wbjin Nov 14, 2024
d0bca5a
Address PR feedback
wbjin Nov 14, 2024
144a3b4
Tests for GetIndexEnergy route
wbjin Nov 14, 2024
36a4874
Reduce number of workers for tests
wbjin Nov 19, 2024
757a6ca
Lint
wbjin Nov 19, 2024
c9370ac
Fix linux errors
wbjin Nov 19, 2024
3ab3bf2
Format fix
wbjin Nov 19, 2024
7a48b84
Update zeusd/tests/cpu.rs
wbjin Nov 26, 2024
c152cf5
Address PR feedback
wbjin Nov 26, 2024
67a7c44
Merge branch 'jw-zeusd-rapl' of https://github.com/ml-energy/zeus int…
wbjin Nov 26, 2024
28d0366
Update zeusd/src/devices/cpu/mod.rs
wbjin Nov 26, 2024
708c7f6
Merge branch 'jw-zeusd-rapl' of https://github.com/ml-energy/zeus int…
wbjin Nov 26, 2024
5efbd27
Revert back to Deserialize
wbjin Nov 27, 2024
3db0d49
Merge branch 'master' into jw-zeusd-rapl
jaywonchung Nov 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions zeusd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ name = "zeusd"
[dependencies]
nvml-wrapper = "0.10"
actix-web = "4"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread", "fs"] }
thiserror = "1"
clap = { version = "4.5.4", features = ["derive"] }
serde = { version = "1", features = ["derive"] }
Expand All @@ -31,8 +31,8 @@ tracing-log = "0.2.0"
tracing-actix-web = "0.7.10"
nix = { version = "0.29", default-features = false, features = ["user"] }
paste = "1"
once_cell = "1.7.2"

[dev-dependencies]
once_cell = "1.7.2"
reqwest = { version = "0.11", default-features = false, features = ["json"] }
serde_json = "1"
Empty file modified zeusd/scripts/lint.sh
100644 → 100755
Empty file.
248 changes: 248 additions & 0 deletions zeusd/src/devices/cpu/linux.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
//! CPU power measurement with RAPL. Only supported on Linux.

use once_cell::sync::OnceCell;
use std::fs;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::string::String;
use std::sync::{Arc, RwLock};
use tokio::io::AsyncReadExt;
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};

use crate::devices::cpu::{CpuManager, PackageInfo};
use crate::error::ZeusdError;

// NOTE: To support Zeusd deployment in a docker container, this should support
// sysfs mounts under places like `/zeus_sys`.
static RAPL_DIR: &str = "/sys/class/powercap/intel-rapl";

// Assuming a maximum power draw of 1000 Watts when we are polling every 0.1 seconds, the maximum
// amount the RAPL counter would increase (1000 * 1e6 * 0.1)
static RAPL_COUNTER_MAX_INCREASE: u64 = 1000 * 100000;

pub struct RaplCpu {
cpu: Arc<PackageInfo>,
dram: Option<Arc<PackageInfo>>,
cpu_monitoring_task: OnceCell<JoinHandle<Result<(), ZeusdError>>>,
dram_monitoring_task: OnceCell<JoinHandle<Result<(), ZeusdError>>>,
}

impl RaplCpu {
pub fn init(_index: usize) -> Result<Self, ZeusdError> {
let fields = RaplCpu::get_available_fields(_index)?;
Ok(Self {
cpu: fields.0,
dram: fields.1,
cpu_monitoring_task: OnceCell::new(),
dram_monitoring_task: OnceCell::new(),
})
}
}

impl PackageInfo {
jaywonchung marked this conversation as resolved.
Show resolved Hide resolved
pub fn new(base_path: &Path, index: usize) -> anyhow::Result<Self, ZeusdError> {
let cpu_name_path = base_path.join("name");
let cpu_energy_path = base_path.join("energy_uj");
let cpu_max_energy_path = base_path.join("max_energy_range_uj");

if !cpu_name_path.exists() || !cpu_max_energy_path.exists() || !cpu_energy_path.exists() {
return Err(ZeusdError::CpuInitializationError(index));
}

let cpu_name = fs::read_to_string(&cpu_name_path)?.trim_end().to_string();
// Try reding from energy_uj file
read_u64(&cpu_energy_path)?;
let cpu_max_energy = read_u64(&cpu_max_energy_path)?;
let wraparound_counter = RwLock::new(0);
Ok(PackageInfo {
index,
name: cpu_name,
energy_uj_path: cpu_energy_path,
max_energy_uj: cpu_max_energy,
num_wraparounds: wraparound_counter,
})
}
}

impl CpuManager for RaplCpu {
fn device_count() -> Result<usize, ZeusdError> {
let mut index_count = 0;
let base_path = PathBuf::from(RAPL_DIR);

match fs::read_dir(&base_path) {
Ok(entries) => {
for entry in entries.flatten() {
let path = entry.path();
if path.is_dir() {
if let Some(dir_name_str) = path.file_name() {
let dir_name = dir_name_str.to_string_lossy();
if dir_name.contains("intel-rapl") {
index_count += 1;
}
}
}
}
}
Err(_) => {
tracing::error!("RAPL not available");
}
};
Ok(index_count)
}

fn get_available_fields(
index: usize,
) -> Result<(Arc<PackageInfo>, Option<Arc<PackageInfo>>), ZeusdError> {
let base_path = PathBuf::from(format!("{}/intel-rapl:{}", RAPL_DIR, index));
let cpu_info = PackageInfo::new(&base_path, index)?;

match fs::read_dir(&base_path) {
Ok(entries) => {
for entry in entries.flatten() {
let path = entry.path();
if path.is_dir() {
if let Some(dir_name_str) = path.file_name() {
let dir_name = dir_name_str.to_string_lossy();
if dir_name.contains("intel-rapl") {
let subpackage_path = base_path.join(&*dir_name);
let subpackage_info = PackageInfo::new(&subpackage_path, index)?;
if subpackage_info.name == "dram" {
return Ok((
Arc::new(cpu_info),
Some(Arc::new(subpackage_info)),
));
}
}
}
}
}
}
Err(_) => {
return Err(ZeusdError::CpuInitializationError(index));
}
};

Ok((Arc::new(cpu_info), None))
}

fn get_cpu_energy(&mut self) -> Result<u64, ZeusdError> {
// Assume that RAPL counter will not wrap around twice during a request to poll energy. The
// number of wrap arounds is polled twice to handle the case where the counter wraps around
// a request. If this happens, `measurement` has to be updated as to not return an
// unexpectedly large energy value.

let handle = self
.cpu_monitoring_task
.get_or_init(|| tokio::spawn(monitor_rapl(Arc::clone(&self.cpu))));
if handle.is_finished() {
return Err(ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index));
}

let num_wraparounds_before = *self
.cpu
.num_wraparounds
.read()
.map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index))?;
let mut measurement = read_u64(&self.cpu.energy_uj_path)?;
let num_wraparounds = *self
.cpu
.num_wraparounds
.read()
.map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index))?;
if num_wraparounds != num_wraparounds_before {
// Wraparound has happened after measurement, take measurement again
measurement = read_u64(&self.cpu.energy_uj_path)?;
}

Ok(measurement + num_wraparounds * self.cpu.max_energy_uj)
}

fn get_dram_energy(&mut self) -> Result<u64, ZeusdError> {
jaywonchung marked this conversation as resolved.
Show resolved Hide resolved
match &self.dram {
None => Err(ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index)),
Some(dram) => {
let handle = self
.dram_monitoring_task
.get_or_init(|| tokio::spawn(monitor_rapl(Arc::clone(dram))));
if handle.is_finished() {
return Err(ZeusdError::CpuManagementTaskTerminatedError(dram.index));
}

let num_wraparounds_before = *dram
.num_wraparounds
.read()
.map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(dram.index))?;
let mut measurement = read_u64(&dram.energy_uj_path)?;
let num_wraparounds = *dram
.num_wraparounds
.read()
.map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(dram.index))?;
if num_wraparounds != num_wraparounds_before {
// Wraparound has happened after measurement, take measurement again
measurement = read_u64(&dram.energy_uj_path)?;
}

Ok(measurement + num_wraparounds * dram.max_energy_uj)
}
}
}

fn stop_monitoring(&mut self) {
if let Some(handle) = self.cpu_monitoring_task.take() {
handle.abort();
}
if let Some(handle) = self.dram_monitoring_task.take() {
handle.abort();
}
}

fn is_dram_available(&self) -> bool {
self.dram.is_some()
}
}

fn read_u64(path: &PathBuf) -> anyhow::Result<u64, std::io::Error> {
let mut file = std::fs::File::open(path)?;
let mut buf = String::new();
file.read_to_string(&mut buf)?;
buf.trim()
.parse()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
}

async fn read_u64_async(path: &PathBuf) -> Result<u64, std::io::Error> {
let mut file = tokio::fs::File::open(path).await?;
let mut buf = String::new();
file.read_to_string(&mut buf).await?;
buf.trim()
.parse()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
}

async fn monitor_rapl(rapl_file: Arc<PackageInfo>) -> Result<(), ZeusdError> {
let mut last_energy_uj = read_u64_async(&rapl_file.energy_uj_path).await?;
tracing::info!(
"Monitoring started for {}",
rapl_file.energy_uj_path.display()
);
loop {
let current_energy_uj = read_u64_async(&rapl_file.energy_uj_path).await?;

if current_energy_uj < last_energy_uj {
let mut wraparound_guard = rapl_file
.num_wraparounds
.write()
.map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(rapl_file.index))?;
*wraparound_guard += 1;
}
last_energy_uj = current_energy_uj;
let sleep_time = if rapl_file.max_energy_uj - current_energy_uj < RAPL_COUNTER_MAX_INCREASE
{
100
} else {
1000
};
sleep(Duration::from_millis(sleep_time)).await;
}
}
59 changes: 59 additions & 0 deletions zeusd/src/devices/cpu/macos.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//! Fake `RaplCpu` implementation to allow development and testing on MacOS.
use std::path::PathBuf;
use std::sync::{Arc, RwLock};

use crate::devices::cpu::{CpuManager, PackageInfo};
use crate::error::ZeusdError;

pub struct RaplCpu {}

impl RaplCpu {
pub fn init(_index: usize) -> Result<Self, ZeusdError> {
Ok(Self {})
}
}

impl CpuManager for RaplCpu {
fn device_count() -> Result<usize, ZeusdError> {
Ok(1)
}

fn get_available_fields(
_index: usize,
) -> Result<(Arc<PackageInfo>, Option<Arc<PackageInfo>>), ZeusdError> {
Ok((
Arc::new(PackageInfo {
index: _index,
name: "package-0".to_string(),
energy_uj_path: PathBuf::from(
"/sys/class/powercap/intel-rapl/intel-rapl:0/energy_uj",
),
max_energy_uj: 1000000,
num_wraparounds: RwLock::new(0),
}),
Some(Arc::new(PackageInfo {
index: _index,
name: "dram".to_string(),
energy_uj_path: PathBuf::from(
"/sys/class/powercap/intel-rapl/intel-rapl:0/intel-rapl:0:0/energy_uj",
),
max_energy_uj: 1000000,
num_wraparounds: RwLock::new(0),
})),
))
}

fn get_cpu_energy(&mut self) -> Result<u64, ZeusdError> {
Ok(10001)
}

fn get_dram_energy(&mut self) -> Result<u64, ZeusdError> {
Ok(1001)
}

fn stop_monitoring(&mut self) {}

fn is_dram_available(&self) -> bool {
true
}
}
Loading
Loading