Skip to content

Commit

Permalink
refactor: sync last_updated timestamp in any case
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Aug 17, 2022
1 parent c34286c commit 823e797
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 10 deletions.
5 changes: 3 additions & 2 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* [ ] Make `KafkaSource` generic, align with `Notifier` trait
* [ ] Think about handling "by application" limitation.
* [ ] RBAC
* [ ] Allow a way to modify the thing, overriding a non-empty outbox
* [x] Allow a way to modify the thing, overriding a non-empty outbox
* [ ] Allow more fine-grained control over this
* [ ] Implement WASM
* [ ] Ensure that reported state "last updated" changes when only the value changes (move logic to machine)
* [x] Ensure that reported state "last updated" changes when only the value changes (move logic to machine)
34 changes: 27 additions & 7 deletions core/src/machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,11 @@ impl Reconciler {
}

pub async fn run(mut self) -> Result<Outcome, Error> {
// clear reconcile waker
self.new_thing.clear_wakeup(WakerReason::Reconcile);
// cleanup first
self.cleanup();

// clear old logs first, otherwise logging of state will continuously grow
// FIXME: remove when we only send a view of the state to the reconcile code
for (_, v) in &mut self.new_thing.reconciliation.changed {
v.last_log.clear();
}
// detect reported state changes
self.sync_reported_state();

// synthetics
self.generate_synthetics().await?;
Expand All @@ -215,6 +212,29 @@ impl Reconciler {
})
}

fn cleanup(&mut self) {
// clear reconcile waker
self.new_thing.clear_wakeup(WakerReason::Reconcile);

// clear old logs first, otherwise logging of state will continuously grow
// FIXME: remove when we only send a view of the state to the reconcile code
for (_, v) in &mut self.new_thing.reconciliation.changed {
v.last_log.clear();
}
}

fn sync_reported_state(&mut self) {
// we ensure that all reported values which changed from the previous value get an updated
// last_update timestamp
for (k, next) in &mut self.new_thing.reported_state {
if let Some(previous) = self.current_thing.reported_state.get(k) {
if previous.value != next.value {
next.last_update = Utc::now();
}
}
}
}

async fn generate_synthetics(&mut self) -> Result<(), Error> {
let now = Utc::now();

Expand Down
3 changes: 2 additions & 1 deletion core/src/service/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ impl InfallibleUpdater for ReportedStateUpdater {
let e = e.get_mut();
if e.value != value {
e.value = value;
e.last_update = Utc::now();
// don't need to update the last_updated, as the system will ensure
// that for us later on
}
}
Entry::Vacant(e) => {
Expand Down
10 changes: 10 additions & 0 deletions examples/10_basic.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,21 @@ http PUT localhost:8080/api/v1alpha1/things/default/things/foo/reportedStates te

== Patch a thing

With a JSON patch:

[source,shell]
----
http PATCH localhost:8080/api/v1alpha1/things/default/things/foo content-type:application/json-patch+json '[0][op]=replace' '[0][path]=/reportedState/temperature/value' '[0][value]=43'
----

Or with a strategic merge:

[source,shell]
----
http PATCH localhost:8080/api/v1alpha1/things/default/things/foo content-type:application/merge-patch+json 'reportedState[temperature][value]:=43'
----


=== Remove an annotation

[source,shell]
Expand Down

0 comments on commit 823e797

Please sign in to comment.