From 10592bb43c76ac04ed1f1f31b7e6e4b826071aa9 Mon Sep 17 00:00:00 2001 From: Rafael RL Date: Tue, 13 Feb 2024 12:08:00 +0100 Subject: [PATCH] Keep original subscribe signature --- kuksa_databroker/databroker/src/broker.rs | 56 ++++++++++--------- .../databroker/src/grpc/kuksa_val_v1/val.rs | 8 +-- .../databroker/src/viss/v2/server.rs | 11 +--- 3 files changed, 34 insertions(+), 41 deletions(-) diff --git a/kuksa_databroker/databroker/src/broker.rs b/kuksa_databroker/databroker/src/broker.rs index ebeff474..4d69ef61 100644 --- a/kuksa_databroker/databroker/src/broker.rs +++ b/kuksa_databroker/databroker/src/broker.rs @@ -1592,7 +1592,7 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { pub async fn subscribe( &self, - valid_entries: HashMap, types::ChangeType)>, + valid_entries: HashMap>, ) -> Result, SubscriptionError> { if valid_entries.is_empty() { return Err(SubscriptionError::InvalidInput); @@ -1601,21 +1601,36 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { let mut entries_on_changed: HashMap> = HashMap::new(); let mut entries_continuous: HashMap> = HashMap::new(); - for (id, (fields, change_type)) in valid_entries { - match change_type { - types::ChangeType::OnChange => { - entries_on_changed - .entry(id) - .and_modify(|existing_fields| existing_fields.extend(fields.clone())) - .or_insert(fields.clone()); + let db_read = self.broker.database.read().await; + let db_read_access = db_read.authorized_read_access(self.permissions); + + for (id, fields) in valid_entries { + match db_read_access.get_entry_by_id(id) { + Ok(entry) => { + let change_type = entry.metadata.change_type.clone(); + match change_type { + types::ChangeType::OnChange => { + entries_on_changed + .entry(id) + .and_modify(|existing_fields| { + existing_fields.extend(fields.clone()) + }) + .or_insert(fields.clone()); + } + types::ChangeType::Continuous => { + entries_continuous + .entry(id) + .and_modify(|existing_fields| { + existing_fields.extend(fields.clone()) + }) + .or_insert(fields.clone()); + } + types::ChangeType::Static => {} + } } - types::ChangeType::Continuous => { - entries_continuous - .entry(id) - .and_modify(|existing_fields| existing_fields.extend(fields.clone())) - .or_insert(fields.clone()); + Err(_) => { + debug!("notify: could not find entry with id {}", id) } - types::ChangeType::Static => {} } } @@ -3145,19 +3160,8 @@ mod tests { .await .expect("Register datapoint should succeed"); - let my_hashmap: HashMap, types::ChangeType)> = [( - id1, - ( - HashSet::from([Field::Datapoint]), - types::ChangeType::OnChange, - ), - )] - .iter() - .cloned() - .collect(); - let mut stream = broker - .subscribe(my_hashmap) + .subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))])) .await .expect("subscription should succeed"); diff --git a/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs b/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs index 249b4d1f..72a8fa87 100644 --- a/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs +++ b/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs @@ -28,7 +28,6 @@ use crate::broker::ReadError; use crate::broker::SubscriptionError; use crate::glob; use crate::permissions::Permissions; -use crate::types; #[tonic::async_trait] impl proto::val_server::Val for broker::DataBroker { @@ -410,7 +409,7 @@ impl proto::val_server::Val for broker::DataBroker { } } - let mut entries: HashMap, types::ChangeType)> = HashMap::new(); + let mut entries: HashMap> = HashMap::new(); if !valid_requests.is_empty() { for (path, (regex, fields)) in valid_requests { @@ -424,10 +423,9 @@ impl proto::val_server::Val for broker::DataBroker { entries .entry(entry.metadata().id) .and_modify(|existing_fields| { - existing_fields.0.extend(fields.clone()); - existing_fields.1 = entry.metadata().change_type.clone(); + existing_fields.extend(fields.clone()); }) - .or_insert((fields.clone(), entry.metadata().change_type.clone())); + .or_insert(fields.clone()); match entry.datapoint() { Ok(_) => {} diff --git a/kuksa_databroker/databroker/src/viss/v2/server.rs b/kuksa_databroker/databroker/src/viss/v2/server.rs index 634b2746..01f96fc3 100644 --- a/kuksa_databroker/databroker/src/viss/v2/server.rs +++ b/kuksa_databroker/databroker/src/viss/v2/server.rs @@ -33,7 +33,6 @@ use crate::{ }; use super::{conversions, types::*}; -pub use crate::types::ChangeType; #[tonic::async_trait] pub(crate) trait Viss: Send + Sync + 'static { @@ -254,15 +253,7 @@ impl Viss for Server { let Some(entries) = broker .get_id_by_path(request.path.as_ref()) .await - .map(|id| { - HashMap::from([( - id, - ( - HashSet::from([broker::Field::Datapoint]), - ChangeType::Static, - ), - )]) - }) + .map(|id| HashMap::from([(id, HashSet::from([broker::Field::Datapoint]))])) else { return Err(SubscribeErrorResponse { request_id,