Skip to content

Commit

Permalink
direct save NodeUsage in annotation value
Browse files Browse the repository at this point in the history
Signed-off-by: Johnson Shih <[email protected]>
  • Loading branch information
johnsonshih committed Jul 26, 2023
1 parent 3fbda5c commit b4f259e
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 135 deletions.
138 changes: 26 additions & 112 deletions agent/src/util/crictl_containers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use akri_shared::akri::{
instance::device_usage::DeviceUsageKind, AKRI_SLOT_ANNOTATION_NAME_PREFIX,
};
use akri_shared::akri::{instance::device_usage::NodeUsage, AKRI_SLOT_ANNOTATION_NAME_PREFIX};
use std::collections::HashMap;
use std::str::FromStr;

Expand All @@ -18,96 +16,8 @@ struct CriCtlContainer {
annotations: HashMap<String, String>,
}

#[derive(Debug, PartialEq, Eq)]
pub struct ParseSlotUsageError;
#[derive(PartialEq, Clone, Debug, Default)]
pub struct SlotUsage {
kind: DeviceUsageKind,
slot_id: String,
}

impl std::fmt::Display for SlotUsage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.kind {
DeviceUsageKind::Free => write!(f, ""),
DeviceUsageKind::Configuration(vdev_id) => {
write!(f, "C:{}:{}", vdev_id, self.slot_id)
}
DeviceUsageKind::Instance => write!(f, "{}", self.slot_id),
}
}
}

impl std::str::FromStr for SlotUsage {
type Err = ParseSlotUsageError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.is_empty() {
return Ok(SlotUsage {
kind: DeviceUsageKind::Free,
slot_id: s.to_string(),
});
}

// Format "C:<vdev_id>:<slot_id>"
if let Some((vdev_id, slot_id)) = s.strip_prefix("C:").and_then(|s| s.split_once(':')) {
if slot_id.is_empty() {
return Err(ParseSlotUsageError);
}
return Ok(SlotUsage {
kind: DeviceUsageKind::Configuration(vdev_id.to_string()),
slot_id: slot_id.to_string(),
});
}

// Format "<usage_name>"
Ok(SlotUsage {
kind: DeviceUsageKind::Instance,
slot_id: s.to_string(),
})
}
}

impl SlotUsage {
pub fn create(kind: &DeviceUsageKind, slot_id: &str) -> Result<Self, anyhow::Error> {
match kind {
DeviceUsageKind::Free => {
if !slot_id.is_empty() {
return Err(anyhow::anyhow!(
"Invalid input parameter, slot name: {} provided for free slot usage",
slot_id
));
};
}
_ => {
if slot_id.is_empty() {
return Err(anyhow::anyhow!(
"Invalid input parameter, no slot provided for slot usage"
));
};
}
};

Ok(Self {
kind: kind.clone(),
slot_id: slot_id.to_string(),
})
}

pub fn get_kind(&self) -> DeviceUsageKind {
self.kind.clone()
}

pub fn get_slot_id(&self) -> String {
self.slot_id.clone()
}

pub fn is_same_slot(&self, slot: &str) -> bool {
self.slot_id == slot
}
}

