Skip to content

Commit

Permalink
Add StreamedUpdate API
Browse files Browse the repository at this point in the history
  • Loading branch information
wba2hi committed May 23, 2024
1 parent 82513ea commit b1ed0db
Show file tree
Hide file tree
Showing 2 changed files with 264 additions and 86 deletions.
339 changes: 253 additions & 86 deletions databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,29 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::iter::FromIterator;
use std::pin::Pin;
use tokio::select;
use tokio::sync::mpsc;

use databroker_proto::kuksa::val::v1 as proto;
use databroker_proto::kuksa::val::v1::DataEntryError;
use databroker_proto::kuksa::val::v1::{DataEntryError, EntryUpdate};
use tokio_stream::Stream;
use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Response, Status, Streaming};
use tracing::debug;

use crate::broker;
use crate::broker::EntryReadAccess;
use crate::broker::{AuthorizedAccess, EntryReadAccess};
use crate::broker::ReadError;
use crate::broker::SubscriptionError;
use crate::glob;
use crate::permissions::Permissions;

struct Pair<F, S> {
first: F,
second: S,
}

#[tonic::async_trait]
impl proto::val_server::Val for broker::DataBroker {
async fn get(
Expand Down Expand Up @@ -216,41 +225,23 @@ impl proto::val_server::Val for broker::DataBroker {

let broker = self.authorized_access(&permissions);


let entry_updates = request.into_inner().updates;

// Collect errors encountered
let mut errors = Vec::<DataEntryError>::new();

let mut updates = Vec::<(i32, broker::EntryUpdate)>::new();
for request in request.into_inner().updates {

for request in entry_updates {
match &request.entry {
Some(entry) => match broker.get_id_by_path(&entry.path).await {
Some(id) => {
let fields =
HashSet::<proto::Field>::from_iter(request.fields.iter().filter_map(
|id| proto::Field::from_i32(*id), // Ignore unknown fields for now
));

if entry.actuator_target.is_some() {
if let Some(metadata) = broker.get_metadata(id).await {
if metadata.entry_type != broker::EntryType::Actuator {
return Err(tonic::Status::invalid_argument(
"Tried to set a target value for a non-actuator. Non-actuators have no target value.".to_string(),
));
}
match validate_entry_update(&broker, &request, id).await {
Ok(pair) => {
updates.push((pair.first, pair.second))
}
Err(e) => { return Err(e) }
}

let entry = match &request.entry {
Some(entry) => entry,
None => {
return Err(tonic::Status::invalid_argument(
"Empty entry".to_string(),
))
}
};
debug!("Settings fields: {:?}", fields);
let update =
broker::EntryUpdate::from_proto_entry_and_fields(entry, fields);
updates.push((id, update));
}
None => {
let message = format!("{} not found", entry.path);
Expand Down Expand Up @@ -279,59 +270,7 @@ impl proto::val_server::Val for broker::DataBroker {
for (id, error) in err.into_iter() {
if let Some(metadata) = broker.get_metadata(id).await {
let path = metadata.path.clone();
let data_entry_error = match error {
broker::UpdateError::NotFound => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 404,
reason: String::from("not found"),
message: format!("no datapoint registered for path {path}"),
}),
},
broker::UpdateError::WrongType => DataEntryError {
path,
error: Some(proto::Error {
code: 400,
reason: String::from("type mismatch"),
message:
"cannot set existing datapoint to value of different type"
.to_string(),
}),
},
broker::UpdateError::UnsupportedType => DataEntryError {
path,
error: Some(proto::Error {
code: 400,
reason: String::from("unsupported type"),
message: "cannot set datapoint to value of unsupported type"
.to_string(),
}),
},
broker::UpdateError::OutOfBounds => DataEntryError {
path,
error: Some(proto::Error {
code: 400,
reason: String::from("value out of bounds"),
message: String::from("given value exceeds type's boundaries"),
}),
},
broker::UpdateError::PermissionDenied => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 403,
reason: String::from("forbidden"),
message: format!("Access was denied for {path}"),
}),
},
broker::UpdateError::PermissionExpired => DataEntryError {
path,
error: Some(proto::Error {
code: 401,
reason: String::from("unauthorized"),
message: String::from("Unauthorized"),
}),
},
};
let data_entry_error = convert_to_data_entry_error(&path, &error);
errors.push(data_entry_error);
}
}
Expand All @@ -344,12 +283,146 @@ impl proto::val_server::Val for broker::DataBroker {
}))
}

type StreamedUpdateStream = ReceiverStream<Result<proto::StreamedUpdateResponse, tonic::Status>>;

async fn streamed_update(
&self,
request: tonic::Request<Streaming<proto::StreamedUpdateRequest>>,
) -> Result<tonic::Response<Self::StreamedUpdateStream>, tonic::Status> {
debug!(?request);
let permissions = match request.extensions().get::<Permissions>() {
Some(permissions) => {
debug!(?permissions);
permissions.clone()
}
None => return Err(tonic::Status::unauthenticated("Unauthenticated")),
};
let mut stream = request.into_inner();

let mut shutdown_trigger = self.get_shutdown_trigger();

// Copy (to move into task below)
let broker = self.clone();

// Create stream (to be returned)
let (sender, receiver) = mpsc::channel(10);
// Listening on stream
tokio::spawn(async move {
let permissions = permissions;
let broker = broker.authorized_access(&permissions);
loop {
select! {
message = stream.message() => {
match message {
Ok(request) => {
match request {
Some(req) => {
let entry_updates = req.updates;

// Collect errors encountered
let mut errors = Vec::<DataEntryError>::new();
let mut updates = Vec::<(i32, broker::EntryUpdate)>::new();

for request in entry_updates {
match &request.entry {
Some(entry) => match broker.get_id_by_path(&entry.path).await {
Some(id) => {
match validate_entry_update(&broker, &request, id).await {
Ok(pair) => {
updates.push((pair.first, pair.second));
}
Err(e) => {
let message = format!("Data present in the request is invalid: {}", e.message());
errors.push(proto::DataEntryError {
path: entry.path.clone(),
error: Some(proto::Error {
code: 400,
reason: "invalid_data".to_string(),
message,
})
})
}
}
}
None => {
let message = format!("{} not found", entry.path);
errors.push(proto::DataEntryError {
path: entry.path.clone(),
error: Some(proto::Error {
code: 404,
reason: "not_found".to_string(),
message,
})
})
}
},
None => {
errors.push(proto::DataEntryError {
path: "".to_string(),
error: Some(proto::Error {
code: 400,
reason: "invalid_data".to_string(),
message: "Data present in the request is invalid: Path is required".to_string()
})
})
}
}
}

match broker.update_entries(updates).await {
Ok(_) => {}
Err(err) => {
debug!("Failed to set datapoint: {:?}", err);
for (id, error) in err.into_iter() {
if let Some(metadata) = broker.get_metadata(id).await {
let path = metadata.path.clone();
let data_entry_error = convert_to_data_entry_error(&path, &error);
errors.push(data_entry_error);
}
}

if let Err(err) = sender.send(
Ok(proto::StreamedUpdateResponse {
errors,
error: None, // TODO take first element of errors
})
).await {
debug!("Failed to send errors: {}", err);
}
}
}

}
None => {
debug!("provider: no more messages");
break;
}
}
},
Err(err) => {
debug!("provider: connection broken: {:?}", err);
break;
},
}
},
_ = shutdown_trigger.recv() => {
debug!("provider: shutdown received");
break;
}
}
}
});

// Return the stream
Ok(Response::new(ReceiverStream::new(receiver)))
}

Check warning on line 418 in databroker/src/grpc/kuksa_val_v1/val.rs

View check run for this annotation

Codecov / codecov/patch

databroker/src/grpc/kuksa_val_v1/val.rs#L291-L418

Added lines #L291 - L418 were not covered by tests

type SubscribeStream = Pin<
Box<
dyn Stream<Item = Result<proto::SubscribeResponse, tonic::Status>>
+ Send
+ Sync
+ 'static,
dyn Stream<Item=Result<proto::SubscribeResponse, tonic::Status>>
+ Send
+ Sync
+ 'static,
>,
>;

Expand Down Expand Up @@ -475,6 +548,100 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

async fn validate_entry_update(broker: &AuthorizedAccess<'_, '_>, request: &EntryUpdate, id: i32) -> Result<Pair<i32, broker::EntryUpdate>, Status> {
let entry = &request.entry.clone().unwrap();

let fields =
HashSet::<proto::Field>::from_iter(request.fields.iter().filter_map(
|id| proto::Field::from_i32(*id), // Ignore unknown fields for now
));

if entry.actuator_target.is_some() {
if let Some(metadata) = broker.get_metadata(id).await {
if metadata.entry_type != broker::EntryType::Actuator {
return Err(tonic::Status::invalid_argument(
"Tried to set a target value for a non-actuator. Non-actuators have no target value.".to_string(),
));
}
}

Check warning on line 566 in databroker/src/grpc/kuksa_val_v1/val.rs

View check run for this annotation

Codecov / codecov/patch

databroker/src/grpc/kuksa_val_v1/val.rs#L566

Added line #L566 was not covered by tests
}

let entry = match &request.entry {
Some(entry) => entry,
None => {
return Err(tonic::Status::invalid_argument(
"Empty entry".to_string(),
))

Check warning on line 574 in databroker/src/grpc/kuksa_val_v1/val.rs

View check run for this annotation

Codecov / codecov/patch

databroker/src/grpc/kuksa_val_v1/val.rs#L572-L574

Added lines #L572 - L574 were not covered by tests
}
};

debug!("Setting fields: {:?}", fields);
let update =
broker::EntryUpdate::from_proto_entry_and_fields(entry, fields);

let pair = Pair {
first: id,
second: update,
};
Ok(pair)
}

fn convert_to_data_entry_error(path: &String, error: &broker::UpdateError) -> DataEntryError {
match error {
broker::UpdateError::NotFound => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 404,
reason: String::from("not found"),
message: format!("no datapoint registered for path {path}"),
}),
},

Check warning on line 598 in databroker/src/grpc/kuksa_val_v1/val.rs

View check run for this annotation

Codecov / codecov/patch

databroker/src/grpc/kuksa_val_v1/val.rs#L591-L598

Added lines #L591 - L598 were not covered by tests
broker::UpdateError::WrongType => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 400,
reason: String::from("type mismatch"),
message:
"cannot set existing datapoint to value of different type"
.to_string(),
}),
},
broker::UpdateError::UnsupportedType => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 400,
reason: String::from("unsupported type"),
message: "cannot set datapoint to value of unsupported type"
.to_string(),
}),
},

