diff --git a/Cargo.lock b/Cargo.lock index 8c061de38..1f157ed51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -333,7 +333,7 @@ checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" [[package]] name = "agent" -version = "0.7.9" +version = "0.7.10" dependencies = [ "akri-debug-echo", "akri-discovery-utils", @@ -390,7 +390,7 @@ dependencies = [ [[package]] name = "akri-debug-echo" -version = "0.7.9" +version = "0.7.10" dependencies = [ "akri-discovery-utils", "akri-shared", @@ -410,7 +410,7 @@ dependencies = [ [[package]] name = "akri-discovery-utils" -version = "0.7.9" +version = "0.7.10" dependencies = [ "akri-shared", "anyhow", @@ -432,7 +432,7 @@ dependencies = [ [[package]] name = "akri-onvif" -version = "0.7.9" +version = "0.7.10" dependencies = [ "akri-discovery-utils", "akri-shared", @@ -461,7 +461,7 @@ dependencies = [ [[package]] name = "akri-opcua" -version = "0.7.9" +version = "0.7.10" dependencies = [ "akri-discovery-utils", "akri-shared", @@ -485,7 +485,7 @@ dependencies = [ [[package]] name = "akri-shared" -version = "0.7.9" +version = "0.7.10" dependencies = [ "anyhow", "async-trait", @@ -514,7 +514,7 @@ dependencies = [ [[package]] name = "akri-udev" -version = "0.7.9" +version = "0.7.10" dependencies = [ "akri-discovery-utils", "anyhow", @@ -980,7 +980,7 @@ checksum = "f92cfa0fd5690b3cf8c1ef2cabbd9b7ef22fa53cf5e1f92b05103f6d5d1cf6e7" [[package]] name = "controller" -version = "0.7.9" +version = "0.7.10" dependencies = [ "akri-shared", "anyhow", @@ -1195,7 +1195,7 @@ dependencies = [ [[package]] name = "debug-echo-discovery-handler" -version = "0.7.9" +version = "0.7.10" dependencies = [ "akri-debug-echo", "akri-discovery-utils", @@ -2414,7 +2414,7 @@ checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" [[package]] name = "onvif-discovery-handler" -version = "0.7.9" +version = "0.7.10" dependencies = [ "akri-discovery-utils", "akri-onvif", @@ -2498,7 +2498,7 @@ dependencies = [ [[package]] name = "opcua-discovery-handler" -version = "0.7.9" +version = "0.7.10" dependencies = [ "akri-discovery-utils", "akri-opcua", @@ -4311,7 +4311,7 @@ dependencies = [ [[package]] name = "udev-discovery-handler" -version = "0.7.9" +version = "0.7.10" dependencies = [ "akri-discovery-utils", "akri-udev", @@ -4322,7 +4322,7 @@ dependencies = [ [[package]] name = "udev-video-broker" -version = "0.7.9" +version = "0.7.10" dependencies = [ "akri-shared", "env_logger", @@ -4586,7 +4586,7 @@ dependencies = [ [[package]] name = "webhook-configuration" -version = "0.7.9" +version = "0.7.10" dependencies = [ "actix", "actix-rt 2.5.0", diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 3a4767f25..4a3157f1d 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "agent" -version = "0.7.9" +version = "0.7.10" authors = ["Kate Goldenring ", ""] edition = "2018" diff --git a/agent/src/util/config_action.rs b/agent/src/util/config_action.rs index af7db0e6d..f92e360f7 100644 --- a/agent/src/util/config_action.rs +++ b/agent/src/util/config_action.rs @@ -342,11 +342,9 @@ mod config_action_tests { use super::super::{ device_plugin_service, device_plugin_service::{InstanceConnectivityStatus, InstanceMap}, - discovery_operator::tests::{add_discovery_handler_to_map, build_instance_map}, - registration::{DiscoveryHandlerEndpoint, DiscoveryHandlerStatus}, + discovery_operator::tests::build_instance_map, }; use super::*; - use akri_discovery_utils::discovery::{mock_discovery_handler, v0::Device}; use akri_shared::{akri::configuration::Configuration, k8s::MockKubeInterface}; use std::{collections::HashMap, fs, sync::Arc}; use tokio::sync::{broadcast, Mutex}; @@ -445,178 +443,6 @@ mod config_action_tests { assert_eq!(instance_map.lock().await.len(), 0); } - async fn run_and_test_handle_config_add( - discovery_handler_map: RegisteredDiscoveryHandlerMap, - config_map: ConfigMap, - config: Configuration, - dh_endpoint: &DiscoveryHandlerEndpoint, - dh_name: &str, - ) -> tokio::task::JoinHandle<()> { - let (new_discovery_handler_sender, _) = broadcast::channel(1); - let mut mock_kube_interface = MockKubeInterface::new(); - mock_kube_interface - .expect_create_instance() - .times(1) - .returning(move |_, _, _, _, _| Ok(())); - let arc_mock_kube_interface: Arc = Arc::new(mock_kube_interface); - let config_add_config = config.clone(); - let config_add_config_map = config_map.clone(); - let config_add_discovery_handler_map = discovery_handler_map.clone(); - let handle = tokio::spawn(async move { - handle_config_add( - arc_mock_kube_interface, - &config_add_config, - config_add_config_map, - config_add_discovery_handler_map, - new_discovery_handler_sender, - ) - .await - .unwrap(); - }); - - // Loop until the Configuration and single discovered Instance are added to the ConfigMap - let config_name = config.metadata.name.unwrap(); - let mut x: i8 = 0; - while x < 5 { - tokio::time::sleep(std::time::Duration::from_millis(200)).await; - if let Some(config_info) = config_map.lock().await.get(&config_name) { - if config_info.instance_map.lock().await.len() == 1 { - break; - } - } - x += 1; - } - assert_ne!(x, 4); - // Assert that Discovery Handler is marked as Active - check_discovery_handler_status( - discovery_handler_map, - dh_name, - dh_endpoint, - DiscoveryHandlerStatus::Active, - ) - .await; - handle - } - - async fn check_discovery_handler_status( - discovery_handler_map: RegisteredDiscoveryHandlerMap, - dh_name: &str, - dh_endpoint: &DiscoveryHandlerEndpoint, - dh_status: DiscoveryHandlerStatus, - ) { - let mut x: i8 = 0; - while x < 5 { - tokio::time::sleep(std::time::Duration::from_millis(200)).await; - let dh_map = discovery_handler_map.lock().unwrap(); - if let Some(dh_details_map) = dh_map.get(dh_name) { - if dh_details_map.get(dh_endpoint).unwrap().connectivity_status == dh_status { - break; - } - } - x += 1; - } - assert_ne!(x, 4); - } - - // Tests that when a Configuration is added, deleted, and added again, - // instances are created, deleted and recreated, - // and the Discovery Handler is marked as Active, Waiting, Active, and Waiting. - // Also asserts that all threads are successfully terminated. - #[tokio::test] - async fn test_handle_config_add_delete_add() { - let _ = env_logger::builder().is_test(true).try_init(); - - // Set up Discovery Handler - // Start a mock DH, specifying that it should NOT return an error - let return_error = false; - let (endpoint_dir, endpoint) = - mock_discovery_handler::get_mock_discovery_handler_dir_and_endpoint("mock.sock"); - let dh_endpoint = DiscoveryHandlerEndpoint::Uds(endpoint.to_string()); - let device_id = "device_id"; - let _dh_server_thread_handle = mock_discovery_handler::run_mock_discovery_handler( - &endpoint_dir, - &endpoint, - return_error, - vec![Device { - id: device_id.to_string(), - properties: HashMap::new(), - mounts: Vec::default(), - device_specs: Vec::default(), - }], - ) - .await; - // Make sure registration server has started - akri_shared::uds::unix_stream::try_connect(&endpoint) - .await - .unwrap(); - - // Add Discovery Handler to map - let dh_name = "debugEcho"; - let discovery_handler_map = Arc::new(std::sync::Mutex::new(HashMap::new())); - add_discovery_handler_to_map(dh_name, &dh_endpoint, false, discovery_handler_map.clone()); - - // Set up, run, and test handle_config_add - // Discovery Handler should create an instance and be marked as Active - let path_to_config = "../test/yaml/config-a.yaml"; - let config_yaml = fs::read_to_string(path_to_config).expect("Unable to read file"); - let config: Configuration = serde_yaml::from_str(&config_yaml).unwrap(); - let config_name = config.metadata.name.clone().unwrap(); - let config_map: ConfigMap = Arc::new(Mutex::new(HashMap::new())); - let first_add_handle = run_and_test_handle_config_add( - discovery_handler_map.clone(), - config_map.clone(), - config.clone(), - &dh_endpoint, - dh_name, - ) - .await; - - let config_delete_config = config.clone(); - let config_delete_config_map = config_map.clone(); - handle_config_delete( - &MockKubeInterface::new(), - &config_delete_config, - config_delete_config_map.clone(), - ) - .await - .unwrap(); - - // Assert that config is removed from map after it has been deleted - assert!(!config_delete_config_map - .lock() - .await - .contains_key(&config_name)); - - // Assert that Discovery Handler is marked as Waiting - check_discovery_handler_status( - discovery_handler_map.clone(), - dh_name, - &dh_endpoint, - DiscoveryHandlerStatus::Waiting, - ) - .await; - - let second_add_handle = run_and_test_handle_config_add( - discovery_handler_map.clone(), - config_map.clone(), - config.clone(), - &dh_endpoint, - dh_name, - ) - .await; - - // Assert that Discovery Handler is marked as Waiting - check_discovery_handler_status( - discovery_handler_map.clone(), - dh_name, - &dh_endpoint, - DiscoveryHandlerStatus::Waiting, - ) - .await; - - futures::future::join_all(vec![first_add_handle, second_add_handle]).await; - } - // Tests that when a Configuration is updated, // if generation has changed, should return true #[tokio::test] diff --git a/agent/src/util/constants.rs b/agent/src/util/constants.rs index 66681c851..e3c0cdfaa 100644 --- a/agent/src/util/constants.rs +++ b/agent/src/util/constants.rs @@ -31,10 +31,6 @@ pub const SLOT_RECONCILIATION_SLOT_GRACE_PERIOD_SECS: u64 = 300; #[cfg(any(test, feature = "agent-full"))] pub const ENABLE_DEBUG_ECHO_LABEL: &str = "ENABLE_DEBUG_ECHO"; -/// Maximum amount of time allowed to pass without being able to connect to a discovery handler without it being removed -/// from the map of registered Discovery Handlers. -pub const DISCOVERY_HANDLER_OFFLINE_GRACE_PERIOD_SECS: u64 = 300; - /// Capacity of channel over which `DevicePluginService::list_and_watch` sends updates to kubelet about "virtual" device /// health of an instance. The kubelet Device Plugin manager should receive each message instantly; however, providing /// some buffer in case. diff --git a/agent/src/util/discovery_operator.rs b/agent/src/util/discovery_operator.rs index 854dc5e66..b997ad462 100644 --- a/agent/src/util/discovery_operator.rs +++ b/agent/src/util/discovery_operator.rs @@ -2,18 +2,13 @@ use super::super::INSTANCE_COUNT_METRIC; #[cfg(any(test, feature = "agent-full"))] use super::embedded_discovery_handlers::get_discovery_handler; use super::{ - constants::{ - DISCOVERY_HANDLER_OFFLINE_GRACE_PERIOD_SECS, SHARED_INSTANCE_OFFLINE_GRACE_PERIOD_SECS, - }, + constants::SHARED_INSTANCE_OFFLINE_GRACE_PERIOD_SECS, device_plugin_builder::{DevicePluginBuilder, DevicePluginBuilderInterface}, device_plugin_service, device_plugin_service::{ get_device_instance_name, InstanceConnectivityStatus, InstanceInfo, InstanceMap, }, - registration::{ - DiscoveryDetails, DiscoveryHandlerEndpoint, DiscoveryHandlerStatus, - RegisteredDiscoveryHandlerMap, - }, + registration::{DiscoveryDetails, DiscoveryHandlerEndpoint, RegisteredDiscoveryHandlerMap}, streaming_extension::StreamingExt, }; use akri_discovery_utils::discovery::v0::{ @@ -36,16 +31,13 @@ use mockall::{automock, predicate::*}; #[cfg(not(test))] use std::time::Instant; use std::{collections::HashMap, convert::TryFrom, sync::Arc}; -use tonic::{ - transport::{Endpoint, Uri}, - Status, -}; +use tonic::transport::{Endpoint, Uri}; /// StreamType provides a wrapper around the two different types of streams returned from embedded /// or embedded discovery handlers and ones running externally. pub enum StreamType { #[cfg(any(test, feature = "agent-full"))] - Embedded(tokio::sync::mpsc::Receiver>), + Embedded(tokio::sync::mpsc::Receiver>), External(tonic::Streaming), } @@ -211,7 +203,7 @@ impl DiscoveryOperator { kube_interface: Arc, dh_details: &'a DiscoveryDetails, stream: &'a mut dyn StreamingExt, - ) -> Result<(), Status> { + ) -> anyhow::Result<()> { // clone objects for thread let discovery_operator = Arc::new(self.clone()); let stop_discovery_receiver: &mut tokio::sync::broadcast::Receiver<()> = @@ -224,21 +216,15 @@ impl DiscoveryOperator { break; }, result = stream.get_message() => { - let message = result?; - if let Some(response) = message { - trace!("internal_do_discover - got discovery results {:?}", response.devices); - self.handle_discovery_results( - kube_interface.clone(), - response.devices, - dh_details.shared, - Box::new(DevicePluginBuilder{}), - ) - .await - .unwrap(); - } else { - error!("internal_do_discover - received result of type None. Should not happen."); - break; - } + let response = result?.ok_or_else(|| anyhow::anyhow!("Received response type None. Should not happen."))?; + trace!("internal_do_discover - got discovery results {:?}", response.devices); + self.handle_discovery_results( + kube_interface.clone(), + response.devices, + dh_details.shared, + Box::new(DevicePluginBuilder{}), + ) + .await?; } } } @@ -246,54 +232,6 @@ impl DiscoveryOperator { Ok(()) } - /// Sets the connectivity status of a discovery handler. If a discovery handler goes offline, mark_offline_or_deregister_discovery_handler should be used. - pub fn set_discovery_handler_connectivity_status( - &self, - endpoint: &DiscoveryHandlerEndpoint, - connectivity_status: DiscoveryHandlerStatus, - ) { - trace!("set_discovery_handler_connectivity_status - set status of {:?} for {} discovery handler at endpoint {:?}", connectivity_status, self.config.spec.discovery_handler.name, endpoint); - let mut registered_dh_map = self.discovery_handler_map.lock().unwrap(); - let discovery_handler_details_map = registered_dh_map - .get_mut(&self.config.spec.discovery_handler.name) - .unwrap(); - let dh_details = discovery_handler_details_map.get_mut(endpoint).unwrap(); - dh_details.connectivity_status = connectivity_status; - } - - /// This is called when no connection can be made with a discovery handler at its endpoint. - /// It takes action based on a Discovery Handler's (DH's) current `DiscoveryHandlerStatus`. - /// If `DiscoveryHandlerStatus::Waiting`, connectivity status changed to Offline. - /// If `DiscoveryHandlerStatus::Offline`, DH is removed from the `RegisteredDiscoveryHandlersMap` - /// if it have been offline for longer than the grace period. - /// If `DiscoveryHandlerStatus::Active`, this should not happen, Error is returned. - pub async fn mark_offline_or_deregister_discovery_handler( - &self, - endpoint: &DiscoveryHandlerEndpoint, - ) -> Result { - trace!("mark_offline_or_deregister_discovery_handler - {} discovery handler at endpoint {:?} is offline", self.config.spec.discovery_handler.name, endpoint); - let mut deregistered = false; - let mut registered_dh_map = self.discovery_handler_map.lock().unwrap(); - let discovery_handler_details_map = registered_dh_map - .get_mut(&self.config.spec.discovery_handler.name) - .unwrap(); - let dh_details = discovery_handler_details_map.get_mut(endpoint).unwrap(); - match dh_details.connectivity_status { - DiscoveryHandlerStatus::Offline(instant) => { - if instant.elapsed().as_secs() > DISCOVERY_HANDLER_OFFLINE_GRACE_PERIOD_SECS { - trace!("mark_offline_or_deregister_discovery_handler - de-registering {} discovery handler at endpoint {:?} since been offline for longer than 5 minutes", self.config.spec.discovery_handler.name, endpoint); - // Remove discovery handler from map if timed out - discovery_handler_details_map.remove(endpoint).unwrap(); - deregistered = true; - } - } - DiscoveryHandlerStatus::Waiting | DiscoveryHandlerStatus::Active => { - dh_details.connectivity_status = DiscoveryHandlerStatus::Offline(Instant::now()); - } - } - Ok(deregistered) - } - /// Checks if any of this DiscoveryOperator's Configuration's Instances have been offline for too long. /// If a non-local device has not come back online before `SHARED_INSTANCE_OFFLINE_GRACE_PERIOD_SECS`, /// the associated Device Plugin and Instance are terminated and deleted, respectively. @@ -343,7 +281,7 @@ impl DiscoveryOperator { discovery_results: Vec, shared: bool, device_plugin_builder: Box, - ) -> Result<(), Box> { + ) -> anyhow::Result<()> { let config_name = self.config.metadata.name.clone().unwrap(); trace!( "handle_discovery_results - for config {} with discovery results {:?}", @@ -414,7 +352,7 @@ impl DiscoveryOperator { kube_interface: Arc, currently_visible_instances: HashMap, shared: bool, - ) -> Result<(), Box> { + ) -> anyhow::Result<()> { let instance_map = self.instance_map.lock().await.clone(); for (instance, instance_info) in instance_map { trace!( @@ -514,9 +452,7 @@ impl DiscoveryOperator { } pub mod start_discovery { - use super::super::registration::{ - DiscoveryDetails, DiscoveryHandlerEndpoint, DiscoveryHandlerStatus, - }; + use super::super::registration::{DiscoveryDetails, DiscoveryHandlerEndpoint}; // Use this `mockall` macro to automate importing a mock type in test mode, or a real type otherwise. #[double] pub use super::DiscoveryOperator; @@ -637,13 +573,7 @@ pub mod start_discovery { /// A Configuration specifies the name of `DiscoveryHandlers` that should be utilized for discovery. /// This tries to establish connection with each `DiscoveryHandler` registered under the requested /// `DiscoveryHandler` name and spawns a discovery thread for each connection. - /// This function also manages the `DiscoveryHandlerStatus` of each `DiscoveryHandler` as follows: - /// /// `DiscoveryHandlerStatus::Active` if a connection is established via a call to get_stream - /// /// `DiscoveryHandlerStatus::Waitin`g after a connection has finished due to either being signaled to stop connecting - /// /// or an error being returned from the discovery handler (that is not a broken pipe) - /// /// `DiscoveryHandlerStatus::Offline` if a connection cannot be established via a call to get_stream /// If a connection cannot be established, continues to try, sleeping between iteration. - /// Removes the discovery handler from the `RegisteredDiscoveryHandlerMap` if it has been offline for longer than the grace period. pub async fn do_discover( discovery_operator: Arc, kube_interface: Arc, @@ -673,26 +603,18 @@ pub mod start_discovery { config.spec.discovery_handler.name, endpoint ); - // Only use DiscoveryHandler if it doesn't have a client yet - if dh_details.connectivity_status != DiscoveryHandlerStatus::Active { - trace!( - "do_discover - {} discovery handler at endpoint {:?} doesn't have client", - config.spec.discovery_handler.name, - endpoint - ); - let discovery_operator = discovery_operator.clone(); - let kube_interface = kube_interface.clone(); - discovery_tasks.push(tokio::spawn(async move { - do_discover_on_discovery_handler( - discovery_operator.clone(), - kube_interface.clone(), - &endpoint, - &dh_details, - ) - .await - .unwrap(); - })); - } + let discovery_operator = discovery_operator.clone(); + let kube_interface = kube_interface.clone(); + discovery_tasks.push(tokio::spawn(async move { + do_discover_on_discovery_handler( + discovery_operator.clone(), + kube_interface.clone(), + &endpoint, + &dh_details, + ) + .await + .unwrap(); + })); } } futures::future::try_join_all(discovery_tasks).await?; @@ -705,34 +627,20 @@ pub mod start_discovery { kube_interface: Arc, endpoint: &'a DiscoveryHandlerEndpoint, dh_details: &'a DiscoveryDetails, - ) -> Result<(), Box> { + ) -> anyhow::Result<()> { loop { - let deregistered; - match discovery_operator.get_stream(endpoint).await { - Some(stream_type) => { - // Since connection was established, be sure that the Discovery Handler is marked as having a client - discovery_operator.set_discovery_handler_connectivity_status( - endpoint, - DiscoveryHandlerStatus::Active, - ); - match stream_type { - StreamType::External(mut stream) => { - match discovery_operator - .internal_do_discover( - kube_interface.clone(), - dh_details, - &mut stream, - ) - .await - { - Ok(_) => { - discovery_operator.set_discovery_handler_connectivity_status( - endpoint, - DiscoveryHandlerStatus::Waiting, - ); - break; - } - Err(status) => { + if let Some(stream_type) = discovery_operator.get_stream(endpoint).await { + match stream_type { + StreamType::External(mut stream) => { + match discovery_operator + .internal_do_discover(kube_interface.clone(), dh_details, &mut stream) + .await + { + Ok(_) => { + break; + } + Err(e) => { + if let Some(status) = e.downcast_ref::() { if status.message().contains("broken pipe") { // Mark all associated instances as offline error!("do_discover_on_discovery_handler - connection with Discovery Handler dropped with status {:?}. Marking all instances offline.", status); @@ -743,14 +651,9 @@ pub mod start_discovery { dh_details.shared, ) .await?; - deregistered = discovery_operator - .mark_offline_or_deregister_discovery_handler(endpoint) - .await - .unwrap(); } else { trace!("do_discover_on_discovery_handler - Discovery Handlers returned error status {}. Marking all instances offline.", status); // TODO: Possibly mark config as invalid - // Mark all associated instances as offline by declaring no visible instances discovery_operator .update_instance_connectivity_status( kube_interface.clone(), @@ -758,61 +661,39 @@ pub mod start_discovery { dh_details.shared, ) .await?; - discovery_operator - .set_discovery_handler_connectivity_status( - endpoint, - DiscoveryHandlerStatus::Waiting, - ); - break; } + } else { + return Err(e); } } } - #[cfg(any(test, feature = "agent-full"))] - StreamType::Embedded(mut stream) => { - discovery_operator - .internal_do_discover( - kube_interface.clone(), - dh_details, - &mut stream, - ) - .await - .unwrap(); - discovery_operator.set_discovery_handler_connectivity_status( - endpoint, - DiscoveryHandlerStatus::Waiting, - ); - break; - } } - } - None => { - deregistered = discovery_operator - .mark_offline_or_deregister_discovery_handler(endpoint) - .await - .unwrap(); + #[cfg(any(test, feature = "agent-full"))] + StreamType::Embedded(mut stream) => { + discovery_operator + .internal_do_discover(kube_interface.clone(), dh_details, &mut stream) + .await?; + // Embedded discovery should only return okay if signaled to stop. Otherwise, bubble up error. + break; + } } } - if deregistered { - break; - } else { - // If a connection cannot be established with the Discovery Handler, it will sleep and try again. - // This continues until connection established, the Discovery Handler is deregistered due to grace period elapsing, - // or the Discovery Handler is told to stop discovery. - let mut stop_discovery_receiver = - dh_details.close_discovery_handler_connection.subscribe(); - let mut sleep_duration = Duration::from_secs(60); - if cfg!(test) { - sleep_duration = Duration::from_millis(100); - } - if tokio::time::timeout(sleep_duration, stop_discovery_receiver.recv()) - .await - .is_ok() - { - trace!("do_discover_on_discovery_handler - received message to stop discovery for {} Discovery Handler at endpoint {:?}", dh_details.name, dh_details.endpoint); - break; - } + // If a connection cannot be established with the Discovery Handler, it will sleep and try again. + // This continues until connection established or the Discovery Handler is told to stop discovery. + let mut stop_discovery_receiver = + dh_details.close_discovery_handler_connection.subscribe(); + let mut sleep_duration = Duration::from_secs(60); + if cfg!(test) { + sleep_duration = Duration::from_millis(100); + } + + if tokio::time::timeout(sleep_duration, stop_discovery_receiver.recv()) + .await + .is_ok() + { + trace!("do_discover_on_discovery_handler - received message to stop discovery for {} Discovery Handler at endpoint {:?}", dh_details.name, dh_details.endpoint); + break; } } Ok(()) @@ -860,9 +741,7 @@ pub fn inner_generate_instance_digest( pub mod tests { use super::super::{ device_plugin_builder::MockDevicePluginBuilderInterface, - registration::{ - inner_register_embedded_discovery_handlers, DiscoveryDetails, DiscoveryHandlerStatus, - }, + registration::{inner_register_embedded_discovery_handlers, DiscoveryDetails}, }; use super::*; use akri_discovery_utils::discovery::mock_discovery_handler; @@ -993,11 +872,12 @@ pub mod tests { endpoint, shared, close_discovery_handler_connection: close_discovery_handler_connection, - connectivity_status: DiscoveryHandlerStatus::Waiting, } } - fn setup_test_do_discover() -> (MockDiscoveryOperator, RegisteredDiscoveryHandlerMap) { + fn setup_test_do_discover( + config_name: &str, + ) -> (MockDiscoveryOperator, RegisteredDiscoveryHandlerMap) { let discovery_handler_map = Arc::new(std::sync::Mutex::new(HashMap::new())); add_discovery_handler_to_map( "debugEcho", @@ -1009,7 +889,8 @@ pub mod tests { // Build discovery operator let path_to_config = "../test/yaml/config-a.yaml"; let config_yaml = std::fs::read_to_string(path_to_config).expect("Unable to read file"); - let config: Configuration = serde_yaml::from_str(&config_yaml).unwrap(); + let mut config: Configuration = serde_yaml::from_str(&config_yaml).unwrap(); + config.metadata.name = Some(config_name.to_string()); let discovery_operator = create_mock_discovery_operator( discovery_handler_map.clone(), config, @@ -1091,52 +972,97 @@ pub mod tests { #[tokio::test] async fn test_start_discovery_termination() { let _ = env_logger::builder().is_test(true).try_init(); - let (mut mock_discovery_operator, discovery_handler_map) = setup_test_do_discover(); - let (marked_offline_sender, mut marked_offline_receiver) = - tokio::sync::broadcast::channel(1); + let mut start_discovery_components = start_discovery_setup("config-a", true).await; + start_discovery_components + .running_receiver + .recv() + .await + .unwrap(); + start_discovery_components + .stop_all_discovery_sender + .send(()) + .unwrap(); + start_discovery_components + .finished_discovery_receiver + .recv() + .await + .unwrap(); + // Make sure that all threads have finished + start_discovery_components + .start_discovery_handle + .await + .unwrap(); + } + + // Test that start discovery can be called twice for two (differently named) + // Configurations that use the same DH. + #[tokio::test] + async fn test_start_discovery_same_discovery_handler() { + let _ = env_logger::builder().is_test(true).try_init(); + let mut start_discovery_components_a = start_discovery_setup("config-a", false).await; + let mut start_discovery_components_b = start_discovery_setup("config-b", false).await; + + start_discovery_components_a + .running_receiver + .recv() + .await + .unwrap(); + start_discovery_components_b + .running_receiver + .recv() + .await + .unwrap(); + } + + struct StartDiscoveryComponents { + finished_discovery_receiver: tokio::sync::mpsc::Receiver<()>, + stop_all_discovery_sender: tokio::sync::broadcast::Sender<()>, + running_receiver: tokio::sync::broadcast::Receiver<()>, + start_discovery_handle: tokio::task::JoinHandle<()>, + } + + async fn start_discovery_setup(config_name: &str, terminate: bool) -> StartDiscoveryComponents { + let (mut mock_discovery_operator, discovery_handler_map) = + setup_test_do_discover(config_name); + let (running_sender, running_receiver) = tokio::sync::broadcast::channel::<()>(1); mock_discovery_operator .expect_get_stream() - .returning(|_| None); - mock_discovery_operator - .expect_mark_offline_or_deregister_discovery_handler() - .withf(move |endpoint: &DiscoveryHandlerEndpoint| { - endpoint == &DiscoveryHandlerEndpoint::Uds("socket.sock".to_string()) - }) .returning(move |_| { - marked_offline_sender.clone().send(()).unwrap(); - Ok(false) + running_sender.clone().send(()).unwrap(); + None }); mock_discovery_operator .expect_delete_offline_instances() .times(1) .returning(move |_| Ok(())); - let stop_dh_discovery_sender = discovery_handler_map - .lock() - .unwrap() - .get_mut("debugEcho") - .unwrap() - .clone() - .get(&DiscoveryHandlerEndpoint::Uds("socket.sock".to_string())) - .unwrap() - .clone() - .close_discovery_handler_connection; - mock_discovery_operator - .expect_stop_all_discovery() - .times(1) - .returning(move || { - stop_dh_discovery_sender.clone().send(()).unwrap(); - }); + if terminate { + let stop_dh_discovery_sender = discovery_handler_map + .lock() + .unwrap() + .get_mut("debugEcho") + .unwrap() + .clone() + .get(&DiscoveryHandlerEndpoint::Uds("socket.sock".to_string())) + .unwrap() + .clone() + .close_discovery_handler_connection; + mock_discovery_operator + .expect_stop_all_discovery() + .times(1) + .returning(move || { + stop_dh_discovery_sender.clone().send(()).unwrap(); + }); + } + let (mut finished_discovery_sender, finished_discovery_receiver) = + tokio::sync::mpsc::channel(2); let (new_dh_sender, _) = broadcast::channel(2); let (stop_all_discovery_sender, _) = broadcast::channel(2); - let (mut finished_discovery_sender, mut finished_discovery_receiver) = - tokio::sync::mpsc::channel(2); - let thread_new_dh_sender = new_dh_sender.clone(); let thread_stop_all_discovery_sender = stop_all_discovery_sender.clone(); let mock_kube_interface: Arc = Arc::new(MockKubeInterface::new()); - let handle = tokio::spawn(async move { + let start_discovery_handle = tokio::spawn(async move { start_discovery::start_discovery( mock_discovery_operator, - thread_new_dh_sender, + new_dh_sender.to_owned(), thread_stop_all_discovery_sender, &mut finished_discovery_sender, mock_kube_interface, @@ -1144,36 +1070,26 @@ pub mod tests { .await .unwrap(); }); - - // Wait until do_discovery has gotten to point the DH marked offline - marked_offline_receiver.recv().await.unwrap(); - stop_all_discovery_sender.send(()).unwrap(); - finished_discovery_receiver.recv().await.unwrap(); - // Make sure that all threads have finished - handle.await.unwrap(); + StartDiscoveryComponents { + finished_discovery_receiver, + stop_all_discovery_sender, + running_receiver, + start_discovery_handle, + } } - // Test that DH is connected to on second try getting stream and - // that connectivity status is changed from Waiting -> Active -> Waiting again - // when a successful connection is made and completed. + // Test that DH is connected to on second try getting stream. #[tokio::test] async fn test_do_discover_completed_internal_connection() { let _ = env_logger::builder().is_test(true).try_init(); - let (mut mock_discovery_operator, _) = setup_test_do_discover(); + let (mut mock_discovery_operator, _) = setup_test_do_discover("config-a"); let mut get_stream_seq = Sequence::new(); - // First time cannot get stream and is marked offline + // First time cannot get stream mock_discovery_operator .expect_get_stream() .times(1) .returning(|_| None) .in_sequence(&mut get_stream_seq); - mock_discovery_operator - .expect_mark_offline_or_deregister_discovery_handler() - .withf(move |endpoint: &DiscoveryHandlerEndpoint| { - endpoint == &DiscoveryHandlerEndpoint::Uds("socket.sock".to_string()) - }) - .times(1) - .returning(|_| Ok(false)); // Second time successfully get stream let (_, rx) = mpsc::channel(2); let stream_type = Some(StreamType::Embedded(rx)); @@ -1182,38 +1098,11 @@ pub mod tests { .times(1) .return_once(move |_| stream_type) .in_sequence(&mut get_stream_seq); - // Make sure discovery handler is marked as Active - let mut discovery_handler_status_seq = Sequence::new(); - mock_discovery_operator - .expect_set_discovery_handler_connectivity_status() - .withf( - move |endpoint: &DiscoveryHandlerEndpoint, - connectivity_status: &DiscoveryHandlerStatus| { - endpoint == &DiscoveryHandlerEndpoint::Uds("socket.sock".to_string()) - && connectivity_status == &DiscoveryHandlerStatus::Active - }, - ) - .times(1) - .returning(|_, _| ()) - .in_sequence(&mut discovery_handler_status_seq); // Discovery should be initiated mock_discovery_operator .expect_internal_do_discover() .times(1) .returning(|_, _, _| Ok(())); - // Make sure after discovery is complete that the DH is marked Online again - mock_discovery_operator - .expect_set_discovery_handler_connectivity_status() - .withf( - move |endpoint: &DiscoveryHandlerEndpoint, - connectivity_status: &DiscoveryHandlerStatus| { - endpoint == &DiscoveryHandlerEndpoint::Uds("socket.sock".to_string()) - && connectivity_status == &DiscoveryHandlerStatus::Waiting - }, - ) - .times(1) - .returning(|_, _| ()) - .in_sequence(&mut discovery_handler_status_seq); let mock_kube_interface: Arc = Arc::new(MockKubeInterface::new()); start_discovery::do_discover(Arc::new(mock_discovery_operator), mock_kube_interface) .await @@ -1593,82 +1482,6 @@ pub mod tests { ) } - #[tokio::test] - async fn test_set_discovery_handler_connectivity_status() { - let _ = env_logger::builder().is_test(true).try_init(); - let discovery_handler_name = "debugEcho"; - let endpoint = DiscoveryHandlerEndpoint::Uds("socket.sock".to_string()); - let discovery_operator = setup_non_mocked_dh(discovery_handler_name, &endpoint); - // Test that an online discovery handler is marked Active - discovery_operator - .set_discovery_handler_connectivity_status(&endpoint, DiscoveryHandlerStatus::Active); - assert_eq!( - discovery_operator - .discovery_handler_map - .lock() - .unwrap() - .get_mut(discovery_handler_name) - .unwrap() - .clone() - .get(&endpoint) - .unwrap() - .clone() - .connectivity_status, - DiscoveryHandlerStatus::Active - ); - } - - #[tokio::test] - async fn test_mark_offline_or_deregister_discovery_handler() { - let _ = env_logger::builder().is_test(true).try_init(); - let discovery_handler_name = "debugEcho"; - let endpoint = DiscoveryHandlerEndpoint::Uds("socket.sock".to_string()); - let discovery_operator = setup_non_mocked_dh(discovery_handler_name, &endpoint); - // Test that an online discovery handler is marked offline - assert!(!discovery_operator - .mark_offline_or_deregister_discovery_handler(&endpoint) - .await - .unwrap()); - if let DiscoveryHandlerStatus::Offline(_) = discovery_operator - .discovery_handler_map - .lock() - .unwrap() - .get_mut(discovery_handler_name) - .unwrap() - .clone() - .get(&endpoint) - .unwrap() - .clone() - .connectivity_status - { - // expected - } else { - panic!("DiscoveryHandlerStatus should be changed to offline"); - } - // Test that an offline discovery handler IS NOT deregistered if the time has not passed - assert!(!discovery_operator - .mark_offline_or_deregister_discovery_handler(&endpoint) - .await - .unwrap()); - - // Test that an offline discovery handler IS deregistered if the time has passed - let mock_now = Instant::now(); - MockClock::advance(Duration::from_secs(301)); - discovery_operator - .discovery_handler_map - .lock() - .unwrap() - .get_mut(discovery_handler_name) - .unwrap() - .get_mut(&endpoint) - .unwrap() - .connectivity_status = DiscoveryHandlerStatus::Offline(mock_now); - assert!(discovery_operator - .mark_offline_or_deregister_discovery_handler(&endpoint) - .await - .unwrap()); - } - #[tokio::test] async fn test_get_stream_embedded() { let _ = env_logger::builder().is_test(true).try_init(); diff --git a/agent/src/util/registration.rs b/agent/src/util/registration.rs index f1863a094..abe0e9340 100644 --- a/agent/src/util/registration.rs +++ b/agent/src/util/registration.rs @@ -10,12 +10,8 @@ use akri_discovery_utils::discovery::v0::{ use akri_shared::os::env_var::{ActualEnvVarQuery, EnvVarQuery}; use akri_shared::uds::unix_stream; use futures::TryFutureExt; -#[cfg(test)] -use mock_instant::Instant; use std::collections::HashMap; use std::sync::{Arc, Mutex}; -#[cfg(not(test))] -use std::time::Instant; use tokio::sync::broadcast; use tonic::{transport::Server, Request, Response, Status}; @@ -44,17 +40,6 @@ pub enum DiscoveryHandlerEndpoint { Network(String), } -/// Describes the connectivity status of a Discovery Handler. -#[derive(PartialEq, Debug, Clone)] -pub enum DiscoveryHandlerStatus { - /// This discovery handler is currently doing discovery on behalf of the Agent - Active, - /// This discovery handler is available and waiting for a discover call from the Agent - Waiting, - /// Not returning discovery results - Offline(Instant), -} - /// Details about a `DiscoveryHandler` and a sender for terminating its clients when needed. #[derive(Debug, Clone)] pub struct DiscoveryDetails { @@ -67,8 +52,6 @@ pub struct DiscoveryDetails { /// Channel over which the Registration service tells a DiscoveryOperator client to close a connection with a /// `DiscoveryHandler`, if any. A broadcast channel is used so both the sending and receiving ends can be cloned. pub close_discovery_handler_connection: broadcast::Sender<()>, - /// Connection state of the `DiscoveryHandler`. - pub connectivity_status: DiscoveryHandlerStatus, } /// This maps the endpoint string and endpoint type of a `RegisterDiscoveryHandlerRequest` into a @@ -131,7 +114,6 @@ impl Registration for AgentRegistration { endpoint: dh_endpoint.clone(), shared: req.shared, close_discovery_handler_connection, - connectivity_status: DiscoveryHandlerStatus::Waiting, }; let mut registered_discovery_handlers = self.registered_discovery_handlers.lock().unwrap(); // Check if any DiscoveryHandlers have been registered under this name @@ -274,7 +256,6 @@ pub fn inner_register_embedded_discovery_handlers( endpoint: DiscoveryHandlerEndpoint::Embedded, shared, close_discovery_handler_connection, - connectivity_status: DiscoveryHandlerStatus::Waiting, }; let mut register_request_map = HashMap::new(); register_request_map.insert( diff --git a/controller/Cargo.toml b/controller/Cargo.toml index ace3edff1..cb70cd809 100644 --- a/controller/Cargo.toml +++ b/controller/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "controller" -version = "0.7.9" +version = "0.7.10" authors = [""] edition = "2018" diff --git a/deployment/helm/Chart.yaml b/deployment/helm/Chart.yaml index 9953867ff..f91445654 100644 --- a/deployment/helm/Chart.yaml +++ b/deployment/helm/Chart.yaml @@ -15,9 +15,9 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.7.9 +version: 0.7.10 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. -appVersion: 0.7.9 +appVersion: 0.7.10 diff --git a/discovery-handler-modules/debug-echo-discovery-handler/Cargo.toml b/discovery-handler-modules/debug-echo-discovery-handler/Cargo.toml index eb4f725b1..818ca18a8 100644 --- a/discovery-handler-modules/debug-echo-discovery-handler/Cargo.toml +++ b/discovery-handler-modules/debug-echo-discovery-handler/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "debug-echo-discovery-handler" -version = "0.7.9" +version = "0.7.10" authors = ["Kate Goldenring "] edition = "2018" diff --git a/discovery-handler-modules/onvif-discovery-handler/Cargo.toml b/discovery-handler-modules/onvif-discovery-handler/Cargo.toml index 08c024681..c7f610f29 100644 --- a/discovery-handler-modules/onvif-discovery-handler/Cargo.toml +++ b/discovery-handler-modules/onvif-discovery-handler/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "onvif-discovery-handler" -version = "0.7.9" +version = "0.7.10" authors = ["Kate Goldenring "] edition = "2018" diff --git a/discovery-handler-modules/opcua-discovery-handler/Cargo.toml b/discovery-handler-modules/opcua-discovery-handler/Cargo.toml index 0232993ee..c5fb3d0c9 100644 --- a/discovery-handler-modules/opcua-discovery-handler/Cargo.toml +++ b/discovery-handler-modules/opcua-discovery-handler/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "opcua-discovery-handler" -version = "0.7.9" +version = "0.7.10" authors = ["Kate Goldenring "] edition = "2018" diff --git a/discovery-handler-modules/udev-discovery-handler/Cargo.toml b/discovery-handler-modules/udev-discovery-handler/Cargo.toml index 175af9264..b376c1f9b 100644 --- a/discovery-handler-modules/udev-discovery-handler/Cargo.toml +++ b/discovery-handler-modules/udev-discovery-handler/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "udev-discovery-handler" -version = "0.7.9" +version = "0.7.10" authors = ["Kate Goldenring "] edition = "2018" diff --git a/discovery-handlers/debug-echo/Cargo.toml b/discovery-handlers/debug-echo/Cargo.toml index de5aedde2..7ef4fa7a3 100644 --- a/discovery-handlers/debug-echo/Cargo.toml +++ b/discovery-handlers/debug-echo/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akri-debug-echo" -version = "0.7.9" +version = "0.7.10" authors = ["Kate Goldenring "] edition = "2018" diff --git a/discovery-handlers/onvif/Cargo.toml b/discovery-handlers/onvif/Cargo.toml index f8dd92116..a1c467bdc 100644 --- a/discovery-handlers/onvif/Cargo.toml +++ b/discovery-handlers/onvif/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akri-onvif" -version = "0.7.9" +version = "0.7.10" authors = ["Kate Goldenring "] edition = "2018" diff --git a/discovery-handlers/opcua/Cargo.toml b/discovery-handlers/opcua/Cargo.toml index 61a8d184d..c8a710197 100644 --- a/discovery-handlers/opcua/Cargo.toml +++ b/discovery-handlers/opcua/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akri-opcua" -version = "0.7.9" +version = "0.7.10" authors = ["Kate Goldenring "] edition = "2018" diff --git a/discovery-handlers/udev/Cargo.toml b/discovery-handlers/udev/Cargo.toml index 50919f2b6..5a9486350 100644 --- a/discovery-handlers/udev/Cargo.toml +++ b/discovery-handlers/udev/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akri-udev" -version = "0.7.9" +version = "0.7.10" authors = ["Kate Goldenring "] edition = "2018" diff --git a/discovery-utils/Cargo.toml b/discovery-utils/Cargo.toml index 6195fcdd3..63f1d990d 100644 --- a/discovery-utils/Cargo.toml +++ b/discovery-utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akri-discovery-utils" -version = "0.7.9" +version = "0.7.10" authors = ["Kate Goldenring "] edition = "2018" diff --git a/samples/brokers/udev-video-broker/Cargo.toml b/samples/brokers/udev-video-broker/Cargo.toml index 8bd46e158..99ee06f66 100644 --- a/samples/brokers/udev-video-broker/Cargo.toml +++ b/samples/brokers/udev-video-broker/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "udev-video-broker" -version = "0.7.9" +version = "0.7.10" authors = ["Kate Goldenring ", ""] edition = "2018" diff --git a/shared/Cargo.toml b/shared/Cargo.toml index a582c581e..96215869b 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akri-shared" -version = "0.7.9" +version = "0.7.10" authors = [""] edition = "2018" diff --git a/version.txt b/version.txt index 972ef76ad..5b209ea20 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -0.7.9 +0.7.10 diff --git a/webhooks/validating/configuration/Cargo.toml b/webhooks/validating/configuration/Cargo.toml index df2fb8acc..538e4a3ff 100644 --- a/webhooks/validating/configuration/Cargo.toml +++ b/webhooks/validating/configuration/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "webhook-configuration" -version = "0.7.9" +version = "0.7.10" authors = ["DazWilkin "] edition = "2018"