Skip to content

Commit

Permalink
agent: Fix a stop discovery channel issue
Browse files Browse the repository at this point in the history
The stop discovery channel is shared by all Configuration resources of a
given discovery handler endpoint, this channel was used without giving
any context of the targeted DiscoveryOperator, leading to a stop of
discovery for all Configurations when a Configuration is modified or
deleted.

This commit makes the stop discovery channel send the concerned
Configuration ID (namespace + name) on the channel, allowing to filter
out requests targeting another DiscoveryOperator, the channel is still
shared so the cases where we really want to stop discovery for all
Configurations still works.

Signed-off-by: Nicolas Belouin <[email protected]>
  • Loading branch information
diconico07 committed May 25, 2023
1 parent 1d8189b commit 460d454
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 26 deletions.
2 changes: 1 addition & 1 deletion agent/src/util/config_action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::{
};
use tokio::sync::{broadcast, mpsc, RwLock};

type ConfigId = (String, String);
pub type ConfigId = (String, String);
type ConfigMap = Arc<RwLock<HashMap<ConfigId, ConfigInfo>>>;

/// Information for managing a Configuration, such as all applied Instances of that Configuration
Expand Down
99 changes: 76 additions & 23 deletions agent/src/util/discovery_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::super::INSTANCE_COUNT_METRIC;
#[cfg(any(test, feature = "agent-full"))]
use super::embedded_discovery_handlers::get_discovery_handler;
use super::{
config_action::ConfigId,
constants::SHARED_INSTANCE_OFFLINE_GRACE_PERIOD_SECS,
device_plugin_builder::{DevicePluginBuilder, DevicePluginBuilderInterface},
device_plugin_service,
Expand Down Expand Up @@ -73,6 +74,13 @@ impl DiscoveryOperator {
instance_map,
}
}
fn get_config_id(&self) -> ConfigId {
(
self.config.metadata.namespace.clone().unwrap(),
self.config.metadata.name.clone().unwrap(),
)
}

