Skip to content
This repository has been archived by the owner on Dec 18, 2024. It is now read-only.

Commit

Permalink
Add source timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasmittag committed Mar 4, 2024
1 parent 4077045 commit 26269ce
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 10 deletions.
24 changes: 24 additions & 0 deletions kuksa_databroker/databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub struct Metadata {
#[derive(Debug, Clone, PartialEq)]
pub struct Datapoint {
pub ts: SystemTime,
pub source_ts: Option<SystemTime>,
pub value: DataValue,
}

Expand Down Expand Up @@ -1159,6 +1160,7 @@ impl<'a, 'b> DatabaseWriteAccess<'a, 'b> {
Some(datapoint) => datapoint,
None => Datapoint {
ts: SystemTime::now(),
source_ts: None,
value: DataValue::NotAvailable,
},
},
Expand Down Expand Up @@ -1747,6 +1749,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts: time1,
source_ts: None,
value: DataValue::Bool(true),
}),
actuator_target: None,
Expand Down Expand Up @@ -1778,6 +1781,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts: time1,
source_ts: None,
value: DataValue::Int32(100),
}),
actuator_target: None,
Expand All @@ -1800,6 +1804,7 @@ mod tests {
datapoint: None,
actuator_target: Some(Some(Datapoint {
ts: time2,
source_ts: None,
value: DataValue::Bool(true),
})),
entry_type: None,
Expand Down Expand Up @@ -1870,6 +1875,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts: SystemTime::now(),
source_ts: None,
value: DataValue::Int32(1),
}),
actuator_target: None,
Expand Down Expand Up @@ -1936,6 +1942,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts: time1,
source_ts: None,
value: DataValue::Int32(1),
}),
actuator_target: None,
Expand Down Expand Up @@ -2006,6 +2013,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts: SystemTime::now(),
source_ts: None,
value: DataValue::Int32(101),
}),
actuator_target: None,
Expand Down Expand Up @@ -2107,6 +2115,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts: SystemTime::now(),
source_ts: None,
value: DataValue::Int32(i),
}),
actuator_target: None,
Expand Down Expand Up @@ -2189,6 +2198,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts: SystemTime::now(),
source_ts: None,
value: DataValue::Int32(200),
}),
actuator_target: None,
Expand Down Expand Up @@ -2247,6 +2257,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts: SystemTime::now(),
source_ts: None,
value: DataValue::Int32(102),
}),
actuator_target: None,
Expand Down Expand Up @@ -2344,6 +2355,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts: SystemTime::now(),
source_ts: None,
value: DataValue::Int32(-i),
}),
actuator_target: None,
Expand All @@ -2360,6 +2372,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts: SystemTime::now(),
source_ts: None,
value: DataValue::Int32(i),
}),
actuator_target: None,
Expand Down Expand Up @@ -2422,6 +2435,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts,
source_ts: None,
value: DataValue::BoolArray(vec![true, true, false, true]),
}),
actuator_target: None,
Expand Down Expand Up @@ -2486,6 +2500,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts,
source_ts: None,
value: DataValue::StringArray(vec![
String::from("yes"),
String::from("no"),
Expand Down Expand Up @@ -2565,6 +2580,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts,
source_ts: None,
value: DataValue::StringArray(vec![
String::from("yes"),
String::from("no"),
Expand Down Expand Up @@ -2620,6 +2636,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts,
source_ts: None,
value: DataValue::StringArray(vec![
String::from("yes"),
String::from("no"),
Expand Down Expand Up @@ -2674,6 +2691,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts,
source_ts: None,
value: DataValue::StringArray(vec![
String::from("yes"),
String::from("no"),
Expand Down Expand Up @@ -2725,6 +2743,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts,
source_ts: None,
value: DataValue::Int32Array(vec![10, 20, 30, 40]),
}),
actuator_target: None,
Expand Down Expand Up @@ -2753,6 +2772,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts,
source_ts: None,
value: DataValue::Int32Array(vec![100, 200, 300, 400]),
}),
actuator_target: None,
Expand Down Expand Up @@ -2809,6 +2829,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts,
source_ts: None,
value: DataValue::Uint32Array(vec![10, 20, 30, 40]),
}),
actuator_target: None,
Expand Down Expand Up @@ -2837,6 +2858,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts,
source_ts: None,
value: DataValue::Uint32Array(vec![100, 200, 300, 400]),
}),
actuator_target: None,
Expand Down Expand Up @@ -2896,6 +2918,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts,
source_ts: None,
value: DataValue::FloatArray(vec![10.0, 20.0, 30.0, 40.0]),
}),
actuator_target: None,
Expand Down Expand Up @@ -2982,6 +3005,7 @@ mod tests {
path: None,
datapoint: Some(Datapoint {
ts: SystemTime::now(),
source_ts: None,
value: DataValue::Int32(101),
}),
actuator_target: None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,12 +302,13 @@ impl TryFrom<&proto::Field> for broker::Field {
impl From<proto::Datapoint> for broker::Datapoint {
fn from(from: proto::Datapoint) -> Self {
Self {
ts: match from.timestamp {
ts: SystemTime::now(),
source_ts: match from.timestamp {
Some(ts) => match std::convert::TryInto::try_into(ts) {
Ok(ts) => ts,
Err(_) => SystemTime::now(),
Ok(ts) => Some(ts),
Err(_) => Some(SystemTime::now()),
},
None => SystemTime::now(),
None => None,
},
value: broker::DataValue::from(from.value),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,26 @@ use crate::broker;
impl From<&proto::Datapoint> for broker::Datapoint {
fn from(datapoint: &proto::Datapoint) -> Self {
let value = broker::DataValue::from(datapoint);
let ts = SystemTime::now();

let ts = match &datapoint.timestamp {
Some(ts) => ts.clone().try_into().unwrap_or_else(|_| SystemTime::now()),
None => SystemTime::now(),
};

broker::Datapoint { ts, value }
match &datapoint.timestamp {
Some(source_timestamp) => {
let source_ts = source_timestamp
.clone()
.try_into()
.unwrap_or_else(|_| SystemTime::now());
broker::Datapoint {
ts,
source_ts: Some(source_ts),
value,
}
}
None => broker::Datapoint {
ts,
source_ts: None,
value,
},
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions kuksa_databroker/databroker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async fn add_kuksa_attribute(
broker::EntryUpdate {
datapoint: Some(broker::Datapoint {
ts: std::time::SystemTime::now(),
source_ts: None,
value: broker::types::DataValue::String(value),
}),
path: None,
Expand Down Expand Up @@ -134,6 +135,7 @@ async fn read_metadata_file<'a, 'b>(
broker::EntryUpdate {
datapoint: Some(broker::Datapoint {
ts: std::time::SystemTime::now(),
source_ts: None,
value: default,
}),
path: None,
Expand Down

0 comments on commit 26269ce

Please sign in to comment.