diff --git a/Cargo.lock b/Cargo.lock index 99a300a6a..72504aecb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -48,6 +48,7 @@ dependencies = [ "serde_yaml", "sxd-document", "sxd-xpath", + "tempfile", "tokio 0.2.21", "tokio-core", "tonic", diff --git a/agent/Cargo.toml b/agent/Cargo.toml index c7de47a96..d9731be89 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -36,6 +36,7 @@ serde_derive = "1.0.104" akri-shared = { path = "../shared" } sxd-document = "0.3.0" sxd-xpath = "0.4.0" +tempfile = "3.1.0" tokio = { version = "0.2", features = ["full"] } tokio-core = "0.1" tonic = "0.1" diff --git a/agent/src/util/config_action.rs b/agent/src/util/config_action.rs index 3a0281841..ac01bbfa0 100644 --- a/agent/src/util/config_action.rs +++ b/agent/src/util/config_action.rs @@ -1,6 +1,8 @@ use super::super::protocols; use super::{ - constants::{DISCOVERY_DELAY_SECS, SHARED_INSTANCE_OFFLINE_GRACE_PERIOD_SECS}, + constants::{ + DEVICE_PLUGIN_PATH, DISCOVERY_DELAY_SECS, SHARED_INSTANCE_OFFLINE_GRACE_PERIOD_SECS, + }, device_plugin_service, device_plugin_service::{ get_device_instance_name, ConnectivityStatus, InstanceInfo, InstanceMap, @@ -184,6 +186,7 @@ async fn handle_config_add( &kube_interface, stop_discovery_receiver, finished_discovery_sender, + DEVICE_PLUGIN_PATH, ) .await .unwrap(); @@ -322,6 +325,7 @@ impl PeriodicDiscovery { kube_interface: &impl KubeInterface, mut stop_discovery_receiver: mpsc::Receiver<()>, finished_discovery_sender: broadcast::Sender<()>, + device_plugin_path: &str, ) -> Result<(), Box> { trace!( "do_periodic_discovery - start for config {}", @@ -373,6 +377,7 @@ impl PeriodicDiscovery { shared, instance_properties, instance_map, + device_plugin_path, ) .await { @@ -495,6 +500,7 @@ mod config_action_tests { use akri_shared::k8s::test_kube::MockKubeImpl; use protocols::debug_echo::{DEBUG_ECHO_AVAILABILITY_CHECK_PATH, OFFLINE}; use std::{env, fs}; + use tempfile::Builder; use tokio::sync::broadcast; async fn build_instance_map( @@ -767,8 +773,16 @@ mod config_action_tests { config_protocol: protocol, instance_map: instance_map_clone, }; + let device_plugin_temp_dir = + Builder::new().prefix("device-plugins-").tempdir().unwrap(); + let device_plugin_temp_dir_path = device_plugin_temp_dir.path().to_str().unwrap(); periodic_dicovery - .do_periodic_discovery(&mock, watch_periph_rx, finished_watching_tx) + .do_periodic_discovery( + &mock, + watch_periph_rx, + finished_watching_tx, + device_plugin_temp_dir_path, + ) .await .unwrap(); }); diff --git a/agent/src/util/constants.rs b/agent/src/util/constants.rs index f3dc51baa..1883ed737 100644 --- a/agent/src/util/constants.rs +++ b/agent/src/util/constants.rs @@ -10,11 +10,7 @@ pub const UNHEALTHY: &str = "Unhealthy"; pub const K8S_DEVICE_PLUGIN_VERSION: &str = "v1beta1"; /// DevicePluginPath is the folder the kubelet expects to find Device-Plugin sockets. Only privileged pods have access to this path. -#[cfg(not(test))] -pub const DEVICE_PLUGIN_PATH: &str = "/var/lib/kubelet/device-plugins/"; -/// Path for testing `DevicePluginService` -#[cfg(test)] -pub const DEVICE_PLUGIN_PATH: &str = "/tmp/device-plugins/"; +pub const DEVICE_PLUGIN_PATH: &str = "/var/lib/kubelet/device-plugins"; /// Path of the Kubelet registry socket pub const KUBELET_SOCKET: &str = "/var/lib/kubelet/device-plugins/kubelet.sock"; diff --git a/agent/src/util/device_plugin_service.rs b/agent/src/util/device_plugin_service.rs index f76a48435..6d9440f21 100644 --- a/agent/src/util/device_plugin_service.rs +++ b/agent/src/util/device_plugin_service.rs @@ -1,6 +1,5 @@ use super::constants::{ - DEVICE_PLUGIN_PATH, HEALTHY, K8S_DEVICE_PLUGIN_VERSION, KUBELET_SOCKET, - LIST_AND_WATCH_SLEEP_SECS, UNHEALTHY, + HEALTHY, K8S_DEVICE_PLUGIN_VERSION, KUBELET_SOCKET, LIST_AND_WATCH_SLEEP_SECS, UNHEALTHY, }; use super::v1beta1; use super::v1beta1::{ @@ -751,16 +750,17 @@ pub async fn build_device_plugin( shared: bool, instance_properties: HashMap, instance_map: InstanceMap, + device_plugin_path: &str, ) -> Result<(), Box> { info!("build_device_plugin - entered for device {}", instance_name); let capability_id: String = format!("{}/{}", AKRI_PREFIX, instance_name); let unique_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?; let device_endpoint: String = format!("{}-{}.sock", instance_name, unique_time.as_secs()); - let socket_path: String = format!( - "{}{}", - DEVICE_PLUGIN_PATH.to_string(), - device_endpoint.clone() - ); + let socket_path: String = Path::new(device_plugin_path) + .join(device_endpoint.clone()) + .to_str() + .unwrap() + .to_string(); // Channel capacity set to 6 because 3 possible senders (allocate, update_connectivity_status, and handle_config_delete) // and and receiver only periodically checks channel let (list_and_watch_message_sender, _) = broadcast::channel(6); @@ -900,7 +900,7 @@ async fn register( pre_start_required: false, }; - // lttp://... is a fake uri that is unused (in service_fn) but neccessary for uds connection + // lttp://... is a fake uri that is unused (in service_fn) but necessary for uds connection let channel = Endpoint::try_from("lttp://[::]:50051")? .connect_with_connector(service_fn(|_: Uri| UnixStream::connect(KUBELET_SOCKET))) .await?; @@ -1002,6 +1002,7 @@ mod device_plugin_service_tests { fs, io::{Error, ErrorKind}, }; + use tempfile::Builder; enum NodeName { ThisNode, @@ -1346,11 +1347,13 @@ mod device_plugin_service_tests { let _ = env_logger::builder().is_test(true).try_init(); let (device_plugin_service, device_plugin_service_receivers) = create_device_plugin_service(ConnectivityStatus::Online, false); - let socket_path: String = format!( - "{}{}", - DEVICE_PLUGIN_PATH.to_string(), - device_plugin_service.endpoint.clone() - ); + let device_plugin_temp_dir = Builder::new().prefix("device-plugins-").tempdir().unwrap(); + let socket_path: String = device_plugin_temp_dir + .path() + .join(device_plugin_service.endpoint.clone()) + .to_str() + .unwrap() + .to_string(); let list_and_watch_message_sender = device_plugin_service.list_and_watch_message_sender.clone(); let instance_name = device_plugin_service.instance_name.clone();