Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved throughput: reduce unnecessary overload on gRPC bidirectional stream by avoiding empty responses #100

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 8 additions & 16 deletions databroker/src/grpc/kuksa_val_v2/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,9 +646,10 @@ impl proto::val_server::Val for broker::DataBroker {
},
Some(PublishValuesRequest(publish_values_request)) => {
let response = publish_values(&broker, &publish_values_request).await;
if let Err(err) = response_stream_sender.send(Ok(response)).await
{
debug!("Failed to send response: {}", err);
if let Some(value) = response {
if let Err(err) = response_stream_sender.send(Ok(value)).await {
debug!("Failed to send error response: {}", err);
}
}
},
Some(BatchActuateStreamResponse(_batch_actuate_stream_response)) => {
Expand Down Expand Up @@ -768,7 +769,7 @@ async fn provide_actuation(
async fn publish_values(
broker: &AuthorizedAccess<'_, '_>,
request: &databroker_proto::kuksa::val::v2::PublishValuesRequest,
) -> OpenProviderStreamResponse {
) -> Option<OpenProviderStreamResponse> {
let ids: Vec<(i32, broker::EntryUpdate)> = request
.datapoints
.iter()
Expand All @@ -793,17 +794,8 @@ async fn publish_values(

// TODO check if provider is allowed to update the entries for the provided signals?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noticed the TODO - Is it so that we currently do not check token, even if broker is configured to require token?
If so, something we better should fix before release - not necessarily part of this PR?

Copy link
Contributor

@erikbosch erikbosch Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we have let broker = broker.authorized_access(&permissions); earlier, so it is "broker" that does the check on the line below. Could you confirm @argerus

If so just remove the TODO above.

match broker.update_entries(ids).await {
Ok(_) => OpenProviderStreamResponse {
action: Some(
open_provider_stream_response::Action::PublishValuesResponse(
PublishValuesResponse {
request_id: request.request_id,
status: HashMap::new(),
},
),
),
},
Err(err) => OpenProviderStreamResponse {
Ok(_) => None,
Err(err) => Some(OpenProviderStreamResponse {
action: Some(
open_provider_stream_response::Action::PublishValuesResponse(
PublishValuesResponse {
Expand All @@ -815,7 +807,7 @@ async fn publish_values(
},
),
),
},
}),
}
}

Expand Down
2 changes: 1 addition & 1 deletion proto/kuksa/val/v2/val.proto
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ service VAL {
// UNAUTHENTICATED if no credentials provided or credentials has expired
// ALREADY_EXISTS if a provider already claimed the ownership of an actuator
//
// - Provider sends PublishValuesRequest -> Databroker returns PublishValuesResponse
// - Provider sends PublishValuesRequest -> Databroker returns PublishValuesResponse upon error, and nothing upon success
// GRPC errors are returned as messages in the stream
// response with the signal id `map<int32, Error> status = 2;` (permissive case)
// NOT_FOUND if a signal is non-existant.
Expand Down