/// Returns discovery_handler_map field. Allows the struct to be mocked.
#[allow(dead_code)]
pub fn get_discovery_handler_map(&self) -> RegisteredDiscoveryHandlerMap {
Expand All @@ -95,7 +103,7 @@ impl DiscoveryOperator {
discovery_handler_map.get_mut(&self.config.spec.discovery_handler.name)
{
for (endpoint, dh_details) in discovery_handler_details_map.clone() {
match dh_details.close_discovery_handler_connection.send(()) {
match dh_details.close_discovery_handler_connection.send(Some(self.get_config_id())) {
Ok(_) => trace!("stop_all_discovery - discovery client for {} discovery handler at endpoint {:?} told to stop", self.config.spec.discovery_handler.name, endpoint),
Err(e) => error!("stop_all_discovery - discovery client {} discovery handler at endpoint {:?} could not receive stop message with error {:?}", self.config.spec.discovery_handler.name, endpoint, e)
}
Expand Down Expand Up @@ -206,13 +214,25 @@ impl DiscoveryOperator {
) -> anyhow::Result<()> {
// clone objects for thread
let discovery_operator = Arc::new(self.clone());
let stop_discovery_receiver: &mut tokio::sync::broadcast::Receiver<()> =
let stop_discovery_receiver: &mut tokio::sync::broadcast::Receiver<Option<ConfigId>> =
&mut dh_details.close_discovery_handler_connection.subscribe();
loop {
// Wait for either new discovery results or a message to stop discovery
tokio::select! {
_ = stop_discovery_receiver.recv() => {
trace!("internal_do_discover - received message to stop discovery for endpoint {:?} serving protocol {}", dh_details.endpoint, discovery_operator.get_config().spec.discovery_handler.name);
result = stop_discovery_receiver.recv() => {
if let Ok(Some(config_id)) = result {
if config_id != self.get_config_id() {
trace!("internal_do_discover - received message to stop discovery for another configuration");
continue;
}
}
trace!(
"internal_do_discover({}::{}) - received message to stop discovery for endpoint {:?} serving protocol {}",
self.config.metadata.namespace.as_ref().unwrap(),
self.config.metadata.name.as_ref().unwrap(),
dh_details.endpoint,
discovery_operator.get_config().spec.discovery_handler.name,
);
break;
},
result = stream.get_message() => {
Expand Down Expand Up @@ -689,11 +709,21 @@ pub mod start_discovery {
sleep_duration = Duration::from_millis(100);
}

if tokio::time::timeout(sleep_duration, stop_discovery_receiver.recv())
.await
.is_ok()
if let Ok(result) =
tokio::time::timeout(sleep_duration, stop_discovery_receiver.recv()).await
{
trace!("do_discover_on_discovery_handler - received message to stop discovery for {} Discovery Handler at endpoint {:?}", dh_details.name, dh_details.endpoint);
if let Ok(Some(config_id)) = result {
if config_id != discovery_operator.get_config_id() {
trace!("do_discover_on_discovery_handler - received message to stop discovery for another configuration");
continue;
}
}
let (config_namespace, config_name) = discovery_operator.get_config_id();
trace!(
"do_discover_on_discovery_handler({}::{}) - received message to stop discovery for {} Discovery Handler at endpoint {:?}",
config_namespace, config_name,
dh_details.name, dh_details.endpoint,
);
break;
}
}
Expand Down Expand Up @@ -820,6 +850,10 @@ pub mod tests {
let ctx = MockDiscoveryOperator::new_context();
let discovery_handler_map_clone = discovery_handler_map.clone();
let config_clone = config.clone();
let config_id = (
config.metadata.namespace.clone().unwrap(),
config.metadata.namespace.clone().unwrap(),
);
let instance_map_clone = instance_map.clone();
ctx.expect().return_once(move |_, _, _| {
// let mut discovery_handler_status_seq = Sequence::new();
Expand All @@ -830,6 +864,8 @@ pub mod tests {
.returning(move || config_clone.clone());
mock.expect_get_instance_map()
.returning(move || instance_map_clone.clone());
mock.expect_get_config_id()
.returning(move || config_id.clone());
mock
});
MockDiscoveryOperator::new(discovery_handler_map, config, instance_map)
Expand All @@ -847,7 +883,9 @@ pub mod tests {
// Add discovery handler to registered discovery handler map
let dh_details_map = match registered_dh_map.lock().unwrap().clone().get_mut(dh_name) {
Some(dh_details_map) => {
dh_details_map.insert(endpoint.clone(), discovery_handler_details);
if !dh_details_map.contains_key(endpoint) {
dh_details_map.insert(endpoint.clone(), discovery_handler_details);
}
dh_details_map.clone()
}
None => {
Expand Down Expand Up @@ -878,8 +916,8 @@ pub mod tests {

fn setup_test_do_discover(
config_name: &str,
) -> (MockDiscoveryOperator, RegisteredDiscoveryHandlerMap) {
let discovery_handler_map = Arc::new(std::sync::Mutex::new(HashMap::new()));
discovery_handler_map: RegisteredDiscoveryHandlerMap,
) -> MockDiscoveryOperator {
add_discovery_handler_to_map(
"debugEcho",
&DiscoveryHandlerEndpoint::Uds("socket.sock".to_string()),
Expand All @@ -892,12 +930,12 @@ pub mod tests {
let config_yaml = std::fs::read_to_string(path_to_config).expect("Unable to read file");
let mut config: Configuration = serde_yaml::from_str(&config_yaml).unwrap();
config.metadata.name = Some(config_name.to_string());
let discovery_operator = create_mock_discovery_operator(
discovery_handler_map.clone(),
config.metadata.namespace = Some(config_name.to_string());
create_mock_discovery_operator(
discovery_handler_map,
config,
Arc::new(tokio::sync::RwLock::new(HashMap::new())),
);
(discovery_operator, discovery_handler_map)
)
}

#[test]
Expand Down Expand Up @@ -973,7 +1011,9 @@ pub mod tests {
#[tokio::test]
async fn test_start_discovery_termination() {
let _ = env_logger::builder().is_test(true).try_init();
let mut start_discovery_components = start_discovery_setup("config-a", true).await;
let discovery_handler_map = Arc::new(std::sync::Mutex::new(HashMap::new()));
let mut start_discovery_components =
start_discovery_setup("config-a", true, discovery_handler_map).await;
start_discovery_components
.running_receiver
.recv()
Expand All @@ -1000,8 +1040,11 @@ pub mod tests {
#[tokio::test]
async fn test_start_discovery_same_discovery_handler() {
let _ = env_logger::builder().is_test(true).try_init();
let mut start_discovery_components_a = start_discovery_setup("config-a", false).await;
let mut start_discovery_components_b = start_discovery_setup("config-b", false).await;
let discovery_handler_map = Arc::new(std::sync::Mutex::new(HashMap::new()));
let mut start_discovery_components_a =
start_discovery_setup("config-a", false, discovery_handler_map.clone()).await;
let mut start_discovery_components_b =
start_discovery_setup("config-b", false, discovery_handler_map.clone()).await;

start_discovery_components_a
.running_receiver
Expand All @@ -1022,16 +1065,21 @@ pub mod tests {
start_discovery_handle: tokio::task::JoinHandle<()>,
}

async fn start_discovery_setup(config_name: &str, terminate: bool) -> StartDiscoveryComponents {
let (mut mock_discovery_operator, discovery_handler_map) =
setup_test_do_discover(config_name);
async fn start_discovery_setup(
config_name: &str,
terminate: bool,
discovery_handler_map: RegisteredDiscoveryHandlerMap,
) -> StartDiscoveryComponents {
let mut mock_discovery_operator =
setup_test_do_discover(config_name, discovery_handler_map.clone());
let (running_sender, running_receiver) = tokio::sync::broadcast::channel::<()>(1);
mock_discovery_operator
.expect_get_stream()
.returning(move |_| {
running_sender.clone().send(()).unwrap();
None
});

mock_discovery_operator
.expect_delete_offline_instances()
.times(1)
Expand All @@ -1047,11 +1095,15 @@ pub mod tests {
.unwrap()
.clone()
.close_discovery_handler_connection;
let local_config_id = mock_discovery_operator.get_config_id();
mock_discovery_operator
.expect_stop_all_discovery()
.times(1)
.returning(move || {
stop_dh_discovery_sender.clone().send(()).unwrap();
stop_dh_discovery_sender
.clone()
.send(Some(local_config_id.clone()))
.unwrap();
});
}
let (mut finished_discovery_sender, finished_discovery_receiver) =
Expand Down Expand Up @@ -1083,7 +1135,8 @@ pub mod tests {
#[tokio::test]
async fn test_do_discover_completed_internal_connection() {
let _ = env_logger::builder().is_test(true).try_init();
let (mut mock_discovery_operator, _) = setup_test_do_discover("config-a");
let discovery_handler_map = Arc::new(std::sync::Mutex::new(HashMap::new()));
let mut mock_discovery_operator = setup_test_do_discover("config-a", discovery_handler_map);
let mut get_stream_seq = Sequence::new();
// First time cannot get stream
mock_discovery_operator
Expand Down
5 changes: 3 additions & 2 deletions agent/src/util/registration.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::config_action::ConfigId;
use super::constants::CLOSE_DISCOVERY_HANDLER_CONNECTION_CHANNEL_CAPACITY;
#[cfg(any(test, feature = "agent-full"))]
use super::constants::ENABLE_DEBUG_ECHO_LABEL;
Expand Down Expand Up @@ -51,7 +52,7 @@ pub struct DiscoveryDetails {
pub shared: bool,
/// Channel over which the Registration service tells a DiscoveryOperator client to close a connection with a
/// `DiscoveryHandler`, if any. A broadcast channel is used so both the sending and receiving ends can be cloned.
pub close_discovery_handler_connection: broadcast::Sender<()>,
pub close_discovery_handler_connection: broadcast::Sender<Option<ConfigId>>,
}

/// This maps the endpoint string and endpoint type of a `RegisterDiscoveryHandlerRequest` into a
Expand Down Expand Up @@ -126,7 +127,7 @@ impl Registration for AgentRegistration {
// 2) a connection cannot be made with the DH's endpoint
dh_details
.close_discovery_handler_connection
.send(())
.send(None)
.unwrap_or_default();
} else {
// Already registered. Return early.
Expand Down

0 comments on commit 460d454

Please sign in to comment.