/// This gets the usage slots for an instance by getting the annotations that were stored at id `AKRI_SLOT_ANNOTATION_NAME_PREFIX` during allocate.
pub fn get_container_slot_usage(crictl_output: &str) -> HashMap<String, DeviceUsageKind> {
pub fn get_container_slot_usage(crictl_output: &str) -> HashMap<String, NodeUsage> {
match serde_json::from_str::<CriCtlOutput>(crictl_output) {
Ok(crictl_output_parsed) => crictl_output_parsed
.containers
Expand All @@ -118,14 +28,8 @@ pub fn get_container_slot_usage(crictl_output: &str) -> HashMap<String, DeviceUs
let slot_id = key
.strip_prefix(AKRI_SLOT_ANNOTATION_NAME_PREFIX)
.unwrap_or_default();
match SlotUsage::from_str(value) {
Ok(slot_usage) => {
if slot_usage.is_same_slot(slot_id) {
Some((slot_usage.get_slot_id(), slot_usage.get_kind()))
} else {
None
}
}
match NodeUsage::from_str(value) {
Ok(node_usage) => Some((slot_id.to_string(), node_usage)),
Err(_) => None,
}
} else {
Expand All @@ -147,6 +51,7 @@ pub fn get_container_slot_usage(crictl_output: &str) -> HashMap<String, DeviceUs
#[cfg(test)]
mod tests {
use super::*;
use akri_shared::akri::instance::device_usage::DeviceUsageKind;

fn get_container_str(annotation: &str) -> String {
format!("{{ \
Expand Down Expand Up @@ -186,60 +91,69 @@ mod tests {

// Empty output
assert_eq!(
HashMap::<String, DeviceUsageKind>::new(),
HashMap::<String, NodeUsage>::new(),
get_container_slot_usage(r#""#)
);
// Empty json output
assert_eq!(
HashMap::<String, DeviceUsageKind>::new(),
HashMap::<String, NodeUsage>::new(),
get_container_slot_usage(r#"{}"#)
);
// Expected output with no containers
assert_eq!(
HashMap::<String, DeviceUsageKind>::new(),
HashMap::<String, NodeUsage>::new(),
get_container_slot_usage(r#"{\"containers\": []}"#)
);
// Output with syntax error
assert_eq!(
HashMap::<String, DeviceUsageKind>::new(),
HashMap::<String, NodeUsage>::new(),
get_container_slot_usage(r#"{ddd}"#)
); // syntax error
// Expected output with no slot
assert_eq!(
HashMap::<String, DeviceUsageKind>::new(),
HashMap::<String, NodeUsage>::new(),
get_container_slot_usage(&format!(
"{{ \"containers\": [ {} ] }}",
&get_container_str("")
))
);
// Expected output with slot (including unexpected property)
let mut expected = HashMap::new();
expected.insert("foo".to_string(), DeviceUsageKind::Instance);
expected.insert(
"foo".to_string(),
NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(),
);
assert_eq!(
expected,
get_container_slot_usage(&format!(
"{{ \"ddd\": \"\", \"containers\": [ {} ] }}",
&get_container_str("\"akri.agent.slot-foo\": \"foo\",")
&get_container_str("\"akri.agent.slot-foo\": \"node-a\",")
))
);
// Expected output with slot
assert_eq!(
expected,
get_container_slot_usage(&format!(
"{{ \"containers\": [ {} ] }}",
&get_container_str("\"akri.agent.slot-foo\": \"foo\",")
&get_container_str("\"akri.agent.slot-foo\": \"node-a\",")
))
);
// Expected output with multiple containers
let mut expected_2 = HashMap::new();
expected_2.insert("foo1".to_string(), DeviceUsageKind::Instance);
expected_2.insert("foo2".to_string(), DeviceUsageKind::Instance);
expected_2.insert(
"foo1".to_string(),
NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(),
);
expected_2.insert(
"foo2".to_string(),
NodeUsage::create(&DeviceUsageKind::Instance, "node-b").unwrap(),
);
assert_eq!(
expected_2,
get_container_slot_usage(&format!(
"{{ \"containers\": [ {}, {} ] }}",
&get_container_str("\"akri.agent.slot-foo1\": \"foo1\","),
&get_container_str("\"akri.agent.slot-foo2\": \"foo2\","),
&get_container_str("\"akri.agent.slot-foo1\": \"node-a\","),
&get_container_str("\"akri.agent.slot-foo2\": \"node-b\","),
))
);
}
Expand Down
7 changes: 3 additions & 4 deletions agent/src/util/device_plugin_service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::constants::{
HEALTHY, KUBELET_UPDATE_CHANNEL_CAPACITY, LIST_AND_WATCH_SLEEP_SECS, UNHEALTHY,
};
use super::crictl_containers::SlotUsage;
use super::v1beta1;
use super::v1beta1::{
device_plugin_server::DevicePlugin, AllocateRequest, AllocateResponse, DevicePluginOptions,
Expand Down Expand Up @@ -408,11 +407,11 @@ impl InstanceDevicePlugin {
return Err(e);
}

let slot_usage =
SlotUsage::create(&DeviceUsageKind::Instance, &device_usage_id).unwrap();
let node_usage =
NodeUsage::create(&DeviceUsageKind::Instance, &dps.node_name).unwrap();
akri_annotations.insert(
format!("{}{}", AKRI_SLOT_ANNOTATION_NAME_PREFIX, &device_usage_id),
slot_usage.to_string(),
node_usage.to_string(),
);

// Add suffix _<instance_id> to each device property
Expand Down
63 changes: 44 additions & 19 deletions agent/src/util/slot_reconciliation.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{constants::SLOT_RECONCILIATION_CHECK_DELAY_SECS, crictl_containers};
use akri_shared::akri::instance::device_usage::{DeviceUsageKind, NodeUsage};
use akri_shared::akri::instance::device_usage::NodeUsage;
use akri_shared::{akri::instance::InstanceSpec, k8s::KubeInterface};
use async_trait::async_trait;
use k8s_openapi::api::core::v1::PodStatus;
Expand All @@ -14,7 +14,7 @@ use std::{
use tokio::process::Command;

type SlotQueryResult =
Result<HashMap<String, DeviceUsageKind>, Box<dyn std::error::Error + Send + Sync + 'static>>;
Result<HashMap<String, NodeUsage>, Box<dyn std::error::Error + Send + Sync + 'static>>;

#[cfg_attr(test, automock)]
#[async_trait]
Expand Down Expand Up @@ -179,12 +179,12 @@ impl DevicePluginSlotReconciler {
// there is a container using that slot on this node
node_slot_usage
.get_key_value(k)
.map(|(slot, kind)| (slot.to_string(), kind.clone()))
.map(|(slot, node_usage)| (slot.to_string(), node_usage.clone()))
} else {
None
}
})
.collect::<HashMap<String, DeviceUsageKind>>();
.collect::<HashMap<String, NodeUsage>>();

// Check Instance to find slots that are registered to this node, but
// there is no actual pod using the slot. We should update the Instance
Expand Down Expand Up @@ -256,10 +256,7 @@ impl DevicePluginSlotReconciler {
// Restore usage because there have been
// cases where a Pod is running (which corresponds
// to an Allocate call, but the Instance slot is empty.
let usage_kind = slots_missing_this_node_name.get(slot).unwrap();
NodeUsage::create(usage_kind, node_name)
.unwrap()
.to_string()
slots_missing_this_node_name.get(slot).unwrap().to_string()
} else if slots_to_clean.contains(slot) {
// Set usage to free because there is no
// Deallocate message from kubelet for us to know
Expand Down Expand Up @@ -368,13 +365,14 @@ pub async fn periodic_slot_reconciliation(
#[cfg(test)]
mod reconcile_tests {
use super::*;
use akri_shared::akri::instance::device_usage::DeviceUsageKind;
use akri_shared::{akri::instance::InstanceList, k8s::MockKubeInterface, os::file};
use k8s_openapi::api::core::v1::Pod;
use kube::api::ObjectList;

fn configure_get_node_slots(
mock: &mut MockSlotQuery,
result: HashMap<String, DeviceUsageKind>,
result: HashMap<String, NodeUsage>,
error: bool,
) {
mock.expect_get_node_slots().times(1).returning(move || {
Expand Down Expand Up @@ -410,7 +408,7 @@ mod reconcile_tests {
}

struct NodeSlots {
node_slots: HashMap<String, DeviceUsageKind>,
node_slots: HashMap<String, NodeUsage>,
node_slots_error: bool,
}

Expand Down Expand Up @@ -530,8 +528,14 @@ mod reconcile_tests {

let grace_period = Duration::from_millis(100);
let mut node_slots = HashMap::new();
node_slots.insert("config-a-359973-3".to_string(), DeviceUsageKind::Instance);
node_slots.insert("config-a-359973-5".to_string(), DeviceUsageKind::Instance);
node_slots.insert(
"config-a-359973-3".to_string(),
NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(),
);
node_slots.insert(
"config-a-359973-5".to_string(),
NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(),
);
configure_scnenario(
// slot_query to identify one slot used by this node
NodeSlots {
Expand Down Expand Up @@ -569,7 +573,10 @@ mod reconcile_tests {

let grace_period = Duration::from_millis(100);
let mut node_slots = HashMap::new();
node_slots.insert("config-a-359973-3".to_string(), DeviceUsageKind::Instance);
node_slots.insert(
"config-a-359973-3".to_string(),
NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(),
);
configure_scnenario(
// slot_query to identify one slot used by this node
NodeSlots {
Expand Down Expand Up @@ -629,7 +636,10 @@ mod reconcile_tests {

let grace_period = Duration::from_millis(100);
let mut node_slots = HashMap::new();
node_slots.insert("config-a-359973-3".to_string(), DeviceUsageKind::Instance);
node_slots.insert(
"config-a-359973-3".to_string(),
NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(),
);
configure_scnenario(
// slot_query to identify one slot used by this node
NodeSlots {
Expand Down Expand Up @@ -658,8 +668,14 @@ mod reconcile_tests {
std::thread::sleep(grace_period);

let mut node_slots_added = HashMap::new();
node_slots_added.insert("config-a-359973-3".to_string(), DeviceUsageKind::Instance);
node_slots_added.insert("config-a-359973-5".to_string(), DeviceUsageKind::Instance);
node_slots_added.insert(
"config-a-359973-3".to_string(),
NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(),
);
node_slots_added.insert(
"config-a-359973-5".to_string(),
NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(),
);
configure_scnenario(
// slot_query to identify one slot used by this node
NodeSlots {
Expand Down Expand Up @@ -692,7 +708,10 @@ mod reconcile_tests {

let grace_period = Duration::from_millis(100);
let mut node_slots = HashMap::new();
node_slots.insert("config-a-359973-3".to_string(), DeviceUsageKind::Instance);
node_slots.insert(
"config-a-359973-3".to_string(),
NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(),
);
configure_scnenario(
// slot_query to identify one slot used by this node
NodeSlots {
Expand Down Expand Up @@ -721,8 +740,14 @@ mod reconcile_tests {
std::thread::sleep(grace_period);

let mut node_slots_added = HashMap::new();
node_slots_added.insert("config-a-359973-1".to_string(), DeviceUsageKind::Instance);
node_slots_added.insert("config-a-359973-3".to_string(), DeviceUsageKind::Instance);
node_slots_added.insert(
"config-a-359973-1".to_string(),
NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(),
);
node_slots_added.insert(
"config-a-359973-3".to_string(),
NodeUsage::create(&DeviceUsageKind::Instance, "node-a").unwrap(),
);
configure_scnenario(
// slot_query to identify two slots used by this node
NodeSlots {
Expand Down

0 comments on commit b4f259e

Please sign in to comment.