Check warning on line 617 in databroker/src/grpc/kuksa_val_v1/val.rs

View check run for this annotation

Codecov / codecov/patch

databroker/src/grpc/kuksa_val_v1/val.rs#L609-L617

Added lines #L609 - L617 were not covered by tests
broker::UpdateError::OutOfBounds => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 400,
reason: String::from("value out of bounds"),
message: String::from("given value exceeds type's boundaries"),
}),
},
broker::UpdateError::PermissionDenied => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 403,
reason: String::from("forbidden"),
message: format!("Access was denied for {path}"),
}),
},
broker::UpdateError::PermissionExpired => DataEntryError {
path: path.clone(),
error: Some(proto::Error {
code: 401,
reason: String::from("unauthorized"),
message: String::from("Unauthorized"),
}),
},

Check warning on line 641 in databroker/src/grpc/kuksa_val_v1/val.rs

View check run for this annotation

Codecov / codecov/patch

databroker/src/grpc/kuksa_val_v1/val.rs#L634-L641

Added lines #L634 - L641 were not covered by tests
}
}

fn convert_to_proto_stream(
input: impl Stream<Item = broker::EntryUpdates>,
) -> impl Stream<Item = Result<proto::SubscribeResponse, tonic::Status>> {
Expand Down
Loading

0 comments on commit b1ed0db

Please sign in to comment.