Skip to content

Commit

Permalink
Databroker performance improvements on Get service call
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaeling committed May 6, 2024
1 parent c7950be commit bfd9521
Showing 1 changed file with 55 additions and 139 deletions.
194 changes: 55 additions & 139 deletions databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,134 +54,56 @@ impl proto::val_server::Val for broker::DataBroker {
} else {
let mut entries = Vec::new();
let mut errors = Vec::new();
/*
* valid_requests: A collection of valid requests, each represented as a tuple with five fields:
* - Regex: The regular expression created from the string path request.
* - Fields: A HashSet of proto::Field objects extracted from the request.
* - RequestPath: The original request path, used for error reporting when no entries match.
* - IsMatch: A boolean flag indicating whether the current request matches any entry.
* - Error: An optional ReadError representing a permission error that may occur when querying a valid path entry.
*/
let mut valid_requests: Vec<(
regex::Regex,
HashSet<proto::Field>,
String,
bool,
Option<ReadError>,
)> = Vec::new();

// Fill valid_requests structure.
for request in requested {
if request.path.contains('*') && !glob::is_valid_pattern(&request.path) {
errors.push(proto::DataEntryError {
path: request.path,
error: Some(proto::Error {
code: 400,
reason: "bad_request".to_owned(),
message: "Bad Wildcard Pattern Request".to_owned(),
}),
});
continue;
}

let view = proto::View::from_i32(request.view).ok_or_else(|| {
tonic::Status::invalid_argument(format!("Invalid View (id: {}", request.view))
})?;
let fields = HashSet::<proto::Field>::from_iter(request.fields.iter().filter_map(
|id| proto::Field::from_i32(*id), // Ignore unknown fields for now
));
let view_fields = combine_view_and_fields(view, fields);
debug!("Getting fields: {:?}", view_fields);

let regex_exp = glob::to_regex(&request.path);
match regex_exp {
Ok(value) => {
valid_requests.push((value, view_fields, request.path, false, None));
for request in requested {
match broker.get_entry_by_path(&request.path).await {
Ok(entry) => {
let view = proto::View::from_i32(request.view).ok_or_else(|| {
tonic::Status::invalid_argument(format!(
"Invalid View (id: {}",
request.view
))

Check warning on line 65 in databroker/src/grpc/kuksa_val_v1/val.rs

View check run for this annotation

Codecov / codecov/patch

databroker/src/grpc/kuksa_val_v1/val.rs#L62-L65

Added lines #L62 - L65 were not covered by tests
})?;
let fields =
HashSet::<proto::Field>::from_iter(request.fields.iter().filter_map(
|id| proto::Field::from_i32(*id), // Ignore unknown fields for now
));
let fields = combine_view_and_fields(view, fields);
debug!("Getting fields: {:?}", fields);
let proto_entry = proto_entry_from_entry_and_fields(entry, fields);
debug!("Getting datapoint: {:?}", proto_entry);
entries.push(proto_entry);
}
Err(_) => {
Err(ReadError::NotFound) => {
errors.push(proto::DataEntryError {
path: request.path,
error: Some(proto::Error {
code: 400,
reason: "bad regex".to_owned(),
message: "Regex can't be created for provided path".to_owned(),
code: 404,
reason: "not_found".to_owned(),
message: "Path not found".to_owned(),
}),
});
}
Err(ReadError::PermissionExpired) => {
errors.push(proto::DataEntryError {
path: request.path,
error: Some(proto::Error {
code: 401,
reason: "unauthorized".to_owned(),
message: "Authorization expired".to_owned(),
}),
});
}
Err(ReadError::PermissionDenied) => {
errors.push(proto::DataEntryError {
path: request.path,
error: Some(proto::Error {
code: 403,
reason: "forbidden".to_owned(),
message: "Permission denied".to_owned(),

Check warning on line 103 in databroker/src/grpc/kuksa_val_v1/val.rs

View check run for this annotation

Codecov / codecov/patch

databroker/src/grpc/kuksa_val_v1/val.rs#L87-L103

Added lines #L87 - L103 were not covered by tests
}),
});
}
}
}
if !valid_requests.is_empty() {
broker
.for_each_entry(|entry| {
let mut result_fields: HashSet<proto::Field> = HashSet::new();
for (regex, view_fields, _, is_match, op_error) in &mut valid_requests {
let path = &entry.metadata().path;
if regex.is_match(path) {
// Update the `is_match` to indicate a valid and used request path.
*is_match = true;
if view_fields.contains(&proto::Field::Metadata) {
result_fields.extend(view_fields.clone());
}
if view_fields.contains(&proto::Field::ActuatorTarget)
|| view_fields.contains(&proto::Field::Value)
{
match entry.datapoint() {
Ok(_) => {
// If the entry's path matches the regex and there is access permission,
// add the result fields to the current entry.
result_fields.extend(view_fields.clone());
}
Err(error) => {
//Propagate the error
*op_error = Some(error);
}
}
}
}
}

// If there are result fields, add them to the entries list.
if !result_fields.is_empty() {
let proto_entry =
proto_entry_from_entry_and_fields(entry, result_fields);
debug!("Getting datapoint: {:?}", proto_entry);
entries.push(proto_entry);
}
})
.await;
}

/*
* Handle Unmatched or Permission Errors
*
* After processing valid requests, this section iterates over the `valid_requests` vector
* to check if any requests didn't have matching entries or encountered permission errors.
*
* For each unmatched request, a "not_found" error message is added to the `errors` list.
* For requests with permission errors, a "forbidden" error message is added.
*/
for (_, _, path, is_match, error) in valid_requests {
if !is_match {
errors.push(proto::DataEntryError {
path: path.to_owned(),
error: Some(proto::Error {
code: 404,
reason: "not_found".to_owned(),
message: "No entries found for the provided path".to_owned(),
}),
});
} else if let Some(_error) = error {
// clear the entries vector since we only want to return rerrors
// and not partial success
entries.clear();
errors.push(proto::DataEntryError {
path: path.to_owned(),
error: Some(proto::Error {
code: 403,
reason: "forbidden".to_owned(),
message: "Permission denied for some entries".to_owned(),
}),
});
}
}

Expand Down Expand Up @@ -496,25 +418,19 @@ fn convert_to_proto_stream(
}

fn proto_entry_from_entry_and_fields(
entry: EntryReadAccess,
entry: broker::Entry,
fields: HashSet<proto::Field>,
) -> proto::DataEntry {
let path = entry.metadata().path.to_string();
let path = entry.metadata.path.to_string();
let value = if fields.contains(&proto::Field::Value) {
match entry.datapoint() {
Ok(value) => Option::<proto::Datapoint>::from(value.clone()),
Err(_) => None,
}
Option::<proto::Datapoint>::from(entry.datapoint)
} else {
None
};
let actuator_target = if fields.contains(&proto::Field::ActuatorTarget) {
match entry.actuator_target() {
Ok(value) => match value {
Some(value) => Option::<proto::Datapoint>::from(value.clone()),
None => None,
},
Err(_) => None,
match entry.actuator_target {
Some(actuator_target) => Option::<proto::Datapoint>::from(actuator_target),
None => None,
}
} else {
None
Expand All @@ -527,15 +443,15 @@ fn proto_entry_from_entry_and_fields(

if all || fields.contains(&proto::Field::MetadataDataType) {
metadata_is_set = true;
metadata.data_type = proto::DataType::from(entry.metadata().data_type.clone()) as i32;
metadata.data_type = proto::DataType::from(entry.metadata.data_type.clone()) as i32;
}
if all || fields.contains(&proto::Field::MetadataDescription) {
metadata_is_set = true;
metadata.description = Some(entry.metadata().description.clone());
metadata.description = Some(entry.metadata.description.clone());
}
if all || fields.contains(&proto::Field::MetadataEntryType) {
metadata_is_set = true;
metadata.entry_type = proto::EntryType::from(&entry.metadata().entry_type) as i32;
metadata.entry_type = proto::EntryType::from(&entry.metadata.entry_type) as i32;
}
if all || fields.contains(&proto::Field::MetadataComment) {
metadata_is_set = true;
Expand All @@ -549,7 +465,7 @@ fn proto_entry_from_entry_and_fields(
}
if all || fields.contains(&proto::Field::MetadataUnit) {
metadata_is_set = true;
metadata.unit = entry.metadata().unit.clone();
metadata.unit = entry.metadata.unit.clone();
}
if all || fields.contains(&proto::Field::MetadataValueRestriction) {
metadata_is_set = true;
Expand All @@ -558,7 +474,7 @@ fn proto_entry_from_entry_and_fields(
if all || fields.contains(&proto::Field::MetadataActuator) {
metadata_is_set = true;
// TODO: Add to Metadata
metadata.entry_specific = match entry.metadata().entry_type {
metadata.entry_specific = match entry.metadata.entry_type {
broker::EntryType::Actuator => {
// Some(proto::metadata::EntrySpecific::Actuator(
// proto::Actuator::default(),
Expand All @@ -571,7 +487,7 @@ fn proto_entry_from_entry_and_fields(
if all || fields.contains(&proto::Field::MetadataSensor) {
metadata_is_set = true;
// TODO: Add to Metadata
metadata.entry_specific = match entry.metadata().entry_type {
metadata.entry_specific = match entry.metadata.entry_type {
broker::EntryType::Sensor => {
// Some(proto::metadata::EntrySpecific::Sensor(
// proto::Sensor::default(),
Expand All @@ -584,7 +500,7 @@ fn proto_entry_from_entry_and_fields(
if all || fields.contains(&proto::Field::MetadataAttribute) {
metadata_is_set = true;
// TODO: Add to Metadata
metadata.entry_specific = match entry.metadata().entry_type {
metadata.entry_specific = match entry.metadata.entry_type {
broker::EntryType::Attribute => {
// Some(proto::metadata::EntrySpecific::Attribute(
// proto::Attribute::default(),
Expand Down

0 comments on commit bfd9521

Please sign in to comment.