From f1b9b71717db3715ad62393ea0dd6304b8a6aa23 Mon Sep 17 00:00:00 2001 From: Rafael RL Date: Tue, 15 Oct 2024 17:03:14 +0200 Subject: [PATCH] SubscribeById implementation --- databroker/src/broker.rs | 3 + databroker/src/grpc/kuksa_val_v2/val.rs | 304 +++++++++++++++++++++--- 2 files changed, 268 insertions(+), 39 deletions(-) diff --git a/databroker/src/broker.rs b/databroker/src/broker.rs index b50dcaf7..3b8d0fed 100644 --- a/databroker/src/broker.rs +++ b/databroker/src/broker.rs @@ -132,6 +132,7 @@ pub struct QueryField { #[derive(Debug)] pub struct ChangeNotification { + pub id: i32, pub update: EntryUpdate, pub fields: HashSet, } @@ -774,6 +775,7 @@ impl ChangeSubscription { // fill unit field always update.unit.clone_from(&entry.metadata.unit); notifications.updates.push(ChangeNotification { + id: *id, update, fields: notify_fields, }); @@ -823,6 +825,7 @@ impl ChangeSubscription { notify_fields.insert(Field::ActuatorTarget); } notifications.updates.push(ChangeNotification { + id: *id, update, fields: notify_fields, }); diff --git a/databroker/src/grpc/kuksa_val_v2/val.rs b/databroker/src/grpc/kuksa_val_v2/val.rs index c3787822..c4d730d3 100644 --- a/databroker/src/grpc/kuksa_val_v2/val.rs +++ b/databroker/src/grpc/kuksa_val_v2/val.rs @@ -256,9 +256,59 @@ impl proto::val_server::Val for broker::DataBroker { async fn subscribe_by_id( &self, - _request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("Unimplemented")) + debug!(?request); + let permissions = match request.extensions().get::() { + Some(permissions) => { + debug!(?permissions); + permissions.clone() + } + None => return Err(tonic::Status::unauthenticated("Unauthenticated")), + }; + + let broker = self.authorized_access(&permissions); + + let request = request.into_inner(); + + let signal_ids = request.signal_ids; + let size = signal_ids.len(); + + let mut valid_requests: HashMap> = HashMap::with_capacity(size); + + for id in signal_ids { + valid_requests.insert( + match get_signal( + Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(id)), + }), + &broker, + ) + .await + { + Ok(signal_id) => signal_id, + Err(err) => return Err(err), + }, + vec![broker::Field::Datapoint].into_iter().collect(), + ); + } + + match broker.subscribe(valid_requests).await { + Ok(stream) => { + let stream = convert_to_proto_stream_id(stream, size); + Ok(tonic::Response::new(Box::pin(stream))) + } + Err(SubscriptionError::NotFound) => { + Err(tonic::Status::new(tonic::Code::NotFound, "Path not found")) + } + Err(SubscriptionError::InvalidInput) => Err(tonic::Status::new( + tonic::Code::InvalidArgument, + "Invalid Argument", + )), + Err(SubscriptionError::InternalError) => { + Err(tonic::Status::new(tonic::Code::Internal, "Internal Error")) + } + } } // Actuate a single actuator @@ -1026,6 +1076,26 @@ fn convert_to_proto_stream( }) } +fn convert_to_proto_stream_id( + input: impl Stream, + size: usize, +) -> impl Stream> { + input.map(move |item| { + let mut entries: HashMap = HashMap::with_capacity(size); + for update in item.updates { + let update_datapoint: Option = match update.update.datapoint { + Some(datapoint) => datapoint.into(), + None => None, + }; + if let Some(dp) = update_datapoint { + entries.insert(update.id, dp); + } + } + let response = proto::SubscribeByIdResponse { entries }; + Ok(response) + }) +} + #[cfg(test)] mod tests { use super::*; @@ -1823,6 +1893,51 @@ mod tests { } } + async fn publish_value( + broker: &DataBroker, + entry_id: i32, + input_value: Option, + input_timestamp: Option, + ) { + let timestamp = input_timestamp.map(|input_timestamp| input_timestamp.into()); + + let mut request = tonic::Request::new(proto::PublishValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(entry_id)), + }), + data_point: Some(proto::Datapoint { + timestamp, + + value: match input_value { + Some(true) => Some(proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + Some(false) => Some(proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(false)), + }), + None => None, + }, + }), + }); + + request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + match broker.publish_value(request).await { + Ok(response) => { + // Handle the successful response + let publish_response = response.into_inner(); + + // Check if there is an error in the response + assert_eq!(publish_response, proto::PublishValueResponse {}); + } + Err(status) => { + // Handle the error from the publish_value function + panic!("Publish failed with status: {:?}", status); + } + } + } + /* Test subscribe service method */ @@ -1883,21 +1998,115 @@ mod tests { } } - async fn publish_value( - broker: &DataBroker, - entry_id: i32, - input_value: Option, - input_timestamp: Option, - ) { - let timestamp = input_timestamp.map(|input_timestamp| input_timestamp.into()); + let f = false; + let broker = DataBroker::default(); - let mut request = tonic::Request::new(proto::PublishValueRequest { - signal_id: Some(proto::SignalId { - signal: Some(proto::signal_id::Signal::Id(entry_id)), - }), - data_point: Some(proto::Datapoint { - timestamp, + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + let entry_id = authorized_access + .add_entry( + "test.datapoint1".to_string(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Sensor, + "Some Description that Does Not Matter".to_owned(), + None, + None, + ) + .await + .unwrap(); + + if has_value { + publish_value(&broker, entry_id, Some(false), None).await + } + + let mut request = tonic::Request::new(proto::SubscribeRequest { + signal_paths: vec!["test.datapoint1".to_string()], + }); + + request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + let result = tokio::task::block_in_place(|| { + // Blocking operation here + // Since broker.subscribe is async, you need to run it in an executor + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(broker.subscribe(request)) + }); + + // Publish "true" as value + publish_value(&broker, entry_id, Some(true), None).await; + + // Publish "false" as value + publish_value(&broker, entry_id, Some(false), None).await; + + // Publish "false" again but with new timestamp - as it is not an update we shall not get anything + + let timestamp = std::time::SystemTime::now(); + publish_value(&broker, entry_id, Some(false), timestamp.into()).await; + + // Publish None as value, equals reset + publish_value(&broker, entry_id, None, None).await; + + // Publish "true" as value + + publish_value(&broker, entry_id, Some(true), None).await; + + if let Ok(stream) = result { + // Process the stream by iterating over the items + let mut stream = stream.into_inner(); + + let mut item_count = 0; + while let Some(item) = stream.next().await { + match item_count { + 0 => { + check_stream_next(&item, if has_value { Some(false) } else { None }).await; + } + 1 => { + check_stream_next(&item, Some(true)).await; + } + 2 => { + // As long as value stays as false we do not get anything new, so prepare for None + check_stream_next(&item, Some(false)).await; + } + 3 => { + check_stream_next(&item, None).await; + } + 4 => { + check_stream_next(&item, Some(true)).await; + // And we do not expect more + break; + } + _ => assert!( + f, + "You shouldn't land here too many items reported back to the stream." + ), + } + item_count += 1; + } + // Make sure stream is not closed in advance + assert_eq!(item_count, 4); + } else { + assert!(f, "Something went wrong while getting the stream.") + } + } + + /* + Test subscribe service method by id + */ + async fn test_subscribe_case_by_id(has_value: bool) { + async fn check_stream_next_by_id( + item: &Result, + input_value: Option, + signal_id: i32, + ) { + // Create Datapoint + let mut expected_response: HashMap = HashMap::new(); + // We expect to get an empty response first + expected_response.insert( + signal_id, + proto::Datapoint { + timestamp: None, value: match input_value { Some(true) => Some(proto::Value { typed_value: Some(proto::value::TypedValue::Bool(true)), @@ -1907,27 +2116,37 @@ mod tests { }), None => None, }, - }), - }); - - request - .extensions_mut() - .insert(permissions::ALLOW_ALL.clone()); - match broker.publish_value(request).await { - Ok(response) => { - // Handle the successful response - let publish_response = response.into_inner(); + }, + ); - // Check if there is an error in the response - assert_eq!(publish_response, proto::PublishValueResponse {}); + let f = false; + match item { + Ok(subscribe_response) => { + // Process the SubscribeResponse + let response = &subscribe_response.entries; + assert_eq!(response.len(), expected_response.len()); + for key in response.keys() { + match (response.get(key), expected_response.get(key)) { + (Some(entry1), Some(entry2)) => { + assert_eq!(entry1.value, entry2.value); + } + (Some(entry1), None) => { + assert!(f, "Key '{}' is only in response: {:?}", key, entry1) + } + (None, Some(entry2)) => assert!( + f, + "Key '{}' is only in expected_response: {:?}", + key, entry2 + ), + (None, None) => unreachable!(), + } + } } - Err(status) => { - // Handle the error from the publish_value function - panic!("Publish failed with status: {:?}", status); + Err(err) => { + assert!(f, "Error {:?}", err) } } } - let f = false; let broker = DataBroker::default(); @@ -1949,8 +2168,8 @@ mod tests { publish_value(&broker, entry_id, Some(false), None).await } - let mut request = tonic::Request::new(proto::SubscribeRequest { - signal_paths: vec!["test.datapoint1".to_string()], + let mut request = tonic::Request::new(proto::SubscribeByIdRequest { + signal_ids: vec![entry_id], }); request @@ -1961,7 +2180,7 @@ mod tests { // Blocking operation here // Since broker.subscribe is async, you need to run it in an executor let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(broker.subscribe(request)) + rt.block_on(broker.subscribe_by_id(request)) }); // Publish "true" as value @@ -1990,20 +2209,25 @@ mod tests { while let Some(item) = stream.next().await { match item_count { 0 => { - check_stream_next(&item, if has_value { Some(false) } else { None }).await; + check_stream_next_by_id( + &item, + if has_value { Some(false) } else { None }, + entry_id, + ) + .await; } 1 => { - check_stream_next(&item, Some(true)).await; + check_stream_next_by_id(&item, Some(true), entry_id).await; } 2 => { // As long as value stays as false we do not get anything new, so prepare for None - check_stream_next(&item, Some(false)).await; + check_stream_next_by_id(&item, Some(false), entry_id).await; } 3 => { - check_stream_next(&item, None).await; + check_stream_next_by_id(&item, None, entry_id).await; } 4 => { - check_stream_next(&item, Some(true)).await; + check_stream_next_by_id(&item, Some(true), entry_id).await; // And we do not expect more break; } @@ -2025,6 +2249,8 @@ mod tests { async fn test_subscribe() { test_subscribe_case(false).await; test_subscribe_case(true).await; + test_subscribe_case_by_id(false).await; + test_subscribe_case_by_id(true).await; } /*