Skip to content

Commit

Permalink
Use 'cargo fmt' to fix format issues
Browse files Browse the repository at this point in the history
  • Loading branch information
wba2hi committed May 24, 2024
1 parent b1ed0db commit 9f0e361
Showing 1 changed file with 72 additions and 81 deletions.
153 changes: 72 additions & 81 deletions databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ use tokio::sync::mpsc;

use databroker_proto::kuksa::val::v1 as proto;
use databroker_proto::kuksa::val::v1::{DataEntryError, EntryUpdate};
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::Stream;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Response, Status, Streaming};
use tracing::debug;

use crate::broker;
use crate::broker::{AuthorizedAccess, EntryReadAccess};
use crate::broker::ReadError;
use crate::broker::SubscriptionError;
use crate::broker::{AuthorizedAccess, EntryReadAccess};
use crate::glob;
use crate::permissions::Permissions;

Expand Down Expand Up @@ -225,7 +225,6 @@ impl proto::val_server::Val for broker::DataBroker {

let broker = self.authorized_access(&permissions);


let entry_updates = request.into_inner().updates;

// Collect errors encountered
Expand All @@ -235,14 +234,10 @@ impl proto::val_server::Val for broker::DataBroker {
for request in entry_updates {
match &request.entry {
Some(entry) => match broker.get_id_by_path(&entry.path).await {
Some(id) => {
match validate_entry_update(&broker, &request, id).await {
Ok(pair) => {
updates.push((pair.first, pair.second))
}
Err(e) => { return Err(e) }
}
}
Some(id) => match validate_entry_update(&broker, &request, id).await {
Ok(pair) => updates.push((pair.first, pair.second)),
Err(e) => return Err(e),
},
None => {
let message = format!("{} not found", entry.path);
errors.push(proto::DataEntryError {
Expand Down Expand Up @@ -283,7 +278,8 @@ impl proto::val_server::Val for broker::DataBroker {
}))
}

type StreamedUpdateStream = ReceiverStream<Result<proto::StreamedUpdateResponse, tonic::Status>>;
type StreamedUpdateStream =
ReceiverStream<Result<proto::StreamedUpdateResponse, tonic::Status>>;

async fn streamed_update(
&self,
Expand Down Expand Up @@ -419,10 +415,10 @@ impl proto::val_server::Val for broker::DataBroker {

type SubscribeStream = Pin<
Box<
dyn Stream<Item=Result<proto::SubscribeResponse, tonic::Status>>
+ Send
+ Sync
+ 'static,
dyn Stream<Item = Result<proto::SubscribeResponse, tonic::Status>>
+ Send
+ Sync
+ 'static,
>,
>;

Expand Down Expand Up @@ -548,13 +544,16 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

async fn validate_entry_update(broker: &AuthorizedAccess<'_, '_>, request: &EntryUpdate, id: i32) -> Result<Pair<i32, broker::EntryUpdate>, Status> {
async fn validate_entry_update(
broker: &AuthorizedAccess<'_, '_>,
request: &EntryUpdate,
id: i32,
) -> Result<Pair<i32, broker::EntryUpdate>, Status> {
let entry = &request.entry.clone().unwrap();

let fields =
HashSet::<proto::Field>::from_iter(request.fields.iter().filter_map(
|id| proto::Field::from_i32(*id), // Ignore unknown fields for now
));
let fields = HashSet::<proto::Field>::from_iter(request.fields.iter().filter_map(
|id| proto::Field::from_i32(*id), // Ignore unknown fields for now
));

if entry.actuator_target.is_some() {
if let Some(metadata) = broker.get_metadata(id).await {
Expand All @@ -568,16 +567,11 @@ async fn validate_entry_update(broker: &AuthorizedAccess<'_, '_>, request: &Entr

let entry = match &request.entry {
Some(entry) => entry,
None => {
return Err(tonic::Status::invalid_argument(
"Empty entry".to_string(),
))
}
None => return Err(tonic::Status::invalid_argument("Empty entry".to_string())),

Check warning on line 570 in databroker/src/grpc/kuksa_val_v1/val.rs

View check run for this annotation

Codecov / codecov/patch

databroker/src/grpc/kuksa_val_v1/val.rs#L570

Added line #L570 was not covered by tests
};

debug!("Setting fields: {:?}", fields);
let update =
broker::EntryUpdate::from_proto_entry_and_fields(entry, fields);
let update = broker::EntryUpdate::from_proto_entry_and_fields(entry, fields);

let pair = Pair {
first: id,
Expand All @@ -587,60 +581,57 @@ async fn validate_entry_update(broker: &AuthorizedAccess<'_, '_>, request: &Entr
}

fn convert_to_data_entry_error(path: &String, error: &broker::UpdateError) -> DataEntryError {
match error {
broker::UpdateError::NotFound => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 404,
reason: String::from("not found"),
message: format!("no datapoint registered for path {path}"),
}),
},
broker::UpdateError::WrongType => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 400,
reason: String::from("type mismatch"),
message:
"cannot set existing datapoint to value of different type"
.to_string(),
}),
},
broker::UpdateError::UnsupportedType => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 400,
reason: String::from("unsupported type"),
message: "cannot set datapoint to value of unsupported type"
.to_string(),
}),
},
broker::UpdateError::OutOfBounds => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 400,
reason: String::from("value out of bounds"),
message: String::from("given value exceeds type's boundaries"),
}),
},
broker::UpdateError::PermissionDenied => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 403,
reason: String::from("forbidden"),
message: format!("Access was denied for {path}"),
}),
},
broker::UpdateError::PermissionExpired => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 401,
reason: String::from("unauthorized"),
message: String::from("Unauthorized"),
}),
},
}
match error {
broker::UpdateError::NotFound => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 404,
reason: String::from("not found"),
message: format!("no datapoint registered for path {path}"),
}),
},

Check warning on line 592 in databroker/src/grpc/kuksa_val_v1/val.rs

View check run for this annotation

Codecov / codecov/patch

databroker/src/grpc/kuksa_val_v1/val.rs#L585-L592

Added lines #L585 - L592 were not covered by tests
broker::UpdateError::WrongType => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 400,
reason: String::from("type mismatch"),
message: "cannot set existing datapoint to value of different type".to_string(),
}),
},
broker::UpdateError::UnsupportedType => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 400,
reason: String::from("unsupported type"),
message: "cannot set datapoint to value of unsupported type".to_string(),
}),
},

Check warning on line 608 in databroker/src/grpc/kuksa_val_v1/val.rs

View check run for this annotation

Codecov / codecov/patch

databroker/src/grpc/kuksa_val_v1/val.rs#L601-L608

Added lines #L601 - L608 were not covered by tests
broker::UpdateError::OutOfBounds => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 400,
reason: String::from("value out of bounds"),
message: String::from("given value exceeds type's boundaries"),
}),
},
broker::UpdateError::PermissionDenied => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 403,
reason: String::from("forbidden"),
message: format!("Access was denied for {path}"),
}),
},
broker::UpdateError::PermissionExpired => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 401,
reason: String::from("unauthorized"),
message: String::from("Unauthorized"),
}),
},

Check warning on line 632 in databroker/src/grpc/kuksa_val_v1/val.rs

View check run for this annotation

Codecov / codecov/patch

databroker/src/grpc/kuksa_val_v1/val.rs#L625-L632

Added lines #L625 - L632 were not covered by tests
}
}

fn convert_to_proto_stream(
input: impl Stream<Item = broker::EntryUpdates>,
Expand Down

0 comments on commit 9f0e361

Please sign in to comment.