Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable applying multiple Configurations that use the same Discovery Handler #432

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion agent/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "agent"
version = "0.7.9"
version = "0.7.10"
authors = ["Kate Goldenring <[email protected]>", "<[email protected]>"]
edition = "2018"

Expand Down
176 changes: 1 addition & 175 deletions agent/src/util/config_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,9 @@ mod config_action_tests {
use super::super::{
device_plugin_service,
device_plugin_service::{InstanceConnectivityStatus, InstanceMap},
discovery_operator::tests::{add_discovery_handler_to_map, build_instance_map},
registration::{DiscoveryHandlerEndpoint, DiscoveryHandlerStatus},
discovery_operator::tests::build_instance_map,
};
use super::*;
use akri_discovery_utils::discovery::{mock_discovery_handler, v0::Device};
use akri_shared::{akri::configuration::Configuration, k8s::MockKubeInterface};
use std::{collections::HashMap, fs, sync::Arc};
use tokio::sync::{broadcast, Mutex};
Expand Down Expand Up @@ -445,178 +443,6 @@ mod config_action_tests {
assert_eq!(instance_map.lock().await.len(), 0);
}

async fn run_and_test_handle_config_add(
discovery_handler_map: RegisteredDiscoveryHandlerMap,
config_map: ConfigMap,
config: Configuration,
dh_endpoint: &DiscoveryHandlerEndpoint,
dh_name: &str,
) -> tokio::task::JoinHandle<()> {
let (new_discovery_handler_sender, _) = broadcast::channel(1);
let mut mock_kube_interface = MockKubeInterface::new();
mock_kube_interface
.expect_create_instance()
.times(1)
.returning(move |_, _, _, _, _| Ok(()));
let arc_mock_kube_interface: Arc<dyn k8s::KubeInterface> = Arc::new(mock_kube_interface);
let config_add_config = config.clone();
let config_add_config_map = config_map.clone();
let config_add_discovery_handler_map = discovery_handler_map.clone();
let handle = tokio::spawn(async move {
handle_config_add(
arc_mock_kube_interface,
&config_add_config,
config_add_config_map,
config_add_discovery_handler_map,
new_discovery_handler_sender,
)
.await
.unwrap();
});

// Loop until the Configuration and single discovered Instance are added to the ConfigMap
let config_name = config.metadata.name.unwrap();
let mut x: i8 = 0;
while x < 5 {
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
if let Some(config_info) = config_map.lock().await.get(&config_name) {
if config_info.instance_map.lock().await.len() == 1 {
break;
}
}
x += 1;
}
assert_ne!(x, 4);
// Assert that Discovery Handler is marked as Active
check_discovery_handler_status(
discovery_handler_map,
dh_name,
dh_endpoint,
DiscoveryHandlerStatus::Active,
)
.await;
handle
}

async fn check_discovery_handler_status(
discovery_handler_map: RegisteredDiscoveryHandlerMap,
dh_name: &str,
dh_endpoint: &DiscoveryHandlerEndpoint,
dh_status: DiscoveryHandlerStatus,
) {
let mut x: i8 = 0;
while x < 5 {
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let dh_map = discovery_handler_map.lock().unwrap();
if let Some(dh_details_map) = dh_map.get(dh_name) {
if dh_details_map.get(dh_endpoint).unwrap().connectivity_status == dh_status {
break;
}
}
x += 1;
}
assert_ne!(x, 4);
}

// Tests that when a Configuration is added, deleted, and added again,
// instances are created, deleted and recreated,
// and the Discovery Handler is marked as Active, Waiting, Active, and Waiting.
// Also asserts that all threads are successfully terminated.
#[tokio::test]
async fn test_handle_config_add_delete_add() {
let _ = env_logger::builder().is_test(true).try_init();

// Set up Discovery Handler
// Start a mock DH, specifying that it should NOT return an error
let return_error = false;
let (endpoint_dir, endpoint) =
mock_discovery_handler::get_mock_discovery_handler_dir_and_endpoint("mock.sock");
let dh_endpoint = DiscoveryHandlerEndpoint::Uds(endpoint.to_string());
let device_id = "device_id";
let _dh_server_thread_handle = mock_discovery_handler::run_mock_discovery_handler(
&endpoint_dir,
&endpoint,
return_error,
vec![Device {
id: device_id.to_string(),
properties: HashMap::new(),
mounts: Vec::default(),
device_specs: Vec::default(),
}],
)
.await;
// Make sure registration server has started
akri_shared::uds::unix_stream::try_connect(&endpoint)
.await
.unwrap();

// Add Discovery Handler to map
let dh_name = "debugEcho";
let discovery_handler_map = Arc::new(std::sync::Mutex::new(HashMap::new()));
add_discovery_handler_to_map(dh_name, &dh_endpoint, false, discovery_handler_map.clone());

// Set up, run, and test handle_config_add
// Discovery Handler should create an instance and be marked as Active
let path_to_config = "../test/yaml/config-a.yaml";
let config_yaml = fs::read_to_string(path_to_config).expect("Unable to read file");
let config: Configuration = serde_yaml::from_str(&config_yaml).unwrap();
let config_name = config.metadata.name.clone().unwrap();
let config_map: ConfigMap = Arc::new(Mutex::new(HashMap::new()));
let first_add_handle = run_and_test_handle_config_add(
discovery_handler_map.clone(),
config_map.clone(),
config.clone(),
&dh_endpoint,
dh_name,
)
.await;

let config_delete_config = config.clone();
let config_delete_config_map = config_map.clone();
handle_config_delete(
&MockKubeInterface::new(),
&config_delete_config,
config_delete_config_map.clone(),
)
.await
.unwrap();

// Assert that config is removed from map after it has been deleted
assert!(!config_delete_config_map
.lock()
.await
.contains_key(&config_name));

// Assert that Discovery Handler is marked as Waiting
check_discovery_handler_status(
discovery_handler_map.clone(),
dh_name,
&dh_endpoint,
DiscoveryHandlerStatus::Waiting,
)
.await;

let second_add_handle = run_and_test_handle_config_add(
discovery_handler_map.clone(),
config_map.clone(),
config.clone(),
&dh_endpoint,
dh_name,
)
.await;

// Assert that Discovery Handler is marked as Waiting
check_discovery_handler_status(
discovery_handler_map.clone(),
dh_name,
&dh_endpoint,
DiscoveryHandlerStatus::Waiting,
)
.await;

futures::future::join_all(vec![first_add_handle, second_add_handle]).await;
}

// Tests that when a Configuration is updated,
// if generation has changed, should return true
#[tokio::test]
Expand Down
4 changes: 0 additions & 4 deletions agent/src/util/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ pub const SLOT_RECONCILIATION_SLOT_GRACE_PERIOD_SECS: u64 = 300;
#[cfg(any(test, feature = "agent-full"))]
pub const ENABLE_DEBUG_ECHO_LABEL: &str = "ENABLE_DEBUG_ECHO";

/// Maximum amount of time allowed to pass without being able to connect to a discovery handler without it being removed
/// from the map of registered Discovery Handlers.
pub const DISCOVERY_HANDLER_OFFLINE_GRACE_PERIOD_SECS: u64 = 300;

/// Capacity of channel over which `DevicePluginService::list_and_watch` sends updates to kubelet about "virtual" device
/// health of an instance. The kubelet Device Plugin manager should receive each message instantly; however, providing
/// some buffer in case.
Expand Down
Loading