diff --git a/kuksa_databroker/databroker/src/broker.rs b/kuksa_databroker/databroker/src/broker.rs index 9ba3091d..4c96721c 100644 --- a/kuksa_databroker/databroker/src/broker.rs +++ b/kuksa_databroker/databroker/src/broker.rs @@ -79,7 +79,7 @@ pub struct Entry { pub metadata: Metadata, } -#[derive(Debug, PartialEq, Eq, Hash)] +#[derive(Debug, PartialEq, Eq, Hash, Clone)] pub enum Field { Datapoint, ActuatorTarget, @@ -1328,21 +1328,8 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> { pub async fn subscribe( &self, - entries: HashMap>, + valid_entries: HashMap>, ) -> Result, SubscriptionError> { - let valid_entries = { - let mut valid_entries = HashMap::new(); - for (path, fields) in entries { - match self.get_id_by_path(path.as_ref()).await { - Some(id) => { - valid_entries.insert(id, fields); - } - None => return Err(SubscriptionError::NotFound), - } - } - valid_entries - }; - if valid_entries.is_empty() { return Err(SubscriptionError::InvalidInput); } @@ -2803,10 +2790,7 @@ mod tests { .expect("Register datapoint should succeed"); let mut stream = broker - .subscribe(HashMap::from([( - "test.datapoint1".into(), - HashSet::from([Field::Datapoint]), - )])) + .subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))])) .await .expect("subscription should succeed"); diff --git a/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs b/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs index 28637fca..d49bd478 100644 --- a/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs +++ b/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs @@ -375,31 +375,61 @@ impl proto::val_server::Val for broker::DataBroker { )); } - let mut entries = HashMap::new(); - - for entry in request.entries { - let mut fields = HashSet::new(); - for id in entry.fields { - match proto::Field::from_i32(id) { - Some(field) => match field { - proto::Field::Value => { - fields.insert(broker::Field::Datapoint); - } - proto::Field::ActuatorTarget => { - fields.insert(broker::Field::ActuatorTarget); + let mut valid_requests: HashMap)> = + HashMap::new(); + + for entry in &request.entries { + if entry.path.contains('*') && !glob::is_valid_pattern(&entry.path) { + tonic::Status::new(tonic::Code::InvalidArgument, "Invalid Pattern Argument"); + continue; + } + + let regex_exp = glob::to_regex(&entry.path); + if let Ok(regex) = regex_exp { + let mut fields = HashSet::new(); + for id in &entry.fields { + if let Some(field) = proto::Field::from_i32(*id) { + match field { + proto::Field::Value => { + fields.insert(broker::Field::Datapoint); + } + proto::Field::ActuatorTarget => { + fields.insert(broker::Field::ActuatorTarget); + } + _ => { + // Just ignore other fields for now + } } - _ => { - // Just ignore other fields for now + }; + } + valid_requests.insert(entry.path.clone(), (regex, fields)); + } + } + + let mut entries: HashMap> = HashMap::new(); + + if !valid_requests.is_empty() { + for (path, (regex, fields)) in valid_requests { + let mut requested_path_found = false; + broker + .for_each_entry(|entry| { + let entry_path = &entry.metadata().path; + if regex.is_match(entry_path) { + requested_path_found = true; + entries + .entry(entry.metadata().id) + .and_modify(|existing_fields| { + existing_fields.extend(fields.clone()); + }) + .or_insert(fields.clone()); } - }, - None => { - return Err(tonic::Status::invalid_argument(format!( - "Invalid Field (id: {id})" - ))) - } - }; + }) + .await; + if !requested_path_found { + let message = format!("No entries found for the provided path: {}", path); + return Err(tonic::Status::new(tonic::Code::NotFound, message)); + } } - entries.insert(entry.path.clone(), fields); } match broker.subscribe(entries).await {