Skip to content
This repository has been archived by the owner on Dec 18, 2024. It is now read-only.

Commit

Permalink
Subscribe to wildcard
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaeling committed Oct 23, 2023
1 parent 258f349 commit 62d213a
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 41 deletions.
22 changes: 3 additions & 19 deletions kuksa_databroker/databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub struct Entry {
pub metadata: Metadata,
}

#[derive(Debug, PartialEq, Eq, Hash)]
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub enum Field {
Datapoint,
ActuatorTarget,
Expand Down Expand Up @@ -1328,21 +1328,8 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {

pub async fn subscribe(
&self,
entries: HashMap<String, HashSet<Field>>,
valid_entries: HashMap<i32, HashSet<Field>>,
) -> Result<impl Stream<Item = EntryUpdates>, SubscriptionError> {
let valid_entries = {
let mut valid_entries = HashMap::new();
for (path, fields) in entries {
match self.get_id_by_path(path.as_ref()).await {
Some(id) => {
valid_entries.insert(id, fields);
}
None => return Err(SubscriptionError::NotFound),
}
}
valid_entries
};

if valid_entries.is_empty() {
return Err(SubscriptionError::InvalidInput);
}
Expand Down Expand Up @@ -2803,10 +2790,7 @@ mod tests {
.expect("Register datapoint should succeed");

let mut stream = broker
.subscribe(HashMap::from([(
"test.datapoint1".into(),
HashSet::from([Field::Datapoint]),
)]))
.subscribe(HashMap::from([(id1, HashSet::from([Field::Datapoint]))]))
.await
.expect("subscription should succeed");

Expand Down
74 changes: 52 additions & 22 deletions kuksa_databroker/databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,31 +375,61 @@ impl proto::val_server::Val for broker::DataBroker {
));
}

let mut entries = HashMap::new();

for entry in request.entries {
let mut fields = HashSet::new();
for id in entry.fields {
match proto::Field::from_i32(id) {
Some(field) => match field {
proto::Field::Value => {
fields.insert(broker::Field::Datapoint);
}
proto::Field::ActuatorTarget => {
fields.insert(broker::Field::ActuatorTarget);
let mut valid_requests: HashMap<String, (regex::Regex, HashSet<broker::Field>)> =
HashMap::new();

for entry in &request.entries {
if entry.path.contains('*') && !glob::is_valid_pattern(&entry.path) {
tonic::Status::new(tonic::Code::InvalidArgument, "Invalid Pattern Argument");
continue;
}

let regex_exp = glob::to_regex(&entry.path);
if let Ok(regex) = regex_exp {
let mut fields = HashSet::new();
for id in &entry.fields {
if let Some(field) = proto::Field::from_i32(*id) {
match field {
proto::Field::Value => {
fields.insert(broker::Field::Datapoint);
}
proto::Field::ActuatorTarget => {
fields.insert(broker::Field::ActuatorTarget);
}
_ => {
// Just ignore other fields for now
}
}
_ => {
// Just ignore other fields for now
};
}
valid_requests.insert(entry.path.clone(), (regex, fields));
}
}

let mut entries: HashMap<i32, HashSet<broker::Field>> = HashMap::new();

if !valid_requests.is_empty() {
for (path, (regex, fields)) in valid_requests {
let mut requested_path_found = false;
broker
.for_each_entry(|entry| {
let entry_path = &entry.metadata().path;
if regex.is_match(entry_path) {
requested_path_found = true;
entries
.entry(entry.metadata().id)
.and_modify(|existing_fields| {
existing_fields.extend(fields.clone());
})
.or_insert(fields.clone());
}
},
None => {
return Err(tonic::Status::invalid_argument(format!(
"Invalid Field (id: {id})"
)))
}
};
})
.await;
if !requested_path_found {
let message = format!("No entries found for the provided path: {}", path);
return Err(tonic::Status::new(tonic::Code::NotFound, message));
}
}
entries.insert(entry.path.clone(), fields);
}

match broker.subscribe(entries).await {
Expand Down

0 comments on commit 62d213a

Please sign in to comment.