Skip to content

Commit

Permalink
Fix broker properties not getting updated on Configuration change
Browse files Browse the repository at this point in the history
Move the cdi::Kind abstraction so that updates to broker properties can
be reflected here.

On Configuration reconciliation if request already exists, update broker
properties

Fix #705

Signed-off-by: Nicolas Belouin <[email protected]>
  • Loading branch information
diconico07 committed Oct 29, 2024
1 parent ea21796 commit ac992ad
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 68 deletions.
141 changes: 84 additions & 57 deletions agent/src/discovery_handler_manager/discovery_handler_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,10 @@ pub trait DiscoveryHandlerEndpoint: Send + Sync {
/// results across the different registered handlers of that type, and generate the Instance objects for discovered
/// devices.
#[cfg_attr(test, automock)]
#[async_trait]
pub trait DiscoveryHandlerRequest: Sync + Send {
fn get_instances(&self) -> Result<Vec<Instance>, DiscoveryError>;
async fn get_instances(&self) -> Result<Vec<Instance>, DiscoveryError>;
async fn set_extra_device_properties(&self, extra_device_properties: HashMap<String, String>);
}

/// This trait is here to help with testing for code that interract with the discovery handler registry
Expand Down Expand Up @@ -169,35 +171,60 @@ pub trait DiscoveryHandlerRegistry: Sync + Send {
/// Real world implementation of the Discovery Handler Request
struct DHRequestImpl {
endpoints: RwLock<Vec<watch::Receiver<Vec<Arc<DiscoveredDevice>>>>>,
notifier: watch::Sender<Vec<Arc<DiscoveredDevice>>>,
notifier: watch::Sender<crate::device_manager::cdi::Kind>,
key: String,
handler_name: String,
details: String,
properties: Vec<DiscoveryProperty>,
extra_device_properties: HashMap<String, String>,
extra_device_properties: RwLock<HashMap<String, String>>,
kube_client: Arc<dyn DiscoveryManagerKubeInterface>,
termination_notifier: Arc<Notify>,
}

#[async_trait]
impl DiscoveryHandlerRequest for DHRequestImpl {
fn get_instances(&self) -> Result<Vec<Instance>, DiscoveryError> {
async fn get_instances(&self) -> Result<Vec<Instance>, DiscoveryError> {
let properties = self.extra_device_properties.read().await;
Ok(self
.notifier
.borrow()
.endpoints
.read()
.await
.iter()
.map(|i| self.device_to_instance(i))
.flat_map(|r| r.borrow().clone().into_iter())
.map(|i| self.device_to_instance(i.as_ref(), &properties))
.collect())
}

async fn set_extra_device_properties(&self, extra_device_properties: HashMap<String, String>) {
let mut current = self.extra_device_properties.write().await;
if extra_device_properties != *current {
let edit = extra_device_properties
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect();
*current = extra_device_properties;
self.notifier
.send_modify(|k| k.container_edits.first_mut().unwrap().env = edit);
}
}
}

impl DHRequestImpl {
fn device_to_instance(&self, dev: &DiscoveredDevice) -> Instance {
fn device_to_instance(
&self,
dev: &DiscoveredDevice,
extra_device_properties: &HashMap<String, String>,
) -> Instance {
let (rdev, shared) = match dev {
DiscoveredDevice::LocalDevice(d, _) => (d, false),
DiscoveredDevice::SharedDevice(d) => (d, true),
};
let mut properties = rdev.properties.clone();
properties.extend(self.extra_device_properties.clone());
properties.extend(
extra_device_properties
.iter()
.map(|(k, v)| (k.clone(), v.clone())),
);
Instance {
spec: InstanceSpec {
cdi_name: self.get_device_cdi_fqdn(dev),
Expand Down Expand Up @@ -255,13 +282,27 @@ impl DHRequestImpl {
.await
.iter_mut()
.flat_map(|r| r.borrow_and_update().clone().into_iter())
.unique_by(|d| self.get_device_cdi_fqdn(d))
.collect();
self.notifier.send_replace(
devices
.into_iter()
.unique_by(|d| self.get_device_cdi_fqdn(d))
.collect(),
);
self.notifier
.send_replace(crate::device_manager::cdi::Kind {
kind: format!("{}/{}", AKRI_PREFIX, self.key),
annotations: Default::default(),
devices: devices
.into_iter()
.map(|d| d.as_ref().clone().into())
.collect(),
container_edits: vec![ContainerEdit {
env: self
.extra_device_properties
.read()
.await
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect(),
..Default::default()
}],
});
}
}

Expand Down Expand Up @@ -324,41 +365,23 @@ impl DHRegistryImpl {
}

async fn handle_request(
mut req_notifier: watch::Receiver<Vec<Arc<DiscoveredDevice>>>,
key: String,
mut req_notifier: watch::Receiver<crate::device_manager::cdi::Kind>,
key: &String,
namespace: &String,
cdi_sender: Arc<Mutex<watch::Sender<HashMap<String, crate::device_manager::cdi::Kind>>>>,
local_config_sender: mpsc::Sender<ObjectRef<Configuration>>,
extra_device_properties: HashMap<String, String>,
) {
let cdi_kind = format!("{}/{}", AKRI_PREFIX, key);
loop {
match req_notifier.changed().await {
Ok(_) => {
cdi_sender.lock().await.send_modify(|kind| {
kind.insert(
cdi_kind.clone(),
crate::device_manager::cdi::Kind {
kind: cdi_kind.clone(),
annotations: Default::default(),
devices: req_notifier
.borrow_and_update()
.iter()
.map(|d| d.as_ref().clone().into())
.collect(),
container_edits: vec![ContainerEdit {
env: extra_device_properties
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect(),
..Default::default()
}],
},
);
let kind = req_notifier.borrow_and_update().clone();
cdi_sender.lock().await.send_modify(|kinds| {
kinds.insert(cdi_kind.clone(), kind);
});
trace!("Ask for reconciliation of {}::{}", namespace, key);
let res = local_config_sender
.send(ObjectRef::<Configuration>::new(&key).within(namespace))
.send(ObjectRef::<Configuration>::new(key).within(namespace))
.await;
if res.is_err() {
cdi_sender.lock().await.send_modify(|kind| {
Expand All @@ -370,7 +393,7 @@ async fn handle_request(
Err(_) => {
trace!("Ask for reconciliation of {}::{}", namespace, key);
let _ = local_config_sender
.send(ObjectRef::<Configuration>::new(&key).within(namespace))
.send(ObjectRef::<Configuration>::new(key).within(namespace))
.await;
cdi_sender.lock().await.send_modify(|kind| {
kind.remove(&cdi_kind);
Expand All @@ -394,7 +417,7 @@ impl DiscoveryHandlerRegistry for DHRegistryImpl {
) -> Result<(), DiscoveryError> {
match self.handlers.read().await.get(dh_name) {
Some(handlers) => {
let (notifier, _) = watch::channel(vec![]);
let (notifier, _) = watch::channel(Default::default());
let terminated = Arc::new(Notify::new());
let mut dh_req = DHRequestImpl {
endpoints: Default::default(),
Expand All @@ -403,7 +426,7 @@ impl DiscoveryHandlerRegistry for DHRegistryImpl {
handler_name: dh_name.to_string(),
details: dh_details.to_string(),
properties: dh_properties.to_vec(),
extra_device_properties: extra_device_properties.clone(),
extra_device_properties: RwLock::new(extra_device_properties),
kube_client: self.kube_client.clone(),
termination_notifier: terminated.clone(),
};
Expand Down Expand Up @@ -433,11 +456,10 @@ impl DiscoveryHandlerRegistry for DHRegistryImpl {
tokio::spawn(async move {
handle_request(
local_req_notifier,
local_key,
&local_key,
&namespace,
local_cdi_sender,
local_config_sender,
extra_device_properties,
)
.await
});
Expand Down Expand Up @@ -608,9 +630,9 @@ mod tests {
);
}

#[test]
fn test_dh_request_impl_get_instances() {
let (notifier, _) = watch::channel(vec![Arc::new(DiscoveredDevice::LocalDevice(
#[tokio::test]
async fn test_dh_request_impl_get_instances() {
let (_, notifier) = watch::channel(vec![Arc::new(DiscoveredDevice::LocalDevice(
Device {
id: "my_local_device".to_owned(),
properties: HashMap::from([(
Expand All @@ -622,23 +644,25 @@ mod tests {
},
"my_node".to_owned(),
))]);
let endpoints = RwLock::new(vec![notifier]);
let (cdi_notifier, _) = watch::channel(Default::default());
let req = DHRequestImpl {
endpoints: Default::default(),
notifier,
endpoints,
notifier: cdi_notifier,
key: "my_config".to_owned(),
handler_name: "mock_handler".to_string(),
details: Default::default(),
properties: Default::default(),
extra_device_properties: HashMap::from([(
extra_device_properties: RwLock::new(HashMap::from([(
"MY_EXTRA_KEY".to_owned(),
"value".to_owned(),
)]),
)])),
kube_client: Arc::new(MockDiscoveryManagerKubeInterface::new()),
termination_notifier: Arc::new(Notify::new()),
};

assert_eq!(
req.get_instances().unwrap(),
req.get_instances().await.unwrap(),
vec![Instance {
metadata: ObjectMeta {
name: Some("my_config-e77db4".to_owned()),
Expand All @@ -662,7 +686,7 @@ mod tests {

#[tokio::test]
async fn test_dh_request_impl_watch_devices() {
let (notifier, mut n_rec) = watch::channel(vec![]);
let (notifier, mut n_rec) = watch::channel(Default::default());
let (dh_send, dh_rec) = watch::channel(Default::default());
let req = Arc::new(DHRequestImpl {
endpoints: RwLock::new(vec![dh_rec]),
Expand All @@ -675,10 +699,10 @@ mod tests {
value: Some("value_1".to_string()),
value_from: None,
}],
extra_device_properties: HashMap::from([(
extra_device_properties: RwLock::new(HashMap::from([(
"MY_EXTRA_KEY".to_owned(),
"value".to_owned(),
)]),
)])),
kube_client: Arc::new(MockDiscoveryManagerKubeInterface::new()),
termination_notifier: Arc::new(Notify::new()),
});
Expand All @@ -687,7 +711,7 @@ mod tests {
let (new_dh_sen, rec) = broadcast::channel(1);

let task = tokio::spawn(async move { req_ref.watch_devices(rec).await });
assert!(n_rec.borrow_and_update().is_empty());
assert!(n_rec.borrow_and_update().devices.is_empty());

let new_device = Arc::new(DiscoveredDevice::SharedDevice(Device {
id: "my_shared_device".to_owned(),
Expand All @@ -698,7 +722,10 @@ mod tests {
dh_send.send(vec![new_device.clone()]).unwrap();

tokio::time::sleep(Duration::from_millis(500)).await;
assert_eq!(n_rec.borrow_and_update().clone(), vec![new_device]);
assert_eq!(
n_rec.borrow_and_update().devices.clone(),
vec![new_device.as_ref().clone().into()]
);

let mut new_dh = MockDiscoveryHandlerEndpoint::new();
let new_dh_senders = Arc::new(std::sync::Mutex::new(vec![]));
Expand Down
29 changes: 18 additions & 11 deletions agent/src/util/discovery_configuration_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,21 @@ pub async fn reconcile(

let discovered_instances: Vec<Instance> =
match ctx.dh_registry.get_request(&dc.name_any()).await {
Some(req) => req
.get_instances()?
.into_iter()
.map(|mut instance| {
// Add
instance.spec.nodes = vec![ctx.agent_identifier.to_owned()];
instance.owner_references_mut().push(owner_ref.clone());
instance.spec.capacity = dc.spec.capacity;
instance
})
.collect(),
Some(req) => {
req.set_extra_device_properties(dc.spec.broker_properties.clone())
.await;
req.get_instances()
.await?
.into_iter()
.map(|mut instance| {
// Add
instance.spec.nodes = vec![ctx.agent_identifier.to_owned()];
instance.owner_references_mut().push(owner_ref.clone());
instance.spec.capacity = dc.spec.capacity;
instance
})
.collect()
}
None => {
ctx.dh_registry
.new_request(
Expand Down Expand Up @@ -474,6 +478,9 @@ mod tests {

let mut registry = MockDiscoveryHandlerRegistry::new();
let mut request = MockDiscoveryHandlerRequest::new();
request
.expect_set_extra_device_properties()
.returning(|_| {});
request.expect_get_instances().returning(|| Ok(vec![]));
registry
.expect_get_request()
Expand Down

0 comments on commit ac992ad

Please sign in to comment.