diff --git a/Cargo.lock b/Cargo.lock index 1e9b464..f497339 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -808,11 +808,15 @@ dependencies = [ "actix-web", "actix-web-actors", "anyhow", + "chrono", "drogue-doppelgaenger-core", "futures", + "humantime", "log", "serde", "serde_json", + "thiserror", + "time 0.1.44", "tokio", "tokio-stream", ] @@ -880,6 +884,7 @@ dependencies = [ "serde_json", "serde_yaml", "thiserror", + "time 0.1.44", "tokio", "tokio-postgres", "tokio-stream", diff --git a/TODO.md b/TODO.md index daef0a0..44be832 100644 --- a/TODO.md +++ b/TODO.md @@ -7,3 +7,4 @@ * [ ] RBAC * [ ] Allow a way to modify the thing, overriding a non-empty outbox * [ ] Implement WASM +* [ ] Ensure that reported state "last updated" changes when only the value changes (move logic to machine) diff --git a/backend/Cargo.toml b/backend/Cargo.toml index f44f691..ad6907a 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -11,10 +11,14 @@ actix = "0.13" actix-web = "4" actix-web-actors = "4" anyhow = "1" +chrono = "0.4" futures = "0.3" +humantime = "2" log = "0.4" serde = { version = "1", features = ["derive", "rc"] } serde_json = "1" +time = "0.1" +thiserror = "1" tokio = { version = "1", features = ["full"] } tokio-stream = { version = "0.1", features = ["sync"] } diff --git a/backend/src/endpoints.rs b/backend/src/endpoints.rs index 6ca042b..492b061 100644 --- a/backend/src/endpoints.rs +++ b/backend/src/endpoints.rs @@ -1,14 +1,20 @@ -use crate::{notifier::actix::WebSocketHandler, Instance}; +use crate::{ + notifier::actix::WebSocketHandler, + utils::{to_datetime, to_duration}, + Instance, +}; use actix_web::{web, HttpRequest, HttpResponse}; use actix_web_actors::ws; +use chrono::Utc; use drogue_doppelgaenger_core::{ listener::KafkaSource, model::{Reconciliation, SyntheticType, Thing}, notifier::Notifier, processor::sink::Sink, service::{ - Id, JsonMergeUpdater, JsonPatchUpdater, Patch, ReportedStateUpdater, Service, - SyntheticStateUpdater, UpdateMode, UpdateOptions, + DesiredStateUpdate, DesiredStateUpdater, DesiredStateValueUpdater, Id, JsonMergeUpdater, + JsonPatchUpdater, Patch, ReportedStateUpdater, Service, SyntheticStateUpdater, UpdateMode, + UpdateOptions, }, storage::Storage, }; @@ -117,6 +123,62 @@ pub async fn things_update_synthetic_state( Ok(HttpResponse::NoContent().json(json!({}))) } +pub async fn things_update_desired_state( + service: web::Data>, + path: web::Path<(String, String, String)>, + payload: web::Json, +) -> Result { + let (application, thing, state) = path.into_inner(); + let payload = payload.into_inner(); + + service + .update( + &Id::new(application, thing), + DesiredStateUpdater(state, payload), + &OPTS, + ) + .await?; + + Ok(HttpResponse::NoContent().json(json!({}))) +} + +pub async fn things_update_desired_state_value( + request: HttpRequest, + service: web::Data>, + path: web::Path<(String, String, String)>, + payload: web::Json, +) -> Result { + let (application, thing, name) = path.into_inner(); + let value = payload.into_inner(); + + let valid_until = request + .headers() + .get("valid-until") + .map(to_datetime) + .transpose()?; + let valid_for = request + .headers() + .get("valid-for") + .map(to_duration) + .transpose()?; + + let valid_until = valid_until.or_else(|| valid_for.map(|d| Utc::now() + d)); + + service + .update( + &Id::new(application, thing), + DesiredStateValueUpdater { + name, + value, + valid_until, + }, + &OPTS, + ) + .await?; + + Ok(HttpResponse::NoContent().json(json!({}))) +} + pub async fn things_update_reconciliation( service: web::Data>, path: web::Path, diff --git a/backend/src/lib.rs b/backend/src/lib.rs index 8d3bafe..dba4b37 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -1,5 +1,6 @@ mod endpoints; mod notifier; +mod utils; use actix_web::{guard, web, App, HttpServer}; use anyhow::anyhow; @@ -86,6 +87,22 @@ pub fn configure( Si, >)), ); + ctx.service( + web::resource( + "/api/v1alpha1/things/{application}/things/{thing}/desiredStates/{name}", + ) + .route(web::put().to(endpoints::things_update_desired_state::< + S, + N, + Si, + >)), + ); + ctx.service( + web::resource( + "/api/v1alpha1/things/{application}/things/{thing}/desiredStates/{name}/value", + ) + .route(web::put().to(endpoints::things_update_desired_state_value::)), + ); ctx.service( web::resource("/api/v1alpha1/things/{application}/things/{thing}/reconciliations") .route(web::put().to(endpoints::things_update_reconciliation::)), diff --git a/backend/src/utils.rs b/backend/src/utils.rs new file mode 100644 index 0000000..7fefae3 --- /dev/null +++ b/backend/src/utils.rs @@ -0,0 +1,37 @@ +use actix_web::body::BoxBody; +use actix_web::http::header::{HeaderValue, ToStrError}; +use actix_web::{HttpResponse, ResponseError}; +use chrono::{DateTime, Duration, ParseError, Utc}; +use drogue_doppelgaenger_core::error::ErrorInformation; +use humantime::DurationError; + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Header Value: {0}")] + Value(#[from] ToStrError), + #[error("Parse: {0}")] + Parse(#[from] ParseError), + #[error("Out of range: {0}")] + OutOfRange(#[from] time::OutOfRangeError), + #[error("Duration: {0}")] + Duration(#[from] DurationError), +} + +impl ResponseError for Error { + fn error_response(&self) -> HttpResponse { + HttpResponse::BadRequest().json(ErrorInformation { + error: "InvalidFormat".to_string(), + message: Some(self.to_string()), + }) + } +} + +pub fn to_duration(value: &HeaderValue) -> Result { + Ok(Duration::from_std(humantime::parse_duration( + value.to_str()?, + )?)?) +} + +pub fn to_datetime(value: &HeaderValue) -> Result, Error> { + Ok(DateTime::parse_from_rfc3339(value.to_str()?)?.into()) +} diff --git a/core/Cargo.toml b/core/Cargo.toml index 74870c8..51817bc 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -24,24 +24,25 @@ humantime-serde = "1" indexmap = "1.9" json-patch = { version = "0.2", default-features = false } jsonschema = "0.16" +lazy_static = "1" log = "0.4" opentelemetry = { version = "0.17", features = ["rt-tokio"] } postgres-types = "0.2" +prometheus = { version = "0.13" } rdkafka = { version = "0.28", features = ["sasl", "ssl"] } rustls = "0.20" rustls-native-certs = "0.6" schemars = { version = "0.8", features = ["bytes", "chrono", "indexmap"] } -serde = "1" +serde = { version = "1", features = ["rc"] } serde_json = "1" thiserror = "1" +time = "0.1" tokio = "1" tokio-stream = { version = "0.1", features = ["sync"] } tracing = "0.1" tracing-opentelemetry = "0.17" tracing-subscriber = { version = "0.3", features = ["env-filter"] } uuid = { version = "1", features = ["v4"] } -lazy_static = "1" -prometheus = { version = "0.13" } opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"], optional = true } diff --git a/core/src/machine/deno.rs b/core/src/machine/deno.rs index 083d959..3b2af90 100644 --- a/core/src/machine/deno.rs +++ b/core/src/machine/deno.rs @@ -1,313 +1,352 @@ -use super::Outgoing; -use crate::model::Thing; -use anyhow::bail; -use chrono::Duration; use deno_core::{serde_v8, v8, Extension, JsRuntime, RuntimeOptions}; use serde::{Deserialize, Serialize}; -use serde_json::Value; -use std::sync::Arc; -use tokio::runtime::Handle; -use tokio::time::Instant; +use std::convert::Infallible; +use tokio::{runtime::Handle, task::JoinHandle, time::Instant}; #[derive(Clone, Debug)] pub struct DenoOptions { pub deadline: Instant, } -const KEY_CURRENT_STATE: &str = "currentState"; -const KEY_NEW_STATE: &str = "newState"; -const KEY_OUTBOX: &str = "outbox"; -const KEY_LOGS: &str = "logs"; -const KEY_WAKER: &str = "waker"; +pub trait Injectable: Sized + Send { + type Error: std::error::Error + Send + Sync; -pub struct ExecutionResult { - pub output: O, - pub value: T, + fn inject(self, runtime: &mut JsRuntime, name: &str) -> Result<(), Self::Error>; } -/// Execute a deno script -pub async fn execute( - name: &str, - script: &str, - input: I, - opts: DenoOptions, -) -> anyhow::Result> +pub trait Extractable: Sized + Send { + type Error: std::error::Error + Send + Sync; + + fn extract(runtime: &mut JsRuntime, name: &str) -> Result; +} + +pub trait Returnable: Sized + Send { + type Error: std::error::Error + Send + Sync; + + fn r#return( + runtime: &mut JsRuntime, + global: v8::Global, + ) -> Result; +} + +impl Injectable for T where - I: Serialize + Send + 'static, - for<'de> O: Serialize + Deserialize<'de> + Default + Send + 'static, - for<'de> R: Deserialize<'de> + Default + Send + 'static, + T: Serialize + Send, { - let script = script.to_string(); - let name = name.to_string(); - - Ok(Handle::current() - .spawn_blocking(move || { - // disable some operations - let disable = Extension::builder() - .middleware(|op| match op.name { - // the user won't see it, and it spams our logs - "op_print" => op.disable(), - _ => op, - }) - .build(); + type Error = serde_v8::Error; - // FIXME: doesn't work as advertised, we keep it anyway - let create_params = v8::Isolate::create_params().heap_limits(0, 3 * 1024 * 1024); + fn inject(self, runtime: &mut JsRuntime, name: &str) -> Result<(), Self::Error> { + let global = runtime.global_context(); + let scope = &mut runtime.handle_scope(); + let global = global.open(scope).global(scope); - let mut runtime = JsRuntime::new(RuntimeOptions { - create_params: Some(create_params), - extensions: vec![disable], - ..Default::default() - }); + let key = serde_v8::to_v8(scope, name)?; + let value = serde_v8::to_v8(scope, self)?; + global.set(scope, key, value); - let isolate = runtime.v8_isolate().thread_safe_handle(); - runtime.add_near_heap_limit_callback(move |_, current| { - // FIXME: again, this currently doesn't work properly, we keep it anyway - isolate.terminate_execution(); - current * 2 - }); + Ok(()) + } +} - let isolate = runtime.v8_isolate().thread_safe_handle(); - Handle::current().spawn(async move { - tokio::time::sleep_until(opts.deadline).await; - isolate.terminate_execution(); - }); +impl Extractable for () { + type Error = Infallible; - set_context::<_, O>(&mut runtime, input)?; + fn extract(_: &mut JsRuntime, _: &str) -> Result { + Ok(()) + } +} - let global = runtime.execute_script(&name, &script)?; +impl Extractable for Option +where + for<'de> T: Deserialize<'de> + Send, +{ + type Error = serde_v8::Error; - let value = { - let scope = &mut runtime.handle_scope(); - let local = v8::Local::new(scope, global); - serde_v8::from_v8(scope, local)? - }; + fn extract(runtime: &mut JsRuntime, name: &str) -> Result, Self::Error> { + let global = runtime.global_context(); + let mut scope = &mut runtime.handle_scope(); + let global = global.open(scope).global(scope); - Handle::current().block_on(async { runtime.run_event_loop(false).await })?; - // FIXME: eval late result + let key = serde_v8::to_v8(&mut scope, name)?; + Ok(match global.get(scope, key) { + Some(value) => Some(serde_v8::from_v8(scope, value)?), + None => None, + }) + } +} - let output = extract_context(&mut runtime)?; +impl Extractable for Json +where + for<'de> T: Deserialize<'de> + Send + Default, +{ + type Error = serde_v8::Error; - Ok::<_, anyhow::Error>(ExecutionResult { output, value }) - }) - .await??) + fn extract(runtime: &mut JsRuntime, name: &str) -> Result { + Option::::extract(runtime, name).map(|o| Json(o.unwrap_or_default())) + } } -#[deprecated = "Switch over to the execute method, once it is cleaned up a bit"] -/// Run a deno script -/// -/// TODO: consider keeping the runtime for this run. -pub async fn run( +pub struct Json(pub T); + +impl Returnable for T +where + for<'de> T: Deserialize<'de> + Send, +{ + type Error = serde_v8::Error; + + fn r#return( + runtime: &mut JsRuntime, + global: v8::Global, + ) -> Result { + let scope = &mut runtime.handle_scope(); + let local = v8::Local::new(scope, global); + serde_v8::from_v8(scope, local) + } +} + +pub struct Execution { name: String, - script: &str, - current_thing: Arc, - new_thing: &Thing, + code: String, opts: DenoOptions, -) -> anyhow::Result { - let script = script.to_string(); - let new_thing = new_thing.clone(); - - let thing = Handle::current() - .spawn_blocking(move || { - // disable some operations - let disable = Extension::builder() - .middleware(|op| match op.name { - // the user won't see it, and it spams our logs - "op_print" => op.disable(), - _ => op, - }) - .build(); +} - // FIXME: doesn't work as advertised, we keep it anyway - let create_params = v8::Isolate::create_params().heap_limits(0, 3 * 1024 * 1024); +impl Execution { + pub fn new(name: N, code: C, opts: DenoOptions) -> Self + where + N: Into, + C: Into, + { + Self { + name: name.into(), + code: code.into(), + opts, + } + } - let mut runtime = JsRuntime::new(RuntimeOptions { - create_params: Some(create_params), - extensions: vec![disable], - ..Default::default() - }); + fn create_runtime(&self) -> JsRuntime { + // disable some operations + let disable = Extension::builder() + .middleware(|op| match op.name { + // the user won't see it, and it spams our logs + "op_print" => op.disable(), + _ => op, + }) + .build(); + + // FIXME: doesn't work as advertised, we keep it anyway + let create_params = v8::Isolate::create_params().heap_limits(0, 3 * 1024 * 1024); + + let mut runtime = JsRuntime::new(RuntimeOptions { + create_params: Some(create_params), + extensions: vec![disable], + ..Default::default() + }); + + let isolate = runtime.v8_isolate().thread_safe_handle(); + runtime.add_near_heap_limit_callback(move |_, current| { + // FIXME: again, this currently doesn't work properly, we keep it anyway + isolate.terminate_execution(); + current * 2 + }); + + runtime + } - let isolate = runtime.v8_isolate().thread_safe_handle(); - runtime.add_near_heap_limit_callback(move |_, current| { - // FIXME: again, this currently doesn't work properly, we keep it anyway - isolate.terminate_execution(); - current * 2 - }); + pub async fn run(self, input: I) -> anyhow::Result> + where + I: Injectable + 'static, + O: Extractable + 'static, + R: Returnable + 'static, + { + struct Deadline(JoinHandle<()>); + impl Drop for Deadline { + fn drop(&mut self) { + self.0.abort(); + } + } - let isolate = runtime.v8_isolate().thread_safe_handle(); - Handle::current().spawn(async move { - tokio::time::sleep_until(opts.deadline).await; - isolate.terminate_execution(); - }); + Ok(Handle::current() + .spawn_blocking(move || { + let mut runtime = self.create_runtime(); - set_run_context(&mut runtime, ¤t_thing, &new_thing)?; + let name = self.name; + let code = self.code; - // FIXME: take return value - let _ = runtime.execute_script(&name, &script)?; + let isolate = runtime.v8_isolate().thread_safe_handle(); + let deadline = Deadline(Handle::current().spawn(async move { + tokio::time::sleep_until(self.opts.deadline).await; + isolate.terminate_execution(); + })); - Handle::current().block_on(async { runtime.run_event_loop(false).await })?; - // FIXME: eval late result + // set_context::<_, O>(&mut runtime, input)?; + input.inject(&mut runtime, "context")?; - extract_run_context(&mut runtime) - }) - .await??; + let global = runtime.execute_script(&name, &code)?; - Ok(thing) -} + let return_value = R::r#return(&mut runtime, global)?; -fn set_context(runtime: &mut JsRuntime, input: I) -> anyhow::Result<()> -where - I: Serialize, - O: Serialize + Default, -{ - let global = runtime.global_context(); - let scope = &mut runtime.handle_scope(); - let global = global.open(scope).global(scope); + Handle::current().block_on(async { runtime.run_event_loop(false).await })?; + // FIXME: eval late result - { - let key = serde_v8::to_v8(scope, "input")?; - let value = serde_v8::to_v8(scope, input)?; - global.set(scope, key, value); - } + drop(deadline); - { - let key = serde_v8::to_v8(scope, "output")?; - let value = serde_v8::to_v8(scope, O::default())?; - global.set(scope, key, value); - } + //let output = extract_context(&mut runtime)?; + let output = O::extract(&mut runtime, "context")?; - Ok(()) + Ok::<_, anyhow::Error>(ExecutionResult { + output, + return_value, + }) + }) + .await??) + } } -/// Extract the new state from the context -fn extract_context(runtime: &mut JsRuntime) -> anyhow::Result -where - for<'de> O: Deserialize<'de> + Default, -{ - let global = runtime.global_context(); - let mut scope = &mut runtime.handle_scope(); - let global = global.open(scope).global(scope); - - let output = { - let key = serde_v8::to_v8(&mut scope, "output")?; - match global.get(scope, key) { - Some(value) => serde_v8::from_v8(scope, value)?, - None => O::default(), - } - }; - - Ok(output) +pub struct ExecutionResult { + pub output: O, + pub return_value: T, } -/// Set the current and new state for the context -fn set_run_context( - runtime: &mut JsRuntime, - current_state: &Thing, - new_state: &Thing, -) -> anyhow::Result<()> { - let global = runtime.global_context(); - let scope = &mut runtime.handle_scope(); - let global = global.open(scope).global(scope); +pub mod duration { + use chrono::Duration; + use serde::de; + use std::fmt; + pub fn deserialize<'de, D>(d: D) -> Result, D::Error> + where + D: de::Deserializer<'de>, { - let key = serde_v8::to_v8(scope, KEY_CURRENT_STATE)?; - let value = serde_v8::to_v8(scope, current_state)?; - global.set(scope, key, value); + Ok(d.deserialize_option(OptionDurationVisitor)?) } - { - let key = serde_v8::to_v8(scope, KEY_NEW_STATE)?; - let value = serde_v8::to_v8(scope, new_state)?; - global.set(scope, key, value); - } + struct DurationVisitor; - { - let key = serde_v8::to_v8(scope, KEY_OUTBOX)?; - let value = serde_v8::to_v8(scope, Vec::::new())?; - global.set(scope, key, value); - } + impl<'de> de::Visitor<'de> for DurationVisitor { + type Value = Duration; - { - let key = serde_v8::to_v8(scope, KEY_LOGS)?; - let value = serde_v8::to_v8(scope, Vec::::new())?; - global.set(scope, key, value); - } + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a duration in seconds or a humantime duration") + } - Ok(()) -} + fn visit_i64(self, value: i64) -> Result + where + E: de::Error, + { + Ok(Duration::seconds(value)) + } + + fn visit_u64(self, value: u64) -> Result + where + E: de::Error, + { + let value = value.clamp(0, i64::MAX as u64) as i64; + Ok(Duration::seconds(value)) + } + + fn visit_f64(self, value: f64) -> Result + where + E: de::Error, + { + let value = value.clamp(i64::MIN as f64, i64::MAX as f64) as i64; + Ok(Duration::seconds(value)) + } -/// Extract the new state from the context -fn extract_run_context(runtime: &mut JsRuntime) -> anyhow::Result { - let global = runtime.global_context(); + fn visit_str(self, v: &str) -> Result + where + E: de::Error, + { + let duration = humantime::parse_duration(&v).map_err(de::Error::custom)?; + Ok(Duration::from_std(duration).map_err(de::Error::custom)?) + } + } - let mut scope = &mut runtime.handle_scope(); + struct OptionDurationVisitor; - let global = global.open(scope).global(scope); + impl<'de> de::Visitor<'de> for OptionDurationVisitor { + type Value = Option; - let new_thing = { - let key = serde_v8::to_v8(&mut scope, KEY_NEW_STATE)?; - match global.get(scope, key) { - Some(value) => serde_v8::from_v8(scope, value)?, - None => bail!("Script removed new state"), + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a duration in seconds, a humantime duration, or none") } - }; - let outbox = { - let key = serde_v8::to_v8(&mut scope, KEY_OUTBOX)?; - match global.get(scope, key) { - Some(value) => serde_v8::from_v8(scope, value)?, - None => vec![], + /// Deserialize a timestamp in seconds since the epoch + fn visit_none(self) -> Result, E> + where + E: de::Error, + { + Ok(None) } - }; - let logs = { - let key = serde_v8::to_v8(&mut scope, KEY_LOGS)?; - match global.get(scope, key) { - Some(value) => serde_v8::from_v8(scope, value)?, - None => vec![], + /// Deserialize a timestamp in seconds since the epoch + fn visit_some(self, d: D) -> Result, D::Error> + where + D: de::Deserializer<'de>, + { + d.deserialize_any(DurationVisitor).map(Some) } - }; - let waker = { - let key = serde_v8::to_v8(&mut scope, KEY_WAKER)?; - match global.get(scope, key) { - Some(value) => to_duration(serde_v8::from_v8(scope, value)?)?, - None => None, + /// Deserialize a timestamp in seconds since the epoch + fn visit_unit(self) -> Result, E> + where + E: de::Error, + { + Ok(None) } - }; - - Ok(Outgoing { - new_thing, - outbox, - logs, - waker, - }) + } } -/// convert a JavaScript value into a duration -fn to_duration(value: Value) -> anyhow::Result> { - Ok(match value { - Value::String(time) => { - let duration = humantime::parse_duration(&time)?; - Some(Duration::from_std(duration)?) - } - Value::Number(seconds) => { - if let Some(seconds) = seconds.as_i64() { - if seconds > 0 { - return Ok(Some(Duration::seconds(seconds))); - } - } else if let Some(_) = seconds.as_u64() { - // we can be sure it doesn't fit into an i64 - return Ok(Some(Duration::seconds(i64::MAX))); - } else if let Some(seconds) = seconds.as_f64() { - if seconds > i64::MAX as f64 { - return Ok(Some(Duration::seconds(i64::MAX))); - } else if seconds > 0f64 { - return Ok(Some(Duration::seconds(seconds as i64))); - } - } - None - } - _ => None, - }) +#[cfg(test)] +mod test { + use chrono::Duration; + use serde_json::{json, Value}; + + #[derive(Clone, Debug, PartialEq, Eq, serde::Deserialize)] + struct Test { + #[serde(default, with = "super::duration")] + duration: Option, + } + + pub fn assert(expected: Option, from: Value) { + assert_eq!( + Test { duration: expected }, + serde_json::from_value(json!({ "duration": from })).unwrap(), + ); + } + + #[test] + pub fn test_deser_duration_seconds() { + assert(Some(Duration::seconds(1)), json!(1)); + assert(Some(Duration::hours(1)), json!(3600)); + } + + #[test] + pub fn test_deser_duration_negative_seconds() { + assert(Some(Duration::seconds(-1)), json!(-1)); + assert(Some(Duration::hours(-1)), json!(-3600)); + } + + #[test] + pub fn test_deser_duration_fractional() { + assert(Some(Duration::seconds(1)), json!(1.01)); + assert(Some(Duration::hours(1)), json!(3600.01)); + } + + #[test] + pub fn test_deser_duration_humantime() { + assert(Some(Duration::seconds(1)), json!("1s")); + assert(Some(Duration::hours(1)), json!("1h")); + } + + #[test] + pub fn test_deser_duration_null() { + assert(None, Value::Null); + } + + #[test] + pub fn test_deser_duration_none() { + assert_eq!( + Test { duration: None }, + serde_json::from_value(json!({})).unwrap(), + ); + } } diff --git a/core/src/machine/mod.rs b/core/src/machine/mod.rs index b26d2fb..694c1fe 100644 --- a/core/src/machine/mod.rs +++ b/core/src/machine/mod.rs @@ -1,9 +1,10 @@ mod deno; -use crate::machine::deno::DenoOptions; +use crate::machine::deno::{DenoOptions, Json}; use crate::model::{ - Changed, Code, JsonSchema, Metadata, Reconciliation, Schema, SyntheticType, Thing, ThingState, - Timer, WakerExt, WakerReason, + Changed, Code, DesiredFeatureMethod, DesiredFeatureReconciliation, DesiredMode, JsonSchema, + Metadata, Reconciliation, Schema, SyntheticType, Thing, ThingState, Timer, WakerExt, + WakerReason, }; use crate::processor::Message; use anyhow::anyhow; @@ -33,7 +34,7 @@ pub enum Error { Internal(#[source] anyhow::Error), } -/// The state machine runner. Good for a single run of updates. +/// The state machine runner. Good for a single run. pub struct Machine { thing: Thing, } @@ -48,6 +49,7 @@ impl Machine { Self { thing } } + /// Run actions for creating a new thing. pub async fn create(new_thing: Thing) -> Result { // Creating means that we start with an empty thing, and then set the initial state. // This allows to run through the reconciliation initially. @@ -65,6 +67,7 @@ impl Machine { Ok(outcome) } + /// Run an update. pub async fn update(self, f: F) -> Result where F: FnOnce(Thing) -> Fut, @@ -166,14 +169,6 @@ pub struct OutboxMessage { pub message: Message, } -#[derive(Clone, Debug)] -pub struct Outgoing { - pub new_thing: Thing, - pub outbox: Vec, - pub logs: Vec, - pub waker: Option, -} - pub struct Reconciler { deadline: tokio::time::Instant, current_thing: Arc, @@ -207,12 +202,13 @@ impl Reconciler { self.generate_synthetics().await?; // run code - let Reconciliation { changed, timers } = self.new_thing.reconciliation.clone(); - self.reconcile_changed(changed).await?; self.reconcile_timers(timers).await?; + // reconcile desired state + self.reconcile_desired_state().await?; + Ok(Outcome { new_thing: self.new_thing, outbox: self.outbox, @@ -236,6 +232,117 @@ impl Reconciler { Ok(()) } + fn sync_desired_state(&mut self) -> Result<(), Error> { + let mut waker = self.new_thing.waker(); + + for (name, mut desired) in &mut self.new_thing.desired_state { + if !matches!(desired.method, DesiredFeatureMethod::Manual) { + // find the current value + let reported_value = self + .new_thing + .synthetic_state + .get(name) + .map(|state| &state.value) + .or_else(|| { + self.new_thing + .reported_state + .get(name) + .map(|state| &state.value) + }) + .unwrap_or(&Value::Null); + let desired_value = &desired.value; + + // check if there is a change from the previous state + if let Some(previous) = self.current_thing.desired_state.get(name) { + if previous.value != desired.value + || previous.valid_until != desired.valid_until + { + // desired value changed, start reconciling again + desired.reconciliation = + DesiredFeatureReconciliation::Reconciling { last_attempt: None }; + desired.last_update = Utc::now(); + } + } + + match &desired.reconciliation { + DesiredFeatureReconciliation::Succeeded { .. } + if matches!(desired.mode, DesiredMode::Sync) => + { + // if we should keep it in sync, check values and if the value is still valid + if reported_value != desired_value + && desired.valid_until.map(|u| u > Utc::now()).unwrap_or(true) + { + // back to reconciling + desired.reconciliation = + DesiredFeatureReconciliation::Reconciling { last_attempt: None }; + + if let Some(valid_until) = desired.valid_until { + // and set waker + waker.wakeup_at(valid_until, WakerReason::Reconcile); + } + } + } + DesiredFeatureReconciliation::Succeeded { .. } + | DesiredFeatureReconciliation::Failed { .. } => { + // we do nothing + } + DesiredFeatureReconciliation::Reconciling { .. } => { + if reported_value == desired_value { + // value changed to expected value -> success + desired.reconciliation = + DesiredFeatureReconciliation::Succeeded { when: Utc::now() }; + } else if let Some(valid_until) = desired.valid_until { + // value did not change to expected value, and expired -> failure + if valid_until < Utc::now() { + desired.reconciliation = DesiredFeatureReconciliation::Failed { + when: Utc::now(), + reason: None, + }; + } else { + // otherwise, start waker + waker.wakeup_at(valid_until, WakerReason::Reconcile); + } + } + // else -> keep going + } + } + } + } + + self.new_thing.set_waker(waker); + + Ok(()) + } + + async fn reconcile_desired_state(&mut self) -> Result<(), Error> { + // sync first + self.sync_desired_state()?; + + // process next + for (name, mut desired) in &mut self.new_thing.desired_state { + match &desired.reconciliation { + DesiredFeatureReconciliation::Succeeded { .. } + | DesiredFeatureReconciliation::Failed { .. } => { + // we do nothing + } + DesiredFeatureReconciliation::Reconciling { last_attempt } => { + match &desired.method { + DesiredFeatureMethod::Manual | DesiredFeatureMethod::External => { + // we do nothing + } + DesiredFeatureMethod::Command { channel, payload } => { + todo!("Implement"); + } + DesiredFeatureMethod::Code(code) => { + todo!("Implement"); + } + } + } + } + } + Ok(()) + } + async fn reconcile_changed(&mut self, changed: IndexMap) -> Result<(), Error> { for (name, mut changed) in changed { let ExecutionResult { logs } = self @@ -329,36 +436,66 @@ impl Reconciler { async fn run_code(&mut self, name: String, code: &Code) -> Result { match code { Code::JavaScript(script) => { - let outgoing = deno::run( - name, - &script, - self.current_thing.clone(), - &self.new_thing, - DenoOptions { - deadline: self.deadline, - }, - ) - .await - .map_err(Error::Reconcile)?; + #[derive(serde::Serialize)] + #[serde(rename_all = "camelCase")] + struct Input { + current_state: Arc, + new_state: Thing, + // the following items are scooped off by the output, but we need to initialize + // them to present, but empty values for the scripts. + outbox: Vec, + logs: Vec, + } + + #[derive(Clone, Default, Debug, serde::Deserialize)] + #[serde(rename_all = "camelCase")] + pub struct Output { + #[serde(default)] + new_state: Option, + #[serde(default)] + outbox: Vec, + #[serde(default)] + logs: Vec, + #[serde(default, with = "deno::duration")] + waker: Option, + } + + let opts = DenoOptions { + deadline: self.deadline, + }; + let deno = deno::Execution::new(name, script, opts); + let out = deno + .run::<_, Json, ()>(Input { + current_state: self.current_thing.clone(), + new_state: self.new_thing.clone(), + outbox: vec![], + logs: vec![], + }) + .await + .map_err(Error::Reconcile)?; // FIXME: record error (if any) - let Outgoing { - mut new_thing, + let Output { + new_state, waker, outbox, logs, - } = outgoing; + } = out.output.0; + + let mut new_state = + new_state.unwrap_or_else(|| self.current_thing.as_ref().clone()); // schedule the waker, in the new state if let Some(duration) = waker { - new_thing.wakeup(duration, WakerReason::Reconcile); + new_state.wakeup(duration, WakerReason::Reconcile); } // set the new state - self.new_thing = new_thing; + self.new_thing = new_state; // extend outbox self.outbox.extend(outbox); + // done Ok(ExecutionResult { logs }) } } @@ -378,15 +515,14 @@ impl Reconciler { new_state: Thing, } - #[derive(Default, serde::Serialize, serde::Deserialize)] - struct Output {} - - let out: deno::ExecutionResult = - deno::execute(name, script, Input { new_state }, DenoOptions { deadline }) - .await - .map_err(Error::Reconcile)?; + let opts = DenoOptions { deadline }; + let deno = deno::Execution::new(name, script, opts); + let out = deno + .run::<_, (), Value>(Input { new_state }) + .await + .map_err(Error::Reconcile)?; - Ok(out.value) + Ok(out.return_value) } SyntheticType::Alias(alias) => match new_state.reported_state.get(alias) { Some(value) => Ok(value.value.clone()), @@ -461,7 +597,7 @@ mod test { desired_state: Default::default(), synthetic_state: Default::default(), reconciliation: Default::default(), - internal: None + internal: None, }, new_thing ); diff --git a/core/src/model/mod.rs b/core/src/model/mod.rs index 19ccd08..3285ecc 100644 --- a/core/src/model/mod.rs +++ b/core/src/model/mod.rs @@ -55,6 +55,34 @@ impl Thing { pub fn with_id(id: &Id) -> Self { Self::new(&id.application, &id.thing) } + + /// Get a clone of the current waker state + pub fn waker(&self) -> Waker { + self.internal + .as_ref() + .map(|i| i.waker.clone()) + .unwrap_or_default() + } + + /// Set the waker state, creating an internal if necessary + pub fn set_waker(&mut self, waker: Waker) { + if waker.is_empty() { + // only create an internal if the waker is not empty + if let Some(internal) = &mut self.internal { + internal.waker = waker; + } + } else { + match &mut self.internal { + Some(internal) => internal.waker = waker, + None => { + self.internal = Some(Internal { + waker, + ..Default::default() + }) + } + } + } + } } /// The state view on thing model. @@ -270,8 +298,99 @@ impl ReportedFeature { )] #[serde(rename_all = "camelCase")] pub struct DesiredFeature { - pub last_update: DateTime, + /// The value the system desired the device to apply. + #[serde(default)] pub value: Value, + /// The value the system desired the device to apply. + #[serde(default)] + pub mode: DesiredMode, + /// The timestamp the desired value was last updated. + pub last_update: DateTime, + /// An optional indication until when the desired value is valid. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub valid_until: Option>, + + /// The current reconciliation state of the desired value. + #[serde(default)] + pub reconciliation: DesiredFeatureReconciliation, + /// The method of reconciliation. + #[serde(default)] + pub method: DesiredFeatureMethod, +} + +/// The mode of the desired feature. +#[derive( + Clone, Debug, Default, PartialEq, Eq, serde::Deserialize, serde::Serialize, schemars::JsonSchema, +)] +#[serde(rename_all = "camelCase")] +pub enum DesiredMode { + /// Only reconcile once, resulting in success of failure. + Once, + /// Keep desired and reported state in sync. Switches back to from "success" to "reconciling" + /// when the report state deviates from the desired, for as long as the desired value is valid. + #[default] + Sync, +} + +#[derive( + Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize, schemars::JsonSchema, +)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "state")] +pub enum DesiredFeatureReconciliation { + #[serde(rename_all = "camelCase")] + Reconciling { + #[serde(default, skip_serializing_if = "Option::is_none")] + last_attempt: Option>, + }, + #[serde(rename_all = "camelCase")] + Succeeded { when: DateTime }, + #[serde(rename_all = "camelCase")] + Failed { + when: DateTime, + #[serde(default, skip_serializing_if = "Option::is_none")] + reason: Option, + }, +} + +impl Default for DesiredFeatureReconciliation { + fn default() -> Self { + Self::Reconciling { last_attempt: None } + } +} + +#[derive( + Clone, Debug, Default, PartialEq, Eq, serde::Deserialize, serde::Serialize, schemars::JsonSchema, +)] +#[serde(rename_all = "camelCase")] +pub enum DesiredFeatureMethod { + /// Do not process the feature at all. + /// + /// NOTE: This will not trigger any state changes other than setting the state to + /// [`DesiredFeatureState::Reconciling`]. + Manual, + /// An external process needs to trigger the reconciliation. But the system will detect a + /// reported state and change the desired state accordingly. + #[default] + External, + /// Try to reconcile the state by sending out commands. + Command { + channel: String, + #[serde(default)] + payload: DesiredFeatureCommandPayload, + }, + /// Generate reconcile actions through custom code. + Code(Code), +} + +#[derive( + Clone, Debug, Default, PartialEq, Eq, serde::Deserialize, serde::Serialize, schemars::JsonSchema, +)] +#[serde(rename_all = "camelCase")] +pub enum DesiredFeatureCommandPayload { + // Send the desired value as payload. + #[default] + Raw, } #[derive( @@ -315,8 +434,8 @@ pub enum Code { #[derive(Clone, Debug, Default, PartialEq, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct Internal { - #[serde(default, skip_serializing_if = "Option::is_none")] - pub waker: Option, + #[serde(default, skip_serializing_if = "Waker::is_empty")] + pub waker: Waker, #[serde(default, skip_serializing_if = "Vec::is_empty")] pub outbox: Vec, } @@ -327,7 +446,7 @@ impl Internal { } pub fn is_empty(&self) -> bool { - self.waker.is_none() & self.outbox.is_empty() + self.waker.is_empty() & self.outbox.is_empty() } } @@ -364,47 +483,48 @@ impl WakerExt for Thing { impl WakerExt for Internal { fn wakeup_at(&mut self, when: DateTime, reason: WakerReason) { - match &mut self.waker { - None => { - // no waker, just create it - self.waker = Some(Waker { - when, - why: { - let mut set = BTreeSet::new(); - set.insert(reason); - set - }, - }) - } - Some(waker) => { - // we already have a waker - if waker.when > when { - // wakeup earlier - waker.when = when; - } - // add our reason (if missing) - waker.why.extend(Some(reason)); - } - } + self.waker.wakeup_at(when, reason); } fn clear_wakeup(&mut self, reason: WakerReason) { - if let Some(waker) = &mut self.waker { - waker.why.remove(&reason); - if waker.why.is_empty() { - self.waker = None; - } - } + self.waker.clear_wakeup(reason); } } -#[derive(Clone, Debug, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)] #[serde(rename_all = "camelCase")] pub struct Waker { - pub when: DateTime, + pub when: Option>, pub why: BTreeSet, } +impl Waker { + pub fn is_empty(&self) -> bool { + self.when.is_none() + } +} + +impl WakerExt for Waker { + fn wakeup_at(&mut self, when: DateTime, reason: WakerReason) { + self.why.insert(reason); + match self.when { + None => self.when = Some(when), + Some(w) => { + if w > when { + self.when = Some(when); + } + } + } + } + + fn clear_wakeup(&mut self, reason: WakerReason) { + self.why.remove(&reason); + if self.why.is_empty() { + self.when = None; + } + } +} + #[derive( Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, )] @@ -473,4 +593,161 @@ mod test { serde_json::to_value(thing).unwrap() ); } + + #[test] + fn test_ser_desired() { + let mut thing = Thing::new("app", "thing"); + thing.desired_state.insert( + "foo".to_string(), + DesiredFeature { + last_update: Utc.ymd(2022, 1, 1).and_hms(1, 2, 3), + valid_until: Some(Utc.ymd(2022, 1, 1).and_hms(2, 2, 3)), + value: json!(42), + reconciliation: DesiredFeatureReconciliation::Reconciling { + last_attempt: Some(Utc.ymd(2022, 1, 1).and_hms(1, 2, 4)), + }, + method: DesiredFeatureMethod::Manual, + }, + ); + thing.desired_state.insert( + "bar".to_string(), + DesiredFeature { + last_update: Utc.ymd(2022, 1, 1).and_hms(1, 2, 3), + valid_until: None, + value: json!(42), + reconciliation: DesiredFeatureReconciliation::Succeeded { + when: Utc.ymd(2022, 1, 1).and_hms(1, 2, 4), + }, + method: DesiredFeatureMethod::Manual, + }, + ); + thing.desired_state.insert( + "baz".to_string(), + DesiredFeature { + last_update: Utc.ymd(2022, 1, 1).and_hms(1, 2, 3), + valid_until: None, + value: json!(42), + reconciliation: DesiredFeatureReconciliation::Failed { + when: Utc.ymd(2022, 1, 1).and_hms(1, 2, 4), + reason: Some("The dog ate my command".to_string()), + }, + method: DesiredFeatureMethod::Manual, + }, + ); + thing.desired_state.insert( + "method_code".to_string(), + DesiredFeature { + last_update: Utc.ymd(2022, 1, 1).and_hms(1, 2, 3), + valid_until: None, + value: json!(42), + reconciliation: DesiredFeatureReconciliation::Reconciling { + last_attempt: Some(Utc.ymd(2022, 1, 1).and_hms(1, 2, 4)), + }, + method: DesiredFeatureMethod::Code(Code::JavaScript("true".to_string())), + }, + ); + thing.desired_state.insert( + "method_command".to_string(), + DesiredFeature { + last_update: Utc.ymd(2022, 1, 1).and_hms(1, 2, 3), + valid_until: None, + value: json!(42), + reconciliation: DesiredFeatureReconciliation::Reconciling { + last_attempt: Some(Utc.ymd(2022, 1, 1).and_hms(1, 2, 4)), + }, + method: DesiredFeatureMethod::Command { + channel: "set-feature".to_string(), + payload: DesiredFeatureCommandPayload::Raw, + }, + }, + ); + thing.desired_state.insert( + "method_external".to_string(), + DesiredFeature { + last_update: Utc.ymd(2022, 1, 1).and_hms(1, 2, 3), + valid_until: None, + value: json!(42), + reconciliation: DesiredFeatureReconciliation::Reconciling { + last_attempt: Some(Utc.ymd(2022, 1, 1).and_hms(1, 2, 4)), + }, + method: DesiredFeatureMethod::External, + }, + ); + assert_eq!( + json!({ + "metadata": { + "name": "thing", + "application": "app", + }, + "desiredState": { + "bar": { + "value": 42, + "lastUpdate": "2022-01-01T01:02:03Z", + "reconciliation": { + "state": "succeeded", + "when": "2022-01-01T01:02:04Z", + }, + "method": "manual", + }, + "baz": { + "value": 42, + "lastUpdate": "2022-01-01T01:02:03Z", + "reconciliation": { + "state": "failed", + "when": "2022-01-01T01:02:04Z", + "reason": "The dog ate my command", + }, + "method": "manual", + }, + "foo": { + "value": 42, + "lastUpdate": "2022-01-01T01:02:03Z", + "validUntil": "2022-01-01T02:02:03Z", + "reconciliation": { + "state": "reconciling", + "lastAttempt": "2022-01-01T01:02:04Z", + }, + "method": "manual", + }, + "method_code": { + "value": 42, + "lastUpdate": "2022-01-01T01:02:03Z", + "reconciliation": { + "state": "reconciling", + "lastAttempt": "2022-01-01T01:02:04Z", + }, + "method": { + "code": { + "javaScript": "true", + }, + }, + }, + "method_command": { + "value": 42, + "lastUpdate": "2022-01-01T01:02:03Z", + "reconciliation": { + "state": "reconciling", + "lastAttempt": "2022-01-01T01:02:04Z", + }, + "method": { + "command": { + "channel": "set-feature", + "payload": "raw", + } + }, + }, + "method_external": { + "value": 42, + "lastUpdate": "2022-01-01T01:02:03Z", + "reconciliation": { + "state": "reconciling", + "lastAttempt": "2022-01-01T01:02:04Z", + }, + "method": "external", + }, + } + }), + serde_json::to_value(thing).unwrap() + ); + } } diff --git a/core/src/service/updater.rs b/core/src/service/updater.rs index 0c15bf9..7186bbf 100644 --- a/core/src/service/updater.rs +++ b/core/src/service/updater.rs @@ -1,8 +1,12 @@ -use crate::model::{Reconciliation, ReportedFeature, SyntheticFeature, SyntheticType, Thing}; -use chrono::Utc; +use crate::model::{ + DesiredFeature, DesiredFeatureMethod, DesiredFeatureReconciliation, DesiredMode, + Reconciliation, ReportedFeature, SyntheticFeature, SyntheticType, Thing, +}; +use chrono::{DateTime, Utc}; use serde_json::Value; use std::collections::{btree_map::Entry, BTreeMap}; use std::convert::Infallible; +use std::time::Duration; pub use json_patch::Patch; @@ -161,6 +165,114 @@ impl InfallibleUpdater for SyntheticStateUpdater { } } +/// A more flexible update struct for [`DesiredFeature`]. +#[derive(Clone, Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct DesiredStateUpdate { + #[serde(default)] + pub value: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub valid_until: Option>, + #[serde(default, skip_serializing_if = "Option::is_none")] + #[serde(with = "humantime_serde")] + pub valid_for: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub mode: Option, + + #[serde(default)] + pub reconciliation: Option, + #[serde(default)] + pub method: Option, +} + +#[derive(Debug, thiserror::Error)] +pub enum DesiredStateUpdaterError { + #[error("Out of range: {0}")] + OutOfRange(#[from] time::OutOfRangeError), +} + +pub struct DesiredStateUpdater(pub String, pub DesiredStateUpdate); + +impl Updater for DesiredStateUpdater { + type Error = DesiredStateUpdaterError; + + fn update(self, mut thing: Thing) -> Result { + let DesiredStateUpdate { + value, + valid_until, + valid_for, + reconciliation, + method, + mode, + } = self.1; + + let valid_until = valid_until.or(valid_for + .map(|d| chrono::Duration::from_std(d)) + .transpose()? + .map(|d| Utc::now() + d)); + + match thing.desired_state.entry(self.0) { + Entry::Occupied(mut entry) => { + // we update what we got + let entry = entry.get_mut(); + if let Some(value) = value { + entry.value = value; + } + entry.valid_until = valid_until; + if let Some(reconciliation) = reconciliation { + entry.reconciliation = reconciliation; + } + if let Some(method) = method { + entry.method = method; + } + if let Some(mode) = mode { + entry.mode = mode; + } + } + Entry::Vacant(entry) => { + // we create some reasonable defaults + entry.insert(DesiredFeature { + value: value.unwrap_or_default(), + last_update: Utc::now(), + valid_until, + reconciliation: reconciliation.unwrap_or_default(), + method: method.unwrap_or_default(), + mode: mode.unwrap_or_default(), + }); + } + } + + Ok(thing) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum DesiredStateValueUpdaterError { + #[error("Unknown feature: {0}")] + Unknown(String), +} + +pub struct DesiredStateValueUpdater { + pub name: String, + pub value: Value, + pub valid_until: Option>, +} + +impl Updater for DesiredStateValueUpdater { + type Error = DesiredStateValueUpdaterError; + + fn update(self, mut thing: Thing) -> Result { + let state = thing + .desired_state + .get_mut(&self.name) + .ok_or_else(|| DesiredStateValueUpdaterError::Unknown(self.name))?; + state.value = self.value; + state.valid_until = self.valid_until; + Ok(thing) + } +} + #[cfg(test)] mod test { diff --git a/core/src/storage/postgres/mod.rs b/core/src/storage/postgres/mod.rs index ed83326..95d28e8 100644 --- a/core/src/storage/postgres/mod.rs +++ b/core/src/storage/postgres/mod.rs @@ -440,9 +440,5 @@ impl Storage { } fn waker_data(thing: &Thing) -> Option> { - thing - .internal - .as_ref() - .and_then(|i| i.waker.as_ref()) - .map(|w| w.when) + thing.internal.as_ref().and_then(|i| i.waker.when) } diff --git a/core/src/waker/postgres.rs b/core/src/waker/postgres.rs index 074e9b2..b0d1fb1 100644 --- a/core/src/waker/postgres.rs +++ b/core/src/waker/postgres.rs @@ -197,8 +197,8 @@ where let reasons = data .internal .as_ref() - .and_then(|i| i.waker.as_ref()) - .map(|w| &w.why) + .filter(|i| i.waker.when.is_some()) + .map(|i| &i.waker.why) .map(|r| r.iter().map(|r| *r).collect::>()) .unwrap_or_default(); @@ -239,10 +239,10 @@ where mut data: Data, ) -> anyhow::Result<()> { // we clear the waker and commit the transaction. The oplock should hold, as we have locked - // the record. + // the record "for update". if let Some(internal) = &mut data.internal { - internal.waker = None; + internal.waker = Default::default(); } let stmt = tx diff --git a/core/tests/base/waker.rs b/core/tests/base/waker.rs index c50c2b2..3dc022d 100644 --- a/core/tests/base/waker.rs +++ b/core/tests/base/waker.rs @@ -27,18 +27,18 @@ async fn test_process() { Code::JavaScript( r#" function wakeup(when) { - waker = when; + context.waker = when; } -if (newState.reportedState?.["foo"] === undefined) { - if (newState.metadata.annotations?.["test"] === "true") { - newState.reportedState = {}; - newState.reportedState["foo"] = { +if (context.newState.reportedState?.["foo"] === undefined) { + if (context.newState.metadata.annotations?.["test"] === "true") { + context.newState.reportedState = {}; + context.newState.reportedState["foo"] = { value: "bar", lastUpdate: new Date().toISOString(), } } else { - newState.metadata.annotations = {"test": "true"}; + context.newState.metadata.annotations = {"test": "true"}; wakeup("5s"); } } @@ -57,10 +57,10 @@ if (newState.reportedState?.["foo"] === undefined) { .await .unwrap(); - let wakeup = thing.internal.unwrap().waker.unwrap(); + let wakeup = thing.internal.unwrap().waker; assert_eq!(wakeup.why, BTreeSet::from([WakerReason::Reconcile])); - assert!(wakeup.when > Utc::now()); + assert!(wakeup.when.unwrap() > Utc::now()); // it should also have our annotation @@ -78,7 +78,9 @@ if (newState.reportedState?.["foo"] === undefined) { let thing = service.get(&id).await.unwrap().unwrap(); // waker expired, so it must be gone - assert!(thing.internal.clone().unwrap_or_default().waker.is_none()); + let internal = thing.internal.clone().unwrap_or_default(); + assert!(internal.waker.when.is_none()); + assert!(internal.waker.why.is_empty()); assert_eq!(thing.reported_state.get("foo").unwrap().value, json!("bar")); @@ -114,16 +116,16 @@ async fn test_timer() { Some(Duration::from_secs(3)), Code::JavaScript( r#" -if (newState.metadata.annotations === undefined) { - newState.metadata.annotations = {}; +if (context.newState.metadata.annotations === undefined) { + context.newState.metadata.annotations = {}; } -if (newState.reportedState === undefined) { - newState.reportedState = {}; +if (context.newState.reportedState === undefined) { + context.newState.reportedState = {}; } const lastUpdate = new Date().toISOString(); -const value = (newState.reportedState["timer"]?.value || 0) + 1; -newState.reportedState["timer"] = { value, lastUpdate }; +const value = (context.newState.reportedState["timer"]?.value || 0) + 1; +context.newState.reportedState["timer"] = { value, lastUpdate }; "# .to_string(), ), @@ -148,9 +150,9 @@ newState.reportedState["timer"] = { value, lastUpdate }; assert_eq!(thing.reported_state.get("timer").map(|f| &f.value), None); // check the waker - let wakeup = thing.internal.unwrap().waker.unwrap(); + let wakeup = thing.internal.unwrap().waker; assert_eq!(wakeup.why, BTreeSet::from([WakerReason::Reconcile])); - assert!(wakeup.when > Utc::now()); + assert!(wakeup.when.unwrap() > Utc::now()); // wait until the initial delay has expired tokio::time::sleep_until(tokio::time::Instant::now() + Duration::from_secs(3 + 1)).await; @@ -159,7 +161,7 @@ newState.reportedState["timer"] = { value, lastUpdate }; let thing = service.get(&id).await.unwrap().unwrap(); - let wakeup = thing.internal.unwrap().waker.unwrap(); + let wakeup = thing.internal.unwrap().waker; assert_eq!( wakeup.why.clone().into_iter().collect::>(), &[WakerReason::Reconcile] diff --git a/core/tests/common/mock.rs b/core/tests/common/mock.rs index 2ec5da0..a281eb0 100644 --- a/core/tests/common/mock.rs +++ b/core/tests/common/mock.rs @@ -422,8 +422,9 @@ impl MockWaker { pub async fn update(&self, thing: &Thing) { let mut lock = self.wakers.lock().await; - match thing.internal.as_ref().and_then(|i| i.waker.as_ref()) { - Some(waker) => { + let waker = thing.waker(); + match waker.when { + Some(when) => { let id = TargetId { id: Id::new( thing.metadata.application.clone(), @@ -439,11 +440,7 @@ impl MockWaker { }; lock.insert( thing.metadata.name.clone(), - ( - waker.when, - waker.why.iter().map(|r| *r).collect::>(), - id, - ), + (when, waker.why.iter().map(|r| *r).collect::>(), id), ); } None => { diff --git a/core/tests/failures/event.rs b/core/tests/failures/event.rs index 41b4ea3..27ca541 100644 --- a/core/tests/failures/event.rs +++ b/core/tests/failures/event.rs @@ -121,13 +121,13 @@ where r#" // send empty strategic merge, every time we reconcile -if (newState.reportedState === undefined) { - newState.reportedState = {} +if (context.newState.reportedState === undefined) { + context.newState.reportedState = {} } -const value = (newState.reportedState["counter"]?.value || 0) + 1; -newState.reportedState["counter"] = { value, lastUpdate: new Date().toISOString() }; +const value = (context.newState.reportedState["counter"]?.value || 0) + 1; +context.newState.reportedState["counter"] = { value, lastUpdate: new Date().toISOString() }; -outbox.push({thing: "thing2", message: { merge: {"counter": value}}}); +context.outbox.push({thing: "thing2", message: { merge: {"counter": value}}}); "# .to_string(), )), diff --git a/examples/60_synthetics.adoc b/examples/60_synthetics.adoc index 2f45af1..6e8c772 100644 --- a/examples/60_synthetics.adoc +++ b/examples/60_synthetics.adoc @@ -25,7 +25,6 @@ include::60_synthetics/syn2.js[] ---- http PUT localhost:8080/api/v1alpha1/things/default/things/foo/syntheticStates/add javaScript=@60_synthetics/syn2.js http PUT localhost:8080/api/v1alpha1/things/default/things/foo/reportedStates value1:=42 value2:=23 -http PUT localhost:8080/api/v1alpha1/things/default/things/foo/reportedStates value2:=23 ---- == Rename a feature diff --git a/examples/70_desired_state.adoc b/examples/70_desired_state.adoc new file mode 100644 index 0000000..661f2f7 --- /dev/null +++ b/examples/70_desired_state.adoc @@ -0,0 +1,73 @@ += Desired state aka Commands + +== Create an externally reconciled desired state + +[source,shell] +---- +http PUT localhost:8080/api/v1alpha1/things/default/things/foo/desiredStates/temperature method:=external +---- + +Set the desired value (ensure it is currently not already `23`): + +[source,shell] +---- +http PUT localhost:8080/api/v1alpha1/things/default/things/foo/desiredStates/temperature value:=23 +---- + +Now check the state with a `GET`. You should see that the reconciliation state is `reconciling` and it is waiting for +the report of the state to match the desired value: + +[source,json] +---- +{ + "desiredState": { + "temperature": { + "lastUpdate": "2022-07-29T08:14:45.072140510Z", + "method": "external", + "reconciliation": { + "state": "reconciling" + }, + "value": 23 + } + } +} +---- + +Now set the reported state to the desired value: + +[source,shell] +---- +http PUT localhost:8080/api/v1alpha1/things/default/things/foo/reportedStates temperature:=23 +---- + +Perform the `GET` operation again, and you will see that the reconciliation succeeded: + +[source,json] +---- +{ + "desiredState": { + "temperature": { + "lastUpdate": "2022-07-29T08:14:45.072140510Z", + "method": "external", + "reconciliation": { + "state": "succeeded", + "when": "2022-07-29T08:14:59.579334005Z" + }, + "value": 23 + } + } +} +---- + +By default, the mode is `Sync`, which means that the system expects the state to stay with the desired state. Switching +to another reported value, will restart the reconciliation process: + +[source,shell] +---- +http PUT localhost:8080/api/v1alpha1/things/default/things/foo/reportedStates temperature:=22 +---- + +Now `GET`, and check that it did. Set the value back to the desired state afterwards to get it back into the +"successful" state. + +NOTE: It will try to reconcile only for as long as the desired value is valid. Once it expired, it no longer tries. diff --git a/ideas/brainstorming.adoc b/ideas/brainstorming.adoc index 96ff80b..bb6618f 100644 --- a/ideas/brainstorming.adoc +++ b/ideas/brainstorming.adoc @@ -174,7 +174,7 @@ reconciliation: updateAggregate("some-device", "some-other-feature", newState["some-feature"]) } - periodic: + timers: command: period: 1m enabled: false # defaults to true