From 26269cea9bf76ff436b2ee3ef635ad62abf312e7 Mon Sep 17 00:00:00 2001 From: lukasmittag Date: Fri, 9 Feb 2024 11:49:58 +0100 Subject: [PATCH] Add source timestamp --- kuksa_databroker/databroker/src/broker.rs | 24 ++++++++++++++++++ .../src/grpc/kuksa_val_v1/conversions.rs | 9 ++++--- .../src/grpc/sdv_databroker_v1/conversions.rs | 25 ++++++++++++++----- kuksa_databroker/databroker/src/main.rs | 2 ++ 4 files changed, 50 insertions(+), 10 deletions(-) diff --git a/kuksa_databroker/databroker/src/broker.rs b/kuksa_databroker/databroker/src/broker.rs index cb3071c3..45f14fd7 100644 --- a/kuksa_databroker/databroker/src/broker.rs +++ b/kuksa_databroker/databroker/src/broker.rs @@ -72,6 +72,7 @@ pub struct Metadata { #[derive(Debug, Clone, PartialEq)] pub struct Datapoint { pub ts: SystemTime, + pub source_ts: Option, pub value: DataValue, } @@ -1159,6 +1160,7 @@ impl<'a, 'b> DatabaseWriteAccess<'a, 'b> { Some(datapoint) => datapoint, None => Datapoint { ts: SystemTime::now(), + source_ts: None, value: DataValue::NotAvailable, }, }, @@ -1747,6 +1749,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts: time1, + source_ts: None, value: DataValue::Bool(true), }), actuator_target: None, @@ -1778,6 +1781,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts: time1, + source_ts: None, value: DataValue::Int32(100), }), actuator_target: None, @@ -1800,6 +1804,7 @@ mod tests { datapoint: None, actuator_target: Some(Some(Datapoint { ts: time2, + source_ts: None, value: DataValue::Bool(true), })), entry_type: None, @@ -1870,6 +1875,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts: SystemTime::now(), + source_ts: None, value: DataValue::Int32(1), }), actuator_target: None, @@ -1936,6 +1942,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts: time1, + source_ts: None, value: DataValue::Int32(1), }), actuator_target: None, @@ -2006,6 +2013,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts: SystemTime::now(), + source_ts: None, value: DataValue::Int32(101), }), actuator_target: None, @@ -2107,6 +2115,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts: SystemTime::now(), + source_ts: None, value: DataValue::Int32(i), }), actuator_target: None, @@ -2189,6 +2198,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts: SystemTime::now(), + source_ts: None, value: DataValue::Int32(200), }), actuator_target: None, @@ -2247,6 +2257,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts: SystemTime::now(), + source_ts: None, value: DataValue::Int32(102), }), actuator_target: None, @@ -2344,6 +2355,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts: SystemTime::now(), + source_ts: None, value: DataValue::Int32(-i), }), actuator_target: None, @@ -2360,6 +2372,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts: SystemTime::now(), + source_ts: None, value: DataValue::Int32(i), }), actuator_target: None, @@ -2422,6 +2435,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts, + source_ts: None, value: DataValue::BoolArray(vec![true, true, false, true]), }), actuator_target: None, @@ -2486,6 +2500,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts, + source_ts: None, value: DataValue::StringArray(vec![ String::from("yes"), String::from("no"), @@ -2565,6 +2580,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts, + source_ts: None, value: DataValue::StringArray(vec![ String::from("yes"), String::from("no"), @@ -2620,6 +2636,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts, + source_ts: None, value: DataValue::StringArray(vec![ String::from("yes"), String::from("no"), @@ -2674,6 +2691,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts, + source_ts: None, value: DataValue::StringArray(vec![ String::from("yes"), String::from("no"), @@ -2725,6 +2743,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts, + source_ts: None, value: DataValue::Int32Array(vec![10, 20, 30, 40]), }), actuator_target: None, @@ -2753,6 +2772,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts, + source_ts: None, value: DataValue::Int32Array(vec![100, 200, 300, 400]), }), actuator_target: None, @@ -2809,6 +2829,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts, + source_ts: None, value: DataValue::Uint32Array(vec![10, 20, 30, 40]), }), actuator_target: None, @@ -2837,6 +2858,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts, + source_ts: None, value: DataValue::Uint32Array(vec![100, 200, 300, 400]), }), actuator_target: None, @@ -2896,6 +2918,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts, + source_ts: None, value: DataValue::FloatArray(vec![10.0, 20.0, 30.0, 40.0]), }), actuator_target: None, @@ -2982,6 +3005,7 @@ mod tests { path: None, datapoint: Some(Datapoint { ts: SystemTime::now(), + source_ts: None, value: DataValue::Int32(101), }), actuator_target: None, diff --git a/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/conversions.rs b/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/conversions.rs index 699771d2..64cb8075 100644 --- a/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/conversions.rs +++ b/kuksa_databroker/databroker/src/grpc/kuksa_val_v1/conversions.rs @@ -302,12 +302,13 @@ impl TryFrom<&proto::Field> for broker::Field { impl From for broker::Datapoint { fn from(from: proto::Datapoint) -> Self { Self { - ts: match from.timestamp { + ts: SystemTime::now(), + source_ts: match from.timestamp { Some(ts) => match std::convert::TryInto::try_into(ts) { - Ok(ts) => ts, - Err(_) => SystemTime::now(), + Ok(ts) => Some(ts), + Err(_) => Some(SystemTime::now()), }, - None => SystemTime::now(), + None => None, }, value: broker::DataValue::from(from.value), } diff --git a/kuksa_databroker/databroker/src/grpc/sdv_databroker_v1/conversions.rs b/kuksa_databroker/databroker/src/grpc/sdv_databroker_v1/conversions.rs index c85888ae..b7c56972 100644 --- a/kuksa_databroker/databroker/src/grpc/sdv_databroker_v1/conversions.rs +++ b/kuksa_databroker/databroker/src/grpc/sdv_databroker_v1/conversions.rs @@ -22,13 +22,26 @@ use crate::broker; impl From<&proto::Datapoint> for broker::Datapoint { fn from(datapoint: &proto::Datapoint) -> Self { let value = broker::DataValue::from(datapoint); + let ts = SystemTime::now(); - let ts = match &datapoint.timestamp { - Some(ts) => ts.clone().try_into().unwrap_or_else(|_| SystemTime::now()), - None => SystemTime::now(), - }; - - broker::Datapoint { ts, value } + match &datapoint.timestamp { + Some(source_timestamp) => { + let source_ts = source_timestamp + .clone() + .try_into() + .unwrap_or_else(|_| SystemTime::now()); + broker::Datapoint { + ts, + source_ts: Some(source_ts), + value, + } + } + None => broker::Datapoint { + ts, + source_ts: None, + value, + }, + } } } diff --git a/kuksa_databroker/databroker/src/main.rs b/kuksa_databroker/databroker/src/main.rs index 5bf8c9f5..4332dd8d 100644 --- a/kuksa_databroker/databroker/src/main.rs +++ b/kuksa_databroker/databroker/src/main.rs @@ -73,6 +73,7 @@ async fn add_kuksa_attribute( broker::EntryUpdate { datapoint: Some(broker::Datapoint { ts: std::time::SystemTime::now(), + source_ts: None, value: broker::types::DataValue::String(value), }), path: None, @@ -134,6 +135,7 @@ async fn read_metadata_file<'a, 'b>( broker::EntryUpdate { datapoint: Some(broker::Datapoint { ts: std::time::SystemTime::now(), + source_ts: None, value: default, }), path: None,