From 7a07975b7128939e9edb25c7324b93bca8166e29 Mon Sep 17 00:00:00 2001 From: Wonbin Jin <116508975+wbjin@users.noreply.github.com> Date: Thu, 28 Nov 2024 13:45:58 -0600 Subject: [PATCH] [Feat] Implement CPU and DRAM monitoring for `zeusd` (#137) Co-authored-by: Jae-Won Chung --- zeusd/Cargo.toml | 4 +- zeusd/scripts/lint.sh | 0 zeusd/src/devices/cpu/linux.rs | 248 +++++++++++++++++++++++++++++++++ zeusd/src/devices/cpu/macos.rs | 59 ++++++++ zeusd/src/devices/cpu/mod.rs | 189 +++++++++++++++++++++++++ zeusd/src/devices/gpu/linux.rs | 16 ++- zeusd/src/devices/gpu/mod.rs | 2 +- zeusd/src/devices/mod.rs | 1 + zeusd/src/error.rs | 12 ++ zeusd/src/main.rs | 24 +++- zeusd/src/routes/cpu.rs | 53 +++++++ zeusd/src/routes/gpu.rs | 8 +- zeusd/src/routes/mod.rs | 2 + zeusd/src/startup.rs | 32 ++++- zeusd/tests/cpu.rs | 156 +++++++++++++++++++++ zeusd/tests/helpers/mod.rs | 145 +++++++++++++++++-- 16 files changed, 919 insertions(+), 32 deletions(-) mode change 100644 => 100755 zeusd/scripts/lint.sh create mode 100644 zeusd/src/devices/cpu/linux.rs create mode 100644 zeusd/src/devices/cpu/macos.rs create mode 100644 zeusd/src/devices/cpu/mod.rs create mode 100644 zeusd/src/routes/cpu.rs create mode 100644 zeusd/tests/cpu.rs diff --git a/zeusd/Cargo.toml b/zeusd/Cargo.toml index c5d66cce..7cad81df 100644 --- a/zeusd/Cargo.toml +++ b/zeusd/Cargo.toml @@ -20,7 +20,7 @@ name = "zeusd" [dependencies] nvml-wrapper = "0.10" actix-web = "4" -tokio = { version = "1", features = ["macros", "rt-multi-thread"] } +tokio = { version = "1", features = ["macros", "rt-multi-thread", "fs"] } thiserror = "1" clap = { version = "4.5.4", features = ["derive"] } serde = { version = "1", features = ["derive"] } @@ -31,8 +31,8 @@ tracing-log = "0.2.0" tracing-actix-web = "0.7.10" nix = { version = "0.29", default-features = false, features = ["user"] } paste = "1" +once_cell = "1.7.2" [dev-dependencies] -once_cell = "1.7.2" reqwest = { version = "0.11", default-features = false, features = ["json"] } serde_json = "1" diff --git a/zeusd/scripts/lint.sh b/zeusd/scripts/lint.sh old mode 100644 new mode 100755 diff --git a/zeusd/src/devices/cpu/linux.rs b/zeusd/src/devices/cpu/linux.rs new file mode 100644 index 00000000..94f38e9e --- /dev/null +++ b/zeusd/src/devices/cpu/linux.rs @@ -0,0 +1,248 @@ +//! CPU power measurement with RAPL. Only supported on Linux. + +use once_cell::sync::OnceCell; +use std::fs; +use std::io::Read; +use std::path::{Path, PathBuf}; +use std::string::String; +use std::sync::{Arc, RwLock}; +use tokio::io::AsyncReadExt; +use tokio::task::JoinHandle; +use tokio::time::{sleep, Duration}; + +use crate::devices::cpu::{CpuManager, PackageInfo}; +use crate::error::ZeusdError; + +// NOTE: To support Zeusd deployment in a docker container, this should support +// sysfs mounts under places like `/zeus_sys`. +static RAPL_DIR: &str = "/sys/class/powercap/intel-rapl"; + +// Assuming a maximum power draw of 1000 Watts when we are polling every 0.1 seconds, the maximum +// amount the RAPL counter would increase (1000 * 1e6 * 0.1) +static RAPL_COUNTER_MAX_INCREASE: u64 = 1000 * 100000; + +pub struct RaplCpu { + cpu: Arc, + dram: Option>, + cpu_monitoring_task: OnceCell>>, + dram_monitoring_task: OnceCell>>, +} + +impl RaplCpu { + pub fn init(_index: usize) -> Result { + let fields = RaplCpu::get_available_fields(_index)?; + Ok(Self { + cpu: fields.0, + dram: fields.1, + cpu_monitoring_task: OnceCell::new(), + dram_monitoring_task: OnceCell::new(), + }) + } +} + +impl PackageInfo { + pub fn new(base_path: &Path, index: usize) -> anyhow::Result { + let cpu_name_path = base_path.join("name"); + let cpu_energy_path = base_path.join("energy_uj"); + let cpu_max_energy_path = base_path.join("max_energy_range_uj"); + + if !cpu_name_path.exists() || !cpu_max_energy_path.exists() || !cpu_energy_path.exists() { + return Err(ZeusdError::CpuInitializationError(index)); + } + + let cpu_name = fs::read_to_string(&cpu_name_path)?.trim_end().to_string(); + // Try reding from energy_uj file + read_u64(&cpu_energy_path)?; + let cpu_max_energy = read_u64(&cpu_max_energy_path)?; + let wraparound_counter = RwLock::new(0); + Ok(PackageInfo { + index, + name: cpu_name, + energy_uj_path: cpu_energy_path, + max_energy_uj: cpu_max_energy, + num_wraparounds: wraparound_counter, + }) + } +} + +impl CpuManager for RaplCpu { + fn device_count() -> Result { + let mut index_count = 0; + let base_path = PathBuf::from(RAPL_DIR); + + match fs::read_dir(&base_path) { + Ok(entries) => { + for entry in entries.flatten() { + let path = entry.path(); + if path.is_dir() { + if let Some(dir_name_str) = path.file_name() { + let dir_name = dir_name_str.to_string_lossy(); + if dir_name.contains("intel-rapl") { + index_count += 1; + } + } + } + } + } + Err(_) => { + tracing::error!("RAPL not available"); + } + }; + Ok(index_count) + } + + fn get_available_fields( + index: usize, + ) -> Result<(Arc, Option>), ZeusdError> { + let base_path = PathBuf::from(format!("{}/intel-rapl:{}", RAPL_DIR, index)); + let cpu_info = PackageInfo::new(&base_path, index)?; + + match fs::read_dir(&base_path) { + Ok(entries) => { + for entry in entries.flatten() { + let path = entry.path(); + if path.is_dir() { + if let Some(dir_name_str) = path.file_name() { + let dir_name = dir_name_str.to_string_lossy(); + if dir_name.contains("intel-rapl") { + let subpackage_path = base_path.join(&*dir_name); + let subpackage_info = PackageInfo::new(&subpackage_path, index)?; + if subpackage_info.name == "dram" { + return Ok(( + Arc::new(cpu_info), + Some(Arc::new(subpackage_info)), + )); + } + } + } + } + } + } + Err(_) => { + return Err(ZeusdError::CpuInitializationError(index)); + } + }; + + Ok((Arc::new(cpu_info), None)) + } + + fn get_cpu_energy(&mut self) -> Result { + // Assume that RAPL counter will not wrap around twice during a request to poll energy. The + // number of wrap arounds is polled twice to handle the case where the counter wraps around + // a request. If this happens, `measurement` has to be updated as to not return an + // unexpectedly large energy value. + + let handle = self + .cpu_monitoring_task + .get_or_init(|| tokio::spawn(monitor_rapl(Arc::clone(&self.cpu)))); + if handle.is_finished() { + return Err(ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index)); + } + + let num_wraparounds_before = *self + .cpu + .num_wraparounds + .read() + .map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index))?; + let mut measurement = read_u64(&self.cpu.energy_uj_path)?; + let num_wraparounds = *self + .cpu + .num_wraparounds + .read() + .map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index))?; + if num_wraparounds != num_wraparounds_before { + // Wraparound has happened after measurement, take measurement again + measurement = read_u64(&self.cpu.energy_uj_path)?; + } + + Ok(measurement + num_wraparounds * self.cpu.max_energy_uj) + } + + fn get_dram_energy(&mut self) -> Result { + match &self.dram { + None => Err(ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index)), + Some(dram) => { + let handle = self + .dram_monitoring_task + .get_or_init(|| tokio::spawn(monitor_rapl(Arc::clone(dram)))); + if handle.is_finished() { + return Err(ZeusdError::CpuManagementTaskTerminatedError(dram.index)); + } + + let num_wraparounds_before = *dram + .num_wraparounds + .read() + .map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(dram.index))?; + let mut measurement = read_u64(&dram.energy_uj_path)?; + let num_wraparounds = *dram + .num_wraparounds + .read() + .map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(dram.index))?; + if num_wraparounds != num_wraparounds_before { + // Wraparound has happened after measurement, take measurement again + measurement = read_u64(&dram.energy_uj_path)?; + } + + Ok(measurement + num_wraparounds * dram.max_energy_uj) + } + } + } + + fn stop_monitoring(&mut self) { + if let Some(handle) = self.cpu_monitoring_task.take() { + handle.abort(); + } + if let Some(handle) = self.dram_monitoring_task.take() { + handle.abort(); + } + } + + fn is_dram_available(&self) -> bool { + self.dram.is_some() + } +} + +fn read_u64(path: &PathBuf) -> anyhow::Result { + let mut file = std::fs::File::open(path)?; + let mut buf = String::new(); + file.read_to_string(&mut buf)?; + buf.trim() + .parse() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)) +} + +async fn read_u64_async(path: &PathBuf) -> Result { + let mut file = tokio::fs::File::open(path).await?; + let mut buf = String::new(); + file.read_to_string(&mut buf).await?; + buf.trim() + .parse() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)) +} + +async fn monitor_rapl(rapl_file: Arc) -> Result<(), ZeusdError> { + let mut last_energy_uj = read_u64_async(&rapl_file.energy_uj_path).await?; + tracing::info!( + "Monitoring started for {}", + rapl_file.energy_uj_path.display() + ); + loop { + let current_energy_uj = read_u64_async(&rapl_file.energy_uj_path).await?; + + if current_energy_uj < last_energy_uj { + let mut wraparound_guard = rapl_file + .num_wraparounds + .write() + .map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(rapl_file.index))?; + *wraparound_guard += 1; + } + last_energy_uj = current_energy_uj; + let sleep_time = if rapl_file.max_energy_uj - current_energy_uj < RAPL_COUNTER_MAX_INCREASE + { + 100 + } else { + 1000 + }; + sleep(Duration::from_millis(sleep_time)).await; + } +} diff --git a/zeusd/src/devices/cpu/macos.rs b/zeusd/src/devices/cpu/macos.rs new file mode 100644 index 00000000..66edce5c --- /dev/null +++ b/zeusd/src/devices/cpu/macos.rs @@ -0,0 +1,59 @@ +//! Fake `RaplCpu` implementation to allow development and testing on MacOS. +use std::path::PathBuf; +use std::sync::{Arc, RwLock}; + +use crate::devices::cpu::{CpuManager, PackageInfo}; +use crate::error::ZeusdError; + +pub struct RaplCpu {} + +impl RaplCpu { + pub fn init(_index: usize) -> Result { + Ok(Self {}) + } +} + +impl CpuManager for RaplCpu { + fn device_count() -> Result { + Ok(1) + } + + fn get_available_fields( + _index: usize, + ) -> Result<(Arc, Option>), ZeusdError> { + Ok(( + Arc::new(PackageInfo { + index: _index, + name: "package-0".to_string(), + energy_uj_path: PathBuf::from( + "/sys/class/powercap/intel-rapl/intel-rapl:0/energy_uj", + ), + max_energy_uj: 1000000, + num_wraparounds: RwLock::new(0), + }), + Some(Arc::new(PackageInfo { + index: _index, + name: "dram".to_string(), + energy_uj_path: PathBuf::from( + "/sys/class/powercap/intel-rapl/intel-rapl:0/intel-rapl:0:0/energy_uj", + ), + max_energy_uj: 1000000, + num_wraparounds: RwLock::new(0), + })), + )) + } + + fn get_cpu_energy(&mut self) -> Result { + Ok(10001) + } + + fn get_dram_energy(&mut self) -> Result { + Ok(1001) + } + + fn stop_monitoring(&mut self) {} + + fn is_dram_available(&self) -> bool { + true + } +} diff --git a/zeusd/src/devices/cpu/mod.rs b/zeusd/src/devices/cpu/mod.rs new file mode 100644 index 00000000..f79ee1ee --- /dev/null +++ b/zeusd/src/devices/cpu/mod.rs @@ -0,0 +1,189 @@ +// RAPL CPU +// Real RAPL interface. +#[cfg(target_os = "linux")] +mod linux; +#[cfg(target_os = "linux")] +pub use linux::RaplCpu; + +// Fake Rapl interface for dev and testing on macOS. +#[cfg(target_os = "macos")] +mod macos; +#[cfg(target_os = "macos")] +pub use macos::RaplCpu; + +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::RwLock; +use std::time::Instant; +use tokio::sync::mpsc::{Sender, UnboundedReceiver, UnboundedSender}; +use tracing::Span; + +use crate::error::ZeusdError; + +pub struct PackageInfo { + pub index: usize, + pub name: String, + pub energy_uj_path: PathBuf, + pub max_energy_uj: u64, + pub num_wraparounds: RwLock, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct RaplResponse { + pub cpu_energy_uj: Option, + pub dram_energy_uj: Option, +} + +pub trait CpuManager { + /// Get the number of CPUs available. + fn device_count() -> Result; + /// Get the CPU PackageInfo and the DRAM PackageInfo it is available. + fn get_available_fields( + index: usize, + ) -> Result<(Arc, Option>), ZeusdError>; + // Get the cumulative Rapl count value of the CPU after compensating for wraparounds. + fn get_cpu_energy(&mut self) -> Result; + // Get the cumulative Rapl count value of the DRAM after compensating for wraparounds if it is + // available. + fn get_dram_energy(&mut self) -> Result; + // Abort the monitoring tasks for CPU and DRAM if the tasks have been started. + fn stop_monitoring(&mut self); + // Check if DRAM is available. + fn is_dram_available(&self) -> bool; +} + +pub type CpuCommandRequest = ( + CpuCommand, + Option>>, + Instant, + Span, +); + +#[derive(Clone)] +pub struct CpuManagementTasks { + // Senders to the CPU management tasks. index is the CPU ID. + senders: Vec>, +} + +impl CpuManagementTasks { + pub fn start(cpus: Vec) -> Result + where + T: CpuManager + Send + 'static, + { + let mut senders = Vec::with_capacity(cpus.len()); + for (cpu_id, cpu) in cpus.into_iter().enumerate() { + // Channel to send commands to the CPU management task. + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + senders.push(tx); + // The CPU management task will automatically terminate + // when the server terminates and the last sender is dropped. + tokio::spawn(cpu_management_task(cpu, rx)); + tracing::info!("Background task for CPU {} successfully spawned", cpu_id); + } + Ok(Self { senders }) + } + + pub async fn send_command_blocking( + &self, + cpu_id: usize, + command: CpuCommand, + request_start_time: Instant, + ) -> Result { + if cpu_id >= self.senders.len() { + return Err(ZeusdError::CpuNotFoundError(cpu_id)); + } + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + self.senders[cpu_id] + .send((command, Some(tx), request_start_time, Span::current())) + .unwrap(); + match rx.recv().await { + Some(result) => result, + None => Err(ZeusdError::CpuManagementTaskTerminatedError(cpu_id)), + } + } + + pub async fn stop_monitoring(&self) -> Result<(), ZeusdError> { + for (index, sender) in self.senders.iter().enumerate() { + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + sender + .send(( + CpuCommand::StopMonitoring, + Some(tx), + Instant::now(), + Span::current(), + )) + .unwrap(); + match rx.recv().await { + Some(_) => {} + None => return Err(ZeusdError::CpuManagementTaskTerminatedError(index)), + } + } + Ok(()) + } +} + +/// A CPU command that can be executed on a CPU. +#[derive(Debug)] +pub enum CpuCommand { + /// Get the CPU and DRAM energy measurement for the CPU index + GetIndexEnergy { cpu: bool, dram: bool }, + /// Stop the monitoring task for CPU and DRAM if they have been started. + StopMonitoring, +} + +/// Tokio background task that handles requests to each CPU. +/// NOTE: Currently, this serializes the handling of request to a single CPU, which is +/// largely unnecessary as the requests are simply reading energy counters. +/// This is subject to refactoring if it is to become a bottleneck. +async fn cpu_management_task( + mut cpu: T, + mut rx: UnboundedReceiver, +) { + while let Some((command, response, start_time, span)) = rx.recv().await { + let _span_guard = span.enter(); + let result = command.execute(&mut cpu, start_time); + if let Some(response) = response { + if response.send(result).await.is_err() { + tracing::error!("Failed to send response to caller"); + } + } + } +} + +impl CpuCommand { + fn execute( + &self, + device: &mut T, + _request_arrival_time: Instant, + ) -> Result + where + T: CpuManager, + { + match *self { + Self::GetIndexEnergy { cpu, dram } => { + let cpu_energy_uj = if cpu { + Some(device.get_cpu_energy()?) + } else { + None + }; + let dram_energy_uj = if dram && device.is_dram_available() { + Some(device.get_dram_energy()?) + } else { + None + }; + Ok(RaplResponse { + cpu_energy_uj, + dram_energy_uj, + }) + } + Self::StopMonitoring {} => { + device.stop_monitoring(); + Ok(RaplResponse { + cpu_energy_uj: Some(0), + dram_energy_uj: Some(0), + }) + } + } + } +} diff --git a/zeusd/src/devices/gpu/linux.rs b/zeusd/src/devices/gpu/linux.rs index 2b372464..2f01c6c0 100644 --- a/zeusd/src/devices/gpu/linux.rs +++ b/zeusd/src/devices/gpu/linux.rs @@ -3,7 +3,7 @@ //! Note that NVML is only available on Linux. use nvml_wrapper::enums::device::GpuLockedClocksSetting; -use nvml_wrapper::{Device, Nvml}; +use nvml_wrapper::{error::NvmlError, Device, Nvml}; use crate::devices::gpu::GpuManager; use crate::error::ZeusdError; @@ -26,8 +26,18 @@ impl NvmlGpu<'static> { impl GpuManager for NvmlGpu<'static> { fn device_count() -> Result { - let nvml = Nvml::init()?; - Ok(nvml.device_count()?) + match Nvml::init() { + Ok(nvml) => match nvml.device_count() { + Ok(count) => Ok(count), + Err(e) => Err(ZeusdError::NvmlError(e)), + }, + // Specifically catch this error that is thrown when GPU is not available + Err(NvmlError::LibloadingError(e)) => { + tracing::error!("Error initializing NVML, {}", e); + Ok(0) + } + Err(e) => Err(ZeusdError::NvmlError(e)), + } } #[inline] diff --git a/zeusd/src/devices/gpu/mod.rs b/zeusd/src/devices/gpu/mod.rs index 281f54ba..58599736 100644 --- a/zeusd/src/devices/gpu/mod.rs +++ b/zeusd/src/devices/gpu/mod.rs @@ -81,7 +81,7 @@ pub struct GpuManagementTasks { impl GpuManagementTasks { /// Start GPU management tasks for the given GPUs. /// It's generic over the type of GPU manager to allow for testing. - pub fn start(gpus: Vec) -> anyhow::Result + pub fn start(gpus: Vec) -> Result where T: GpuManager + Send + 'static, { diff --git a/zeusd/src/devices/mod.rs b/zeusd/src/devices/mod.rs index eaeb673a..184db2b7 100644 --- a/zeusd/src/devices/mod.rs +++ b/zeusd/src/devices/mod.rs @@ -1,3 +1,4 @@ //! Interfaces for interacting with devices +pub mod cpu; pub mod gpu; diff --git a/zeusd/src/error.rs b/zeusd/src/error.rs index 0733dacd..c1f24af6 100644 --- a/zeusd/src/error.rs +++ b/zeusd/src/error.rs @@ -17,12 +17,20 @@ use crate::devices::gpu::GpuCommandRequest; pub enum ZeusdError { #[error("GPU index {0} does not exist.")] GpuNotFoundError(usize), + #[error("CPU index {0} does not exist.")] + CpuNotFoundError(usize), #[error("NVML error: {0}")] NvmlError(#[from] NvmlError), #[error("GPU command send error: {0}")] GpuCommandSendError(#[from] SendError), #[error("Management task for GPU {0} unexpectedly terminated while handling the request.")] GpuManagementTaskTerminatedError(usize), + #[error("Management task for CPU {0} unexpectedly terminated while handling the request.")] + CpuManagementTaskTerminatedError(usize), + #[error("Initialization for CPU {0} unexpectedly errored.")] + CpuInitializationError(usize), + #[error("IOError: {0}")] + IOError(#[from] std::io::Error), } /// This allows us to return a custom HTTP status code for each error variant. @@ -30,6 +38,7 @@ impl ResponseError for ZeusdError { fn status_code(&self) -> StatusCode { match self { ZeusdError::GpuNotFoundError(_) => StatusCode::BAD_REQUEST, + ZeusdError::CpuNotFoundError(_) => StatusCode::BAD_REQUEST, ZeusdError::NvmlError(e) => match e { NvmlError::NoPermission => StatusCode::FORBIDDEN, NvmlError::InvalidArg => StatusCode::BAD_REQUEST, @@ -37,6 +46,9 @@ impl ResponseError for ZeusdError { }, ZeusdError::GpuCommandSendError(_) => StatusCode::INTERNAL_SERVER_ERROR, ZeusdError::GpuManagementTaskTerminatedError(_) => StatusCode::INTERNAL_SERVER_ERROR, + ZeusdError::CpuManagementTaskTerminatedError(_) => StatusCode::INTERNAL_SERVER_ERROR, + ZeusdError::CpuInitializationError(_) => StatusCode::INTERNAL_SERVER_ERROR, + ZeusdError::IOError(_) => StatusCode::INTERNAL_SERVER_ERROR, } } } diff --git a/zeusd/src/main.rs b/zeusd/src/main.rs index 9030173a..47cd6bc6 100644 --- a/zeusd/src/main.rs +++ b/zeusd/src/main.rs @@ -4,8 +4,8 @@ use std::net::TcpListener; use zeusd::config::{get_config, ConnectionMode}; use zeusd::startup::{ - ensure_root, get_unix_listener, init_tracing, start_device_tasks, start_server_tcp, - start_server_uds, + ensure_root, get_unix_listener, init_tracing, start_cpu_device_tasks, start_gpu_device_tasks, + start_server_tcp, start_server_uds, }; #[tokio::main] @@ -19,7 +19,8 @@ async fn main() -> anyhow::Result<()> { ensure_root()?; } - let device_tasks = start_device_tasks()?; + let gpu_device_tasks = start_gpu_device_tasks()?; + let cpu_device_tasks = start_cpu_device_tasks()?; tracing::info!("Started all device tasks"); let num_workers = config.num_workers.unwrap_or_else(|| { @@ -37,16 +38,29 @@ async fn main() -> anyhow::Result<()> { )?; tracing::info!("Listening on {}", &config.socket_path); - start_server_uds(listener, device_tasks, num_workers)?.await?; + start_server_uds( + listener, + gpu_device_tasks, + cpu_device_tasks.clone(), + num_workers, + )? + .await?; } ConnectionMode::TCP => { let listener = TcpListener::bind(&config.tcp_bind_address)?; tracing::info!("Listening on {}", &listener.local_addr()?); - start_server_tcp(listener, device_tasks, num_workers)?.await?; + start_server_tcp( + listener, + gpu_device_tasks, + cpu_device_tasks.clone(), + num_workers, + )? + .await?; } } + let _ = cpu_device_tasks.stop_monitoring().await; Ok(()) } diff --git a/zeusd/src/routes/cpu.rs b/zeusd/src/routes/cpu.rs new file mode 100644 index 00000000..092c4c74 --- /dev/null +++ b/zeusd/src/routes/cpu.rs @@ -0,0 +1,53 @@ +//! Routes for interacting with CPUs + +use actix_web::{web, HttpResponse}; +use serde::{Deserialize, Serialize}; +use std::time::Instant; + +use crate::devices::cpu::{CpuCommand, CpuManagementTasks}; +use crate::error::ZeusdError; + +#[derive(Serialize, Deserialize, Debug)] +pub struct GetIndexEnergy { + pub cpu: bool, + pub dram: bool, +} + +impl From for CpuCommand { + fn from(_request: GetIndexEnergy) -> Self { + CpuCommand::GetIndexEnergy { + cpu: _request.cpu, + dram: _request.dram, + } + } +} + +#[actix_web::post("/{cpu_id}/get_index_energy")] +#[tracing::instrument( + skip(cpu_id, request, _device_tasks), + fields( + cpu_id = %cpu_id, + cpu = %request.cpu, + dram = %request.dram, + ) +)] +async fn get_index_energy_handler( + cpu_id: web::Path, + request: web::Json, + _device_tasks: web::Data, +) -> Result { + let now = Instant::now(); + tracing::info!("Received request"); + let cpu_id = cpu_id.into_inner(); + let request = request.into_inner(); + + let measurement = _device_tasks + .send_command_blocking(cpu_id, request.into(), now) + .await?; + + Ok(HttpResponse::Ok().json(measurement)) +} + +pub fn cpu_routes(cfg: &mut web::ServiceConfig) { + cfg.service(get_index_energy_handler); +} diff --git a/zeusd/src/routes/gpu.rs b/zeusd/src/routes/gpu.rs index 46f002a3..73b696d4 100644 --- a/zeusd/src/routes/gpu.rs +++ b/zeusd/src/routes/gpu.rs @@ -13,18 +13,18 @@ use crate::error::ZeusdError; /// This macro takes /// - the API name (set_power_limit, set_persistence_mode, etc.), /// - the method and path for the request handler, -/// - and a list of `field name ` pairs of the corresponding `GpuCommand` variant. +/// - and a list of `field name: type` pairs of the corresponding `GpuCommand` variant. /// /// Gien this, the macro generates /// - a request payload struct named API name (e.g., SetPowerLimit) and all the -/// fields specified plus `block: bool` to indicate whether the request should block, +/// fields specified plus `block: bool` to indicate whether the request should block, /// - an implementation of `From` for the payload struct to convert it to the /// - a handler function that takes the request payload, converts it to a `GpuCommand` variant, -/// and sends it to the `GpuManagementTasks` actor. +/// and sends it to the `GpuManagementTasks` actor. /// /// Assumptions: /// - The `GpuCommand` variant name is the same as the API name, but the former is camel case -/// and the latter is snake case (e.g., SetPowerLimit vs. set_power_limit). +/// and the latter is snake case (e.g., SetPowerLimit vs. set_power_limit). macro_rules! impl_handler_for_gpu_command { ($api:ident, $path:expr, $($field:ident: $ftype:ty,)*) => { paste! { diff --git a/zeusd/src/routes/mod.rs b/zeusd/src/routes/mod.rs index fae1350b..9a781a03 100644 --- a/zeusd/src/routes/mod.rs +++ b/zeusd/src/routes/mod.rs @@ -1,5 +1,7 @@ //! Routes and handlers for interacting with devices +pub mod cpu; pub mod gpu; +pub use cpu::cpu_routes; pub use gpu::gpu_routes; diff --git a/zeusd/src/startup.rs b/zeusd/src/startup.rs index 878b2569..04c14d0a 100644 --- a/zeusd/src/startup.rs +++ b/zeusd/src/startup.rs @@ -12,7 +12,9 @@ use tracing_subscriber::fmt::MakeWriter; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::{EnvFilter, Registry}; +use crate::devices::cpu::{CpuManagementTasks, CpuManager, RaplCpu}; use crate::devices::gpu::{GpuManagementTasks, GpuManager, NvmlGpu}; +use crate::routes::cpu_routes; use crate::routes::gpu_routes; /// Initialize tracing with the given where to write logs to. @@ -51,7 +53,7 @@ pub fn get_unix_listener( } /// Initialize NVML and start GPU management tasks. -pub fn start_device_tasks() -> anyhow::Result { +pub fn start_gpu_device_tasks() -> anyhow::Result { tracing::info!("Starting NVML and GPU management tasks."); let num_gpus = NvmlGpu::device_count()?; let mut gpus = Vec::with_capacity(num_gpus as usize); @@ -60,7 +62,19 @@ pub fn start_device_tasks() -> anyhow::Result { tracing::info!("Initialized NVML for GPU {}", gpu_id); gpus.push(gpu); } - GpuManagementTasks::start(gpus) + Ok(GpuManagementTasks::start(gpus)?) +} + +pub fn start_cpu_device_tasks() -> anyhow::Result { + tracing::info!("Starting Rapl and CPU management tasks."); + let num_cpus = RaplCpu::device_count()?; + let mut cpus = Vec::with_capacity(num_cpus); + for cpu_id in 0..num_cpus { + let cpu = RaplCpu::init(cpu_id)?; + tracing::info!("Initialized RAPL for CPU {}", cpu_id); + cpus.push(cpu); + } + Ok(CpuManagementTasks::start(cpus)?) } /// Ensure the daemon is running as root. @@ -78,14 +92,17 @@ pub fn ensure_root() -> anyhow::Result<()> { /// Set up routing and start the server on a unix domain socket. pub fn start_server_uds( listener: UnixListener, - device_tasks: GpuManagementTasks, + gpu_device_tasks: GpuManagementTasks, + cpu_device_tasks: CpuManagementTasks, num_workers: usize, ) -> std::io::Result { let server = HttpServer::new(move || { App::new() .wrap(tracing_actix_web::TracingLogger::default()) .service(web::scope("/gpu").configure(gpu_routes)) - .app_data(web::Data::new(device_tasks.clone())) + .service(web::scope("/cpu").configure(cpu_routes)) + .app_data(web::Data::new(gpu_device_tasks.clone())) + .app_data(web::Data::new(cpu_device_tasks.clone())) }) .workers(num_workers) .listen_uds(listener)? @@ -97,14 +114,17 @@ pub fn start_server_uds( /// Set up routing and start the server over TCP. pub fn start_server_tcp( listener: TcpListener, - device_tasks: GpuManagementTasks, + gpu_device_tasks: GpuManagementTasks, + cpu_device_tasks: CpuManagementTasks, num_workers: usize, ) -> std::io::Result { let server = HttpServer::new(move || { App::new() .wrap(tracing_actix_web::TracingLogger::default()) .service(web::scope("/gpu").configure(gpu_routes)) - .app_data(web::Data::new(device_tasks.clone())) + .service(web::scope("/cpu").configure(cpu_routes)) + .app_data(web::Data::new(gpu_device_tasks.clone())) + .app_data(web::Data::new(cpu_device_tasks.clone())) }) .workers(num_workers) .listen(listener)? diff --git a/zeusd/tests/cpu.rs b/zeusd/tests/cpu.rs new file mode 100644 index 00000000..35c5a92b --- /dev/null +++ b/zeusd/tests/cpu.rs @@ -0,0 +1,156 @@ +mod helpers; + +use zeusd::devices::cpu::RaplResponse; +use zeusd::routes::cpu::GetIndexEnergy; + +use crate::helpers::{TestApp, ZeusdRequest}; + +#[tokio::test] +async fn test_only_cpu_measuremnt() { + let mut app = TestApp::start().await; + let measurements: Vec = vec![10000, 10001, 12313, 8213, 0]; + app.set_cpu_energy_measurements(0, &measurements); + + for expected in measurements { + let resp = app + .send( + 0, + GetIndexEnergy { + cpu: true, + dram: false, + }, + ) + .await + .expect("Failed to send request"); + assert_eq!(resp.status(), 200); + let rapl_response: RaplResponse = serde_json::from_str(&resp.text().await.unwrap()) + .expect("Failed to deserialize response body"); + assert_eq!(rapl_response.cpu_energy_uj.unwrap(), expected); + assert_eq!(rapl_response.dram_energy_uj, None); + } +} + +#[tokio::test] +async fn test_only_dram_measuremnt() { + let mut app = TestApp::start().await; + let measurements: Vec = vec![10000, 10001, 12313, 8213, 0]; + app.set_dram_energy_measurements(0, &measurements); + + for expected in measurements { + let resp = app + .send( + 0, + GetIndexEnergy { + cpu: false, + dram: true, + }, + ) + .await + .expect("Failed to send request"); + assert_eq!(resp.status(), 200); + let rapl_response: RaplResponse = serde_json::from_str(&resp.text().await.unwrap()) + .expect("Failed to deserialiez response body"); + assert_eq!(rapl_response.cpu_energy_uj, None); + assert_eq!(rapl_response.dram_energy_uj.unwrap(), expected); + } +} + +#[tokio::test] +async fn test_both_measuremnt() { + let mut app = TestApp::start().await; + let measurements: Vec = vec![10000, 10001, 12313, 8213, 0]; + app.set_cpu_energy_measurements(0, &measurements); + app.set_dram_energy_measurements(0, &measurements); + + for expected in measurements { + let resp = app + .send( + 0, + GetIndexEnergy { + cpu: true, + dram: true, + }, + ) + .await + .expect("Failed to send request"); + assert_eq!(resp.status(), 200); + let rapl_response: RaplResponse = serde_json::from_str(&resp.text().await.unwrap()) + .expect("Failed to deserialiez response body"); + assert_eq!(rapl_response.cpu_energy_uj.unwrap(), expected); + assert_eq!(rapl_response.dram_energy_uj.unwrap(), expected); + } +} + +#[tokio::test] +async fn test_invalid_requests() { + let app = TestApp::start().await; + + let client = reqwest::Client::new(); + let url = GetIndexEnergy::build_url(&app, 0); + let resp = client + .post(url) + .json(&serde_json::json!( + { + "cpu": true, // Missing dram field + } + )) + .send() + .await + .expect("Failed to send request"); + assert_eq!(resp.status(), 400); + + let url = GetIndexEnergy::build_url(&app, 0); + let resp = client + .post(url) + .json(&serde_json::json!( + { + "dram": true, // Missing cpu field + } + )) + .send() + .await + .expect("Failed to send request"); + assert_eq!(resp.status(), 400); + + let url = GetIndexEnergy::build_url(&app, 0); + let resp = client + .post(url) + .json(&serde_json::json!( + { + "cpu": "true", //Invalid type + "dram": true, + } + )) + .send() + .await + .expect("Failed to send request"); + assert_eq!(resp.status(), 400); + + let url = GetIndexEnergy::build_url(&app, 2); // Out of index CPU + let resp = client + .post(url) + .json(&serde_json::json!( + { + "cp": true, // Invalid field name + "dram": true, + } + )) + .send() + .await + .expect("Failed to send request"); + assert_eq!(resp.status(), 400); + + let url = GetIndexEnergy::build_url(&app, 2); // Out of index CPU + let resp = client + .post(url) + .json(&serde_json::json!( + { + "cpu": true, + "dram": true, + } + )) + .send() + .await + .expect("Failed to send request"); + assert_eq!(resp.status(), 400); +} diff --git a/zeusd/tests/helpers/mod.rs b/zeusd/tests/helpers/mod.rs index 8e6e9860..610f1ca3 100644 --- a/zeusd/tests/helpers/mod.rs +++ b/zeusd/tests/helpers/mod.rs @@ -8,13 +8,18 @@ use once_cell::sync::Lazy; use paste::paste; use std::future::Future; use std::net::TcpListener; +use std::path::PathBuf; +use std::sync::{Arc, RwLock}; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use zeusd::devices::cpu::{CpuManagementTasks, CpuManager, PackageInfo}; use zeusd::devices::gpu::{GpuManagementTasks, GpuManager}; use zeusd::error::ZeusdError; use zeusd::startup::{init_tracing, start_server_tcp}; static NUM_GPUS: u32 = 4; +static NUM_CPUS: usize = 1; + static TRACING: Lazy<()> = Lazy::new(|| { if std::env::var("TEST_LOG").is_ok() { init_tracing(std::io::stdout).expect("Failed to initialize tracing"); @@ -117,7 +122,79 @@ impl GpuManager for TestGpu { } } -pub fn start_test_tasks() -> anyhow::Result<(GpuManagementTasks, Vec)> { +pub struct TestCpu { + pub cpu: UnboundedReceiver, + pub dram: UnboundedReceiver, +} + +pub struct TestCpuInjector { + pub cpu: UnboundedSender, + pub dram: UnboundedSender, +} + +impl TestCpu { + fn init(_index: usize) -> Result<(Self, TestCpuInjector), ZeusdError> { + let (cpu_sender, cpu_receiver) = tokio::sync::mpsc::unbounded_channel(); + let (dram_sender, dram_receiver) = tokio::sync::mpsc::unbounded_channel(); + Ok(( + TestCpu { + cpu: cpu_receiver, + dram: dram_receiver, + }, + TestCpuInjector { + cpu: cpu_sender, + dram: dram_sender, + }, + )) + } +} + +impl CpuManager for TestCpu { + fn device_count() -> Result { + Ok(1) + } + + fn get_available_fields( + _index: usize, + ) -> Result<(Arc, Option>), ZeusdError> { + Ok(( + Arc::new(PackageInfo { + index: _index, + name: "package-0".to_string(), + energy_uj_path: PathBuf::from( + "/sys/class/powercap/intel-rapl/intel-rapl:0/energy_uj", + ), + max_energy_uj: 1000000, + num_wraparounds: RwLock::new(0), + }), + Some(Arc::new(PackageInfo { + index: _index, + name: "dram".to_string(), + energy_uj_path: PathBuf::from( + "/sys/class/powercap/intel-rapl/intel-rapl:0/intel-rapl:0:0/energy_uj", + ), + max_energy_uj: 1000000, + num_wraparounds: RwLock::new(0), + })), + )) + } + + fn get_cpu_energy(&mut self) -> Result { + Ok(self.cpu.try_recv().ok().unwrap()) + } + + fn get_dram_energy(&mut self) -> Result { + Ok(self.dram.try_recv().ok().unwrap()) + } + + fn stop_monitoring(&mut self) {} + + fn is_dram_available(&self) -> bool { + true + } +} + +pub fn start_gpu_test_tasks() -> anyhow::Result<(GpuManagementTasks, Vec)> { let mut gpus = Vec::with_capacity(4); let mut observers = Vec::with_capacity(4); for _ in 0..4 { @@ -131,12 +208,24 @@ pub fn start_test_tasks() -> anyhow::Result<(GpuManagementTasks, Vec anyhow::Result<(CpuManagementTasks, Vec)> { + let mut cpus = Vec::with_capacity(NUM_CPUS); + let mut injectors = Vec::with_capacity(NUM_CPUS); + for i in 0..NUM_CPUS { + let (cpu, cpu_injector) = TestCpu::init(i)?; + cpus.push(cpu); + injectors.push(cpu_injector) + } + let tasks = CpuManagementTasks::start(cpus)?; + Ok((tasks, injectors)) +} + /// A helper trait for building URLs to send requests to. pub trait ZeusdRequest: serde::Serialize { fn build_url(app: &TestApp, gpu_id: u32) -> String; } -macro_rules! impl_zeusd_request { +macro_rules! impl_zeusd_request_gpu { ($api:ident) => { paste! { impl ZeusdRequest for zeusd::routes::gpu::[<$api:camel>] { @@ -151,35 +240,57 @@ macro_rules! impl_zeusd_request { }; } -impl_zeusd_request!(SetPersistenceMode); -impl_zeusd_request!(SetPowerLimit); -impl_zeusd_request!(SetGpuLockedClocks); -impl_zeusd_request!(ResetGpuLockedClocks); -impl_zeusd_request!(SetMemLockedClocks); -impl_zeusd_request!(ResetMemLockedClocks); +macro_rules! impl_zeusd_request_cpu { + ($api:ident) => { + paste! { + impl ZeusdRequest for zeusd::routes::cpu::[<$api:camel>] { + fn build_url(app: &TestApp, cpu_id: u32) -> String { + format!( + "http://127.0.0.1:{}/cpu/{}/{}", + app.port, cpu_id, stringify!([<$api:snake>]), + ) + } + } + } + }; +} +impl_zeusd_request_gpu!(SetPersistenceMode); +impl_zeusd_request_gpu!(SetPowerLimit); +impl_zeusd_request_gpu!(SetGpuLockedClocks); +impl_zeusd_request_gpu!(ResetGpuLockedClocks); +impl_zeusd_request_gpu!(SetMemLockedClocks); +impl_zeusd_request_gpu!(ResetMemLockedClocks); + +impl_zeusd_request_cpu!(GetIndexEnergy); /// A test application that starts a server over TCP and provides helper methods /// for sending requests and fetching what happened to the fake GPUs. pub struct TestApp { port: u16, observers: Vec, + cpu_injectors: Vec, } impl TestApp { pub async fn start() -> Self { Lazy::force(&TRACING); - let (test_tasks, test_gpu_observers) = - start_test_tasks().expect("Failed to start test tasks"); + let (gpu_test_tasks, test_gpu_observers) = + start_gpu_test_tasks().expect("Failed to start gpu test tasks"); + + let (cpu_test_tasks, cpu_test_injectors) = + start_cpu_test_tasks().expect("Failed to start cpu test tasks"); let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind TCP listener"); let port = listener.local_addr().unwrap().port(); - let server = start_server_tcp(listener, test_tasks, 8).expect("Failed to start server"); + let server = start_server_tcp(listener, gpu_test_tasks, cpu_test_tasks, 2) + .expect("Failed to start server"); let _ = tokio::spawn(async move { server.await }); TestApp { port, observers: test_gpu_observers, + cpu_injectors: cpu_test_injectors, } } @@ -213,4 +324,16 @@ impl TestApp { let rx = &mut self.observers[gpu_id].mem_locked_clocks_rx; std::iter::from_fn(|| rx.try_recv().ok()).collect() } + + pub fn set_cpu_energy_measurements(&mut self, cpu_id: usize, measurements: &Vec) { + for measurement in measurements { + self.cpu_injectors[cpu_id].cpu.send(*measurement).unwrap(); + } + } + + pub fn set_dram_energy_measurements(&mut self, cpu_id: usize, measurements: &Vec) { + for measurement in measurements { + self.cpu_injectors[cpu_id].dram.send(*measurement).unwrap(); + } + } }