From c6c59f3d214c7659b41e96cc07c41c4a05fe11aa Mon Sep 17 00:00:00 2001 From: Jens Reimann Date: Fri, 22 Jul 2022 17:18:51 +0200 Subject: [PATCH] feat: implement core of wakeup stuff --- Cargo.lock | 2 + backend/src/endpoints.rs | 11 +- backend/src/lib.rs | 40 +- backend/src/notifier/actix.rs | 62 ++- backend/src/notifier/mod.rs | 1 + core/Cargo.toml | 2 + core/examples/gen_schema.rs | 3 +- core/src/app.rs | 2 +- core/src/lib.rs | 1 + core/src/listener/mod.rs | 8 +- core/src/machine/deno.rs | 39 ++ core/src/machine/mod.rs | 17 +- core/src/model/mod.rs | 99 +++- core/src/notifier/kafka.rs | 2 +- core/src/notifier/mod.rs | 2 +- core/src/processor/mod.rs | 113 +++- core/src/processor/sink/kafka.rs | 83 +++ core/src/processor/sink/mod.rs | 31 ++ core/src/processor/source/kafka.rs | 114 +--- core/src/processor/source/mod.rs | 36 +- core/src/service/id.rs | 15 + core/src/service/mod.rs | 84 ++- core/src/storage/mod.rs | 56 +- core/src/storage/postgres/mod.rs | 199 ++++--- core/src/waker/mod.rs | 79 +++ core/src/waker/postgres.rs | 286 ++++++++++ core/tests/processor.rs | 63 +++ core/tests/service.rs | 164 ++++++ core/tests/setup.rs | 505 ++++++++++++++++++ core/tests/waker.rs | 102 ++++ .../migrations/00000000000000_init/up.sql | 4 + develop/CHEATSHEET.adoc | 15 + develop/docker-compose.yml | 38 ++ examples/10_basic.adoc | 10 +- examples/20_reconcile.adoc | 6 +- examples/20_reconcile/recon2.js | 29 +- examples/30_notifications.adoc | 2 +- examples/30_notifications/index.html | 8 +- examples/99_just_testing.adoc | 38 ++ processor/src/lib.rs | 28 +- server/src/injector/mod.rs | 11 +- server/src/injector/mqtt.rs | 2 +- server/src/main.rs | 78 ++- 43 files changed, 2114 insertions(+), 376 deletions(-) create mode 100644 core/src/processor/sink/kafka.rs create mode 100644 core/src/processor/sink/mod.rs create mode 100644 core/src/waker/mod.rs create mode 100644 core/src/waker/postgres.rs create mode 100644 core/tests/processor.rs create mode 100644 core/tests/service.rs create mode 100644 core/tests/setup.rs create mode 100644 core/tests/waker.rs create mode 100644 develop/CHEATSHEET.adoc create mode 100644 develop/docker-compose.yml create mode 100644 examples/99_just_testing.adoc diff --git a/Cargo.lock b/Cargo.lock index 2802def..9c3542e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -858,6 +858,7 @@ dependencies = [ "drogue-doppelgaenger-common", "env_logger", "futures", + "humantime", "humantime-serde", "json-patch", "jsonschema", @@ -867,6 +868,7 @@ dependencies = [ "opentelemetry", "opentelemetry-jaeger", "postgres-native-tls", + "postgres-types", "prometheus", "rdkafka", "rumqttc", diff --git a/backend/src/endpoints.rs b/backend/src/endpoints.rs index 926c380..dba6d69 100644 --- a/backend/src/endpoints.rs +++ b/backend/src/endpoints.rs @@ -2,13 +2,14 @@ use crate::notifier::actix::WebSocketHandler; use crate::Instance; use actix_web::{web, HttpRequest, HttpResponse}; use actix_web_actors::ws; -use drogue_doppelgaenger_core::processor::source::Sink; -use drogue_doppelgaenger_core::service::JsonMergeUpdater; +use drogue_doppelgaenger_core::processor::sink::Sink; use drogue_doppelgaenger_core::{ listener::KafkaSource, model::{Reconciliation, Thing}, notifier::Notifier, - service::{Id, JsonPatchUpdater, Patch, ReportedStateUpdater, Service, UpdateMode}, + service::{ + Id, JsonMergeUpdater, JsonPatchUpdater, Patch, ReportedStateUpdater, Service, UpdateMode, + }, storage::Storage, }; use serde_json::{json, Value}; @@ -18,7 +19,7 @@ pub async fn things_get( service: web::Data>, path: web::Path, ) -> 
Result { - let result = service.get(path.into_inner()).await?; + let result = service.get(&path.into_inner()).await?; Ok(HttpResponse::Ok().json(result)) } @@ -106,7 +107,7 @@ pub async fn things_delete( service: web::Data>, path: web::Path, ) -> Result { - service.delete(path.into_inner()).await?; + service.delete(&path.into_inner()).await?; Ok(HttpResponse::NoContent().json(json!({}))) } diff --git a/backend/src/lib.rs b/backend/src/lib.rs index 44a28cd..f80e08b 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -3,26 +3,24 @@ mod notifier; use actix_web::{guard, web, App, HttpServer}; use anyhow::anyhow; -use drogue_doppelgaenger_core::listener::KafkaSource; -use drogue_doppelgaenger_core::notifier::{kafka, Notifier}; -use drogue_doppelgaenger_core::processor::source::{EventStream, Sink}; -use drogue_doppelgaenger_core::service::Service; -use drogue_doppelgaenger_core::storage::postgres; -use drogue_doppelgaenger_core::{app::run_main, processor, service, storage::Storage}; -use futures::future::LocalBoxFuture; -use futures::{FutureExt, TryFutureExt}; +use drogue_doppelgaenger_core::{ + app::run_main, + listener::KafkaSource, + notifier::{kafka, Notifier}, + processor::sink::{self, Sink}, + service::{self, Service}, + storage::{postgres, Storage}, +}; +use futures::{future::LocalBoxFuture, FutureExt, TryFutureExt}; #[derive(Clone, Debug, serde::Deserialize)] -pub struct Config { +pub struct Config { pub application: Option, // serde(bound) required as S isn't serializable: https://github.com/serde-rs/serde/issues/1296 #[serde(bound = "")] - pub service: service::Config, + pub service: service::Config, pub listener: kafka::Config, - - // FIXME: fix up sink configuration - pub sink: processor::source::kafka::Config, } #[derive(Clone, Debug)] @@ -31,14 +29,12 @@ pub struct Instance { } pub fn configure( - config: Config, + config: Config, ) -> anyhow::Result<( impl Fn(&mut web::ServiceConfig) + Send + Sync + Clone, LocalBoxFuture<'static, anyhow::Result<()>>, )> { - let (_, sink) = processor::source::kafka::EventStream::new(config.sink)?; - - let service = Service::new(config.service, sink)?; + let service = Service::from_config(config.service)?; let service = web::Data::new(service); let (source, runner) = KafkaSource::new(config.listener)?; @@ -77,11 +73,11 @@ pub fn configure( ), ); ctx.service( - web::resource("/api/v1alpha1/things/{application}/things/{thing}/reportedState") + web::resource("/api/v1alpha1/things/{application}/things/{thing}/reportedStates") .route(web::put().to(endpoints::things_update_reported_state::)), ); ctx.service( - web::resource("/api/v1alpha1/things/{application}/things/{thing}/reconciliation") + web::resource("/api/v1alpha1/things/{application}/things/{thing}/reconciliations") .route(web::put().to(endpoints::things_update_reconciliation::)), ); ctx.service( @@ -97,8 +93,10 @@ pub fn configure( )) } -pub async fn run(config: Config) -> anyhow::Result<()> { - let (configurator, runner) = configure::<_, _, processor::source::kafka::Sink>(config)?; +pub async fn run( + config: Config, +) -> anyhow::Result<()> { + let (configurator, runner) = configure::<_, _, _>(config)?; let http = HttpServer::new(move || App::new().configure(|ctx| configurator(ctx))) .bind("[::]:8080")? 
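+        // note: binding to "[::]:8080" listens on the IPv6 wildcard address;
+        // on typical dual-stack hosts this accepts IPv4 connections as well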
diff --git a/backend/src/notifier/actix.rs b/backend/src/notifier/actix.rs index 4c2fcaf..f405b3d 100644 --- a/backend/src/notifier/actix.rs +++ b/backend/src/notifier/actix.rs @@ -2,10 +2,10 @@ use super::{CLIENT_TIMEOUT, HEARTBEAT_INTERVAL}; use crate::notifier::{Request, Response}; use actix::{Actor, ActorContext, AsyncContext, Handler, SpawnHandle, StreamHandler, WrapFuture}; use actix_web_actors::ws::{self, CloseCode, CloseReason}; -use drogue_doppelgaenger_core::processor::source::Sink; use drogue_doppelgaenger_core::{ listener::{KafkaSource, Message}, notifier::Notifier, + processor::sink::Sink, service::{Id, Service}, storage::Storage, }; @@ -174,37 +174,53 @@ impl Handler for WebSocke let service = self.service.clone(); + // subscribe first let mut source = self.source.subscribe(id.clone()); + let addr = ctx.address(); let i = id.clone(); let task = ctx.spawn( async move { - // read the initial state - // FIXME: filter out "not found" - if let Ok(thing) = service.get(id).await { - let initial_generation = thing.metadata.generation; - // send initial - addr.do_send(message::Event(Response::Change { - thing: Arc::new(thing), - })); - - while let Some(msg) = source.next().await { - match msg { - Ok(Message::Change(thing)) => { - if thing.metadata.generation > initial_generation { - // prevent initial duplicates - addr.do_send(message::Event(Response::Change { thing })) - } else { - log::info!("Suppressing duplicate generation change"); - } - } - Err(BroadcastStreamRecvError::Lagged(lag)) => { - addr.do_send(message::Event(Response::Lag { lag })) + // now read the initial state + let initial_generation = match service.get(&id).await { + Ok(Some(thing)) => { + let initial_generation = thing.metadata.generation; + // send initial + addr.do_send(message::Event(Response::Initial { + thing: Arc::new(thing), + })); + initial_generation + } + Ok(None) => Some(0), + Err(err) => { + // stream closed + addr.do_send(message::Close(Some(CloseReason { + code: CloseCode::Abnormal, + description: Some("Failed to read initial state".to_string()), + }))); + + log::warn!("Failed to read initial state: {err}"); + return; + } + }; + + // and run the loop + while let Some(msg) = source.next().await { + match msg { + Ok(Message::Change(thing)) => { + if thing.metadata.generation > initial_generation { + // prevent initial duplicates + addr.do_send(message::Event(Response::Change { thing })) + } else { + log::info!("Suppressing duplicate generation change"); } } + Err(BroadcastStreamRecvError::Lagged(lag)) => { + addr.do_send(message::Event(Response::Lag { lag })) + } } - log::warn!("Listener loop exited"); } + log::warn!("Listener loop exited"); // stream closed addr.do_send(message::Close(Some(CloseReason { diff --git a/backend/src/notifier/mod.rs b/backend/src/notifier/mod.rs index 1ba8c7a..b0ae27e 100644 --- a/backend/src/notifier/mod.rs +++ b/backend/src/notifier/mod.rs @@ -19,6 +19,7 @@ pub enum Request { #[serde(rename_all = "camelCase")] #[serde(tag = "type")] pub enum Response { + Initial { thing: Arc }, Change { thing: Arc }, Lag { lag: u64 }, } diff --git a/core/Cargo.toml b/core/Cargo.toml index efac9b4..d8aba4f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -19,11 +19,13 @@ config = "0.13" dotenv = "0.15" env_logger = "0.9" futures = "0.3" +humantime = "2" humantime-serde = "1" json-patch = { version = "0.2", default-features = false } jsonschema = "0.16" log = "0.4" opentelemetry = { version = "0.17", features = ["rt-tokio"] } +postgres-types = "0.2" rdkafka = { version = "0.28", 
features = ["sasl", "ssl"] } rustls = "0.20" rustls-native-certs = "0.6" diff --git a/core/examples/gen_schema.rs b/core/examples/gen_schema.rs index 994a0d9..e26b45b 100644 --- a/core/examples/gen_schema.rs +++ b/core/examples/gen_schema.rs @@ -1,8 +1,7 @@ use schemars::gen::SchemaSettings; -use schemars::schema_for; fn main() { let schema = schemars::gen::SchemaGenerator::from(SchemaSettings::openapi3()) - .into_root_schema_for::(); + .into_root_schema_for::(); println!("{}", serde_yaml::to_string(&schema).unwrap()); } diff --git a/core/src/app.rs b/core/src/app.rs index c69f321..b3aeda6 100644 --- a/core/src/app.rs +++ b/core/src/app.rs @@ -2,7 +2,6 @@ use futures::future::LocalBoxFuture; use futures::stream::FuturesUnordered; -use std::time::Duration; #[cfg(feature = "jaeger")] use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -112,6 +111,7 @@ where { use futures::FutureExt; use prometheus::{Encoder, TextEncoder}; + use std::time::Duration; futures.push( async move { diff --git a/core/src/lib.rs b/core/src/lib.rs index 5cdde55..9878e1d 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -9,3 +9,4 @@ pub mod processor; pub mod service; pub mod storage; pub mod version; +pub mod waker; diff --git a/core/src/listener/mod.rs b/core/src/listener/mod.rs index f53e02d..190dd88 100644 --- a/core/src/listener/mod.rs +++ b/core/src/listener/mod.rs @@ -75,7 +75,10 @@ impl KafkaSource { let topic = config.topic; - let config: rdkafka::ClientConfig = KafkaProperties(config.properties).into(); + let mut config: rdkafka::ClientConfig = KafkaProperties(config.properties).into(); + + config.set("enable.partition.eof", "false"); + let consumer: StreamConsumer = config.create().context("Creating consumer")?; consumer.subscribe(&[&topic]).context("Start subscribe")?; @@ -89,9 +92,6 @@ impl KafkaSource { consumer, inner: inner.clone(), }; - //let consumer = Self::run(consumer, inner.clone()); - - //let task = Handle::current().spawn(consumer); Ok((Self { inner }, runner)) } diff --git a/core/src/machine/deno.rs b/core/src/machine/deno.rs index 37c1763..2a396f4 100644 --- a/core/src/machine/deno.rs +++ b/core/src/machine/deno.rs @@ -1,6 +1,7 @@ use super::Outgoing; use crate::model::Thing; use anyhow::bail; +use chrono::Duration; use deno_core::{serde_v8, v8, Extension, JsRuntime, RuntimeOptions}; use serde_json::Value; use std::sync::Arc; @@ -16,6 +17,7 @@ const KEY_CURRENT_STATE: &str = "currentState"; const KEY_NEW_STATE: &str = "newState"; const KEY_OUTBOX: &str = "outbox"; const KEY_LOGS: &str = "logs"; +const KEY_WAKER: &str = "waker"; /// Run a deno script /// @@ -144,9 +146,46 @@ fn extract_context(runtime: &mut JsRuntime) -> anyhow::Result { } }; + let waker = { + let key = serde_v8::to_v8(&mut scope, KEY_WAKER)?; + match global.get(scope, key) { + Some(value) => to_duration(serde_v8::from_v8(scope, value)?)?, + None => None, + } + }; + Ok(Outgoing { new_thing, outbox, log, + waker, + }) +} + +/// convert a JavaScript value into a duration +fn to_duration(value: Value) -> anyhow::Result> { + Ok(match value { + Value::String(time) => { + let duration = humantime::parse_duration(&time)?; + Some(Duration::from_std(duration)?) 
+ } + Value::Number(seconds) => { + if let Some(seconds) = seconds.as_i64() { + if seconds > 0 { + return Ok(Some(Duration::seconds(seconds))); + } + } else if let Some(_) = seconds.as_u64() { + // we can be sure it doesn't fit into an i64 + return Ok(Some(Duration::seconds(i64::MAX))); + } else if let Some(seconds) = seconds.as_f64() { + if seconds > i64::MAX as f64 { + return Ok(Some(Duration::seconds(i64::MAX))); + } else if seconds > 0f64 { + return Ok(Some(Duration::seconds(seconds as i64))); + } + } + None + } + _ => None, }) } diff --git a/core/src/machine/mod.rs b/core/src/machine/mod.rs index 1c3873a..834a350 100644 --- a/core/src/machine/mod.rs +++ b/core/src/machine/mod.rs @@ -1,9 +1,10 @@ mod deno; use crate::machine::deno::DenoOptions; -use crate::model::{Code, JsonSchema, Metadata, Schema, Thing, ThingState}; +use crate::model::{Code, JsonSchema, Metadata, Schema, Thing, ThingState, WakerExt, WakerReason}; use crate::processor::Message; use anyhow::anyhow; +use chrono::Duration; use deno_core::url::Url; use jsonschema::{Draft, JSONSchema, SchemaResolver, SchemaResolverError}; use serde_json::Value; @@ -144,11 +145,12 @@ pub struct OutboxMessage { pub message: Message, } -#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug)] pub struct Outgoing { pub new_thing: Thing, pub outbox: Vec, pub log: Vec, + pub waker: Option, } async fn reconcile(current_thing: Arc, mut new_thing: Thing) -> Result { @@ -177,10 +179,16 @@ async fn reconcile(current_thing: Arc, mut new_thing: Thing) -> Result Self { + Self::new(&id.application, &id.thing) + } } /// The state view on thing model. @@ -178,6 +183,15 @@ pub struct Changed { pub last_log: Vec, } +impl From for Changed { + fn from(code: Code) -> Self { + Self { + code, + last_log: Default::default(), + } + } +} + #[derive( Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize, schemars::JsonSchema, )] @@ -245,11 +259,90 @@ pub enum Script { #[serde(rename_all = "camelCase")] pub struct Internal { #[serde(default, skip_serializing_if = "Option::is_none")] - pub wakeup: Option>, + pub waker: Option, #[serde(default, skip_serializing_if = "Vec::is_empty")] pub outbox: Vec, } +pub trait WakerExt { + fn wakeup(&mut self, delay: Duration, reason: WakerReason); + fn clear_wakeup(&mut self, reason: WakerReason); +} + +impl WakerExt for Thing { + fn wakeup(&mut self, delay: Duration, reason: WakerReason) { + match &mut self.internal { + Some(internal) => { + internal.wakeup(delay, reason); + } + None => { + let mut internal = Internal::default(); + internal.wakeup(delay, reason); + self.internal = Some(internal); + } + } + } + + fn clear_wakeup(&mut self, reason: WakerReason) { + if let Some(internal) = &mut self.internal { + internal.clear_wakeup(reason); + } + } +} + +impl WakerExt for Internal { + fn wakeup(&mut self, delay: Duration, reason: WakerReason) { + let when = Utc::now() + delay; + match &mut self.waker { + None => { + // no waker, just create it + self.waker = Some(Waker { + when, + why: { + let mut set = BTreeSet::new(); + set.insert(reason); + set + }, + }) + } + Some(waker) => { + // we already have a waker + if waker.when > when { + // wakeup earlier + waker.when = when; + } + // add our reason (if missing) + waker.why.extend(Some(reason)); + } + } + } + + fn clear_wakeup(&mut self, reason: WakerReason) { + if let Some(waker) = &mut self.waker { + waker.why.remove(&reason); + if waker.why.is_empty() { + self.waker = None; + } + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, 
serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Waker { + pub when: DateTime, + pub why: BTreeSet, +} + +#[derive( + Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, +)] +#[serde(rename_all = "camelCase")] +pub enum WakerReason { + Reconcile, + Outbox, +} + #[cfg(test)] mod test { use super::*; diff --git a/core/src/notifier/kafka.rs b/core/src/notifier/kafka.rs index 0750a63..80d22c3 100644 --- a/core/src/notifier/kafka.rs +++ b/core/src/notifier/kafka.rs @@ -56,7 +56,7 @@ impl super::Notifier for Notifier { type Config = Config; type Error = Error; - fn new(config: &Self::Config) -> anyhow::Result { + fn from_config(config: &Self::Config) -> anyhow::Result { let topic = config.topic.clone(); let timeout = Timeout::After(config.timeout); let config: rdkafka::ClientConfig = KafkaProperties(config.properties.clone()).into(); diff --git a/core/src/notifier/mod.rs b/core/src/notifier/mod.rs index d28eed1..35fb3b9 100644 --- a/core/src/notifier/mod.rs +++ b/core/src/notifier/mod.rs @@ -16,7 +16,7 @@ pub trait Notifier: Sized + Send + Sync + 'static { type Config: Clone + Debug + Send + Sync + serde::de::DeserializeOwned + 'static; type Error: std::error::Error + Debug; - fn new(config: &Self::Config) -> anyhow::Result; + fn from_config(config: &Self::Config) -> anyhow::Result; async fn notify(&self, thing: &Thing) -> Result<(), Error>; } diff --git a/core/src/processor/mod.rs b/core/src/processor/mod.rs index b86e88e..42dd7e5 100644 --- a/core/src/processor/mod.rs +++ b/core/src/processor/mod.rs @@ -1,13 +1,14 @@ +pub mod sink; pub mod source; -use crate::model::Thing; -use crate::notifier::Notifier; -use crate::processor::source::{Sink, Source}; -use crate::service::JsonMergeUpdater; +use crate::model::WakerReason; use crate::{ + model::Thing, + notifier::Notifier, + processor::{sink::Sink, source::Source}, service::{ - self, Id, JsonPatchUpdater, MergeError, PatchError, ReportedStateUpdater, Service, - UpdateMode, Updater, + self, Id, JsonMergeUpdater, JsonPatchUpdater, MergeError, PatchError, ReportedStateUpdater, + Service, UpdateMode, Updater, }, storage::{self, Storage}, }; @@ -20,8 +21,8 @@ use prometheus::{ IntCounterVec, }; use serde_json::Value; -use std::collections::BTreeMap; -use std::convert::Infallible; +use std::{collections::BTreeMap, convert::Infallible}; +use uuid::Uuid; lazy_static! 
{ static ref EVENTS: IntCounter = @@ -41,6 +42,22 @@ pub struct Event { pub message: Message, } +impl Event { + pub fn new, D: Into, M: Into>( + application: A, + device: D, + message: M, + ) -> Self { + Self { + id: Uuid::new_v4().to_string(), + timestamp: Utc::now(), + application: application.into(), + device: device.into(), + message: message.into(), + } + } +} + #[derive(Clone, Debug, PartialEq, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "camelCase")] pub enum Message { @@ -51,27 +68,80 @@ pub enum Message { }, Patch(Patch), Merge(Value), + Wakeup { + reasons: Vec, + }, } -pub struct Processor +impl Message { + pub fn report_state() -> ReportStateBuilder { + Default::default() + } +} + +#[derive(Clone, Debug, Default)] +pub struct ReportStateBuilder { + state: BTreeMap, + partial: bool, +} + +impl ReportStateBuilder { + pub fn partial(mut self) -> Self { + self.partial = true; + self + } + + pub fn full(mut self) -> Self { + self.partial = false; + self + } + + pub fn state, V: Into>(mut self, property: P, value: V) -> Self { + self.state.insert(property.into(), value.into()); + self + } + + pub fn build(self) -> Message { + Message::ReportState { + state: self.state, + partial: self.partial, + } + } +} + +#[derive(Clone, Debug, serde::Deserialize)] +pub struct Config { + #[serde(bound = "")] + pub service: service::Config, + pub source: So::Config, +} + +pub struct Processor where - S: Storage, - N: Notifier, - I: Source, + St: Storage, + No: Notifier, Si: Sink, + So: Source, { - service: Service, - source: I, + service: Service, + source: So, } -impl Processor +impl Processor where - S: Storage, - N: Notifier, - I: Source, + St: Storage, + No: Notifier, Si: Sink, + So: Source, { - pub fn new(service: Service, source: I) -> Self { + pub fn from_config(config: Config) -> anyhow::Result { + let service = Service::from_config(config.service)?; + let source = So::from_config(config.source)?; + + Ok(Self::new(service, source)) + } + + pub fn new(service: Service, source: So) -> Self { Self { service, source } } @@ -175,6 +245,11 @@ impl Updater for Message { .update(thing)?), Message::Patch(patch) => Ok(JsonPatchUpdater(patch).update(thing)?), Message::Merge(merge) => Ok(JsonMergeUpdater(merge).update(thing)?), + Message::Wakeup { .. 
} => { + // FIXME: need to implement wakeup + log::warn!("Wakeup"); + Ok(thing) + } } } } diff --git a/core/src/processor/sink/kafka.rs b/core/src/processor/sink/kafka.rs new file mode 100644 index 0000000..d5aa4bb --- /dev/null +++ b/core/src/processor/sink/kafka.rs @@ -0,0 +1,83 @@ +use crate::config::kafka::KafkaProperties; +use crate::processor::Event; +use anyhow::anyhow; +use async_trait::async_trait; +use rdkafka::config::FromClientConfig; +use rdkafka::message::OwnedHeaders; +use rdkafka::producer::{FutureProducer, FutureRecord}; +use std::collections::HashMap; +use std::time::Duration; + +#[derive(Clone, Debug, serde::Deserialize)] +pub struct Config { + #[serde(default)] + pub properties: HashMap, + + pub topic: String, + #[serde(with = "humantime_serde", default = "default::timeout")] + pub timeout: Duration, +} + +mod default { + use std::time::Duration; + + pub const fn timeout() -> Duration { + Duration::from_secs(2) + } +} + +#[derive(Clone)] +pub struct Sink { + producer: FutureProducer, + topic: String, + timeout: Duration, +} + +#[async_trait] +impl super::Sink for Sink { + type Config = Config; + + fn from_config( + Self::Config { + properties, + topic, + timeout, + }: Self::Config, + ) -> anyhow::Result { + let config: rdkafka::ClientConfig = KafkaProperties(properties).into(); + let producer = FutureProducer::from_config(&config)?; + + Ok(Self { + producer, + topic, + timeout, + }) + } + + async fn publish(&self, event: Event) -> anyhow::Result<()> { + let key = format!("{}/{}", event.application, event.device); + + let payload = serde_json::to_vec(&event.message)?; + + let headers = OwnedHeaders::new() + .add("ce_specversion", "1.0") + .add("ce_id", &event.id) + .add("ce_source", "drogue-doppelgaenger") + .add("ce_type", "io.drogue.doppelgaenger.event.v1") + .add("ce_timestamp", &event.timestamp.to_rfc3339()) + .add("content-type", "application/json") + .add("application", &event.application) + .add("device", &event.device); + + let record = FutureRecord::to(&self.topic) + .key(&key) + .payload(&payload) + .headers(headers); + + if let Err((err, _)) = self.producer.send(record, self.timeout).await { + Err(anyhow!(err)) + } else { + Ok(()) + } + } +} diff --git a/core/src/processor/sink/mod.rs b/core/src/processor/sink/mod.rs new file mode 100644 index 0000000..43aa9d8 --- /dev/null +++ b/core/src/processor/sink/mod.rs @@ -0,0 +1,31 @@ +pub mod kafka; + +use crate::processor::Event; +use async_trait::async_trait; +use serde::de::DeserializeOwned; +use std::fmt::Debug; + +#[async_trait] +pub trait Sink: Sized + Send + Sync + Clone + 'static { + type Config: Clone + Debug + DeserializeOwned; + + fn from_config(config: Self::Config) -> anyhow::Result; + + async fn publish(&self, event: Event) -> anyhow::Result<()>; + + async fn publish_iter(&self, i: I) -> Result<(), (usize, anyhow::Error)> + where + I: IntoIterator + Send + Sync, + ::IntoIter: Send + Sync, + { + let mut n = 0; + for event in i.into_iter() { + if let Err(err) = self.publish(event).await { + return Err((n, err.into())); + } + n += 1; + } + + Ok(()) + } +} diff --git a/core/src/processor/source/kafka.rs b/core/src/processor/source/kafka.rs index 36b74e0..252bc50 100644 --- a/core/src/processor/source/kafka.rs +++ b/core/src/processor/source/kafka.rs @@ -2,36 +2,21 @@ use crate::config::kafka::KafkaProperties; use crate::processor::Event; use anyhow::{anyhow, bail}; use async_trait::async_trait; -use rdkafka::config::FromClientConfig; -use rdkafka::consumer::{Consumer, StreamConsumer}; -use
rdkafka::message::{BorrowedMessage, Headers, OwnedHeaders}; -use rdkafka::producer::{FutureProducer, FutureRecord}; -use rdkafka::Message; +use rdkafka::{ + config::FromClientConfig, + consumer::{Consumer, StreamConsumer}, + message::{BorrowedMessage, Headers}, + Message, +}; use std::collections::HashMap; use std::future::Future; -use std::time::Duration; #[derive(Clone, Debug, serde::Deserialize)] pub struct Config { #[serde(default)] pub properties: HashMap, - #[serde(default)] - pub source_properties: HashMap, - #[serde(default)] - pub sink_properties: HashMap, - pub topic: String, - #[serde(with = "humantime_serde", default = "default::timeout")] - pub timeout: Duration, -} - -mod default { - use std::time::Duration; - - pub const fn timeout() -> Duration { - Duration::from_secs(2) - } } pub struct EventStream {} @@ -40,12 +25,14 @@ pub struct Source { consumer: StreamConsumer, } -impl Source { - pub fn new(config: Config) -> anyhow::Result { +#[async_trait] +impl super::Source for Source { + type Config = Config; + + fn from_config(config: Self::Config) -> anyhow::Result { let topic = config.topic; - let mut config: rdkafka::ClientConfig = - KafkaProperties::new([config.properties, config.source_properties]).into(); + let mut config: rdkafka::ClientConfig = KafkaProperties(config.properties).into(); config.set("enable.partition.eof", "false"); @@ -64,39 +51,10 @@ impl Source { Ok(Self { consumer }) } -} - -#[derive(Clone)] -pub struct Sink { - producer: FutureProducer, - topic: String, - timeout: Duration, -} - -impl Sink { - pub fn new(config: Config) -> anyhow::Result { - let topic = config.topic.clone(); - let timeout = config.timeout; - let config: rdkafka::ClientConfig = - KafkaProperties::new([config.properties, config.sink_properties]).into(); - - log::info!("Event stream - sink: {config:?}"); - - let producer = FutureProducer::from_config(&config)?; - - Ok(Self { - producer, - topic, - timeout, - }) - } -} -#[async_trait] -impl super::Source for Source { - async fn run(self, mut f: F) -> anyhow::Result<()> + async fn run(self, f: F) -> anyhow::Result<()> where - F: FnMut(Event) -> Fut + Send + Sync, + F: Fn(Event) -> Fut + Send + Sync, Fut: Future> + Send, { log::info!("Running event source loop..."); @@ -155,10 +113,10 @@ fn extract_meta(msg: &BorrowedMessage) -> anyhow::Result<(String, String, String for i in 0..headers.count() { match headers.get_as::(i) { - Some(("id", Ok(value))) => { + Some(("ce_id", Ok(value))) => { id = Some(value); } - Some(("timestamp", Ok(value))) => { + Some(("ce_timestamp", Ok(value))) => { timestamp = Some(value); } Some(("application", Ok(value))) => { @@ -172,10 +130,10 @@ fn extract_meta(msg: &BorrowedMessage) -> anyhow::Result<(String, String, String } Ok(( - id.ok_or_else(|| anyhow!("Missing 'id' header"))? + id.ok_or_else(|| anyhow!("Missing 'ce_id' header"))? .to_string(), timestamp - .ok_or_else(|| anyhow!("Missing 'timestamp' header"))? + .ok_or_else(|| anyhow!("Missing 'ce_timestamp' header"))? .to_string(), application .ok_or_else(|| anyhow!("Missing 'application' header"))? 
@@ -200,39 +158,3 @@ fn from_msg(msg: &BorrowedMessage) -> anyhow::Result { message, }) } - -#[async_trait] -impl super::Sink for Sink { - async fn publish(&self, event: Event) -> anyhow::Result<()> { - let key = format!("{}/{}", event.application, event.device); - - let payload = serde_json::to_vec(&event.message)?; - - let headers = OwnedHeaders::new() - .add("id", &event.id) - .add("timestamp", &event.timestamp.to_rfc3339()) - .add("application", &event.application) - .add("device", &event.device); - - let record = FutureRecord::to(&self.topic) - .key(&key) - .payload(&payload) - .headers(headers); - - if let Err((err, _)) = self.producer.send(record, self.timeout).await { - Err(anyhow!(err)) - } else { - Ok(()) - } - } -} - -impl super::EventStream for EventStream { - type Config = Config; - type Source = Source; - type Sink = Sink; - - fn new(config: Self::Config) -> anyhow::Result<(Self::Source, Self::Sink)> { - Ok((Source::new(config.clone())?, Sink::new(config)?)) - } -} diff --git a/core/src/processor/source/mod.rs b/core/src/processor/source/mod.rs index 4826d51..99558df 100644 --- a/core/src/processor/source/mod.rs +++ b/core/src/processor/source/mod.rs @@ -2,42 +2,18 @@ pub mod kafka; use crate::processor::Event; use async_trait::async_trait; +use serde::de::DeserializeOwned; +use std::fmt::Debug; use std::future::Future; #[async_trait] -pub trait EventStream { - type Config; - type Source: Source; - type Sink: Sink; +pub trait Source: Sized + Send + Sync { + type Config: Clone + Debug + DeserializeOwned; - fn new(config: Self::Config) -> anyhow::Result<(Self::Source, Self::Sink)>; -} + fn from_config(config: Self::Config) -> anyhow::Result; -#[async_trait] -pub trait Source: Sized + Send + Sync { async fn run(self, f: F) -> anyhow::Result<()> where - F: FnMut(Event) -> Fut + Send + Sync, + F: Fn(Event) -> Fut + Send + Sync, Fut: Future> + Send; } - -#[async_trait] -pub trait Sink: Sized + Send + Sync + Clone + 'static { - async fn publish(&self, event: Event) -> anyhow::Result<()>; - - async fn publish_iter(&self, i: I) -> Result<(), (usize, anyhow::Error)> - where - I: IntoIterator + Send + Sync, - ::IntoIter: Send + Sync, - { - let mut n = 0; - for event in i.into_iter() { - if let Err(err) = self.publish(event).await { - return Err((n, err.into())); - } - n += 1; - } - - Ok(()) - } -} diff --git a/core/src/service/id.rs b/core/src/service/id.rs index b6fe870..090e74f 100644 --- a/core/src/service/id.rs +++ b/core/src/service/id.rs @@ -8,8 +8,23 @@ pub struct Id { pub thing: String, } +impl Id { + pub fn new, T: Into>(application: A, thing: T) -> Self { + Self { + application: application.into(), + thing: thing.into(), + } + } +} + impl Display for Id { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{}/{}", self.application, self.thing) } } + +impl, T: Into> From<(A, T)> for Id { + fn from((application, thing): (A, T)) -> Self { + Id::new(application, thing) + } +} diff --git a/core/src/service/mod.rs b/core/src/service/mod.rs index 340f186..fb2701c 100644 --- a/core/src/service/mod.rs +++ b/core/src/service/mod.rs @@ -2,7 +2,7 @@ mod error; mod id; mod updater; -use chrono::Utc; +use chrono::{Duration, Utc}; pub use error::*; pub use id::Id; use std::convert::Infallible; @@ -10,9 +10,9 @@ pub use updater::*; use uuid::Uuid; use crate::machine::{Machine, OutboxMessage, Outcome}; -use crate::model::Thing; +use crate::model::{Thing, WakerExt, WakerReason}; use crate::notifier::Notifier; -use crate::processor::source::Sink; +use 
crate::processor::sink::Sink; use crate::processor::Event; use crate::storage::{self, Storage}; use lazy_static::lazy_static; @@ -24,23 +24,25 @@ lazy_static! { } #[derive(Debug, serde::Deserialize)] -pub struct Config { - pub storage: S::Config, - pub notifier: N::Config, +pub struct Config { + pub storage: St::Config, + pub notifier: No::Config, + pub sink: Si::Config, } -impl Clone for Config { +impl Clone for Config { fn clone(&self) -> Self { Self { storage: self.storage.clone(), notifier: self.notifier.clone(), + sink: self.sink.clone(), } } } -pub struct Service { - storage: S, - notifier: N, +pub struct Service { + storage: St, + notifier: No, sink: Si, } @@ -65,20 +67,28 @@ where } } -impl Service { - // FIXME: I don't like having the sink provided here directly. Need to split this up! - pub fn new(config: Config, sink: Si) -> anyhow::Result { - let Config { storage, notifier } = config; - let storage = S::new(&storage)?; - let notifier = N::new(¬ifier)?; - Ok(Self { +impl Service { + pub fn from_config(config: Config) -> anyhow::Result { + let Config { storage, notifier, sink, - }) + } = config; + let storage = St::from_config(&storage)?; + let notifier = No::from_config(¬ifier)?; + let sink = Si::from_config(sink)?; + Ok(Self::new(storage, notifier, sink)) } - pub async fn create(&self, thing: Thing) -> Result<(), Error> { + pub fn new(storage: St, notifier: No, sink: Si) -> Self { + Self { + storage, + notifier, + sink, + } + } + + pub async fn create(&self, thing: Thing) -> Result> { let Outcome { mut new_thing, outbox, @@ -95,32 +105,39 @@ impl Service { // we can send the events right away, as we created the entry - self.send_and_ack(new_thing, outbox).await?; + let new_thing = self.send_and_ack(new_thing, outbox).await?; + + // notify + + self.notifier + .notify(&new_thing) + .await + .map_err(Error::Notifier)?; // FIXME: handle error - Ok(()) + Ok(new_thing) } - pub async fn get(&self, id: Id) -> Result> { + pub async fn get(&self, id: &Id) -> Result, Error> { self.storage .get(&id.application, &id.thing) .await .map_err(Error::Storage) } - pub async fn delete(&self, id: Id) -> Result<(), Error> { + pub async fn delete(&self, id: &Id) -> Result> { self.storage .delete(&id.application, &id.thing) .await .or_else(|err| match err { // if we didn't find what we want to delete, this is just fine - storage::Error::NotFound => Ok(()), + storage::Error::NotFound => Ok(false), err => Err(Error::Storage(err)), }) } - pub async fn update(&self, id: &Id, updater: U) -> Result<(), Error> + pub async fn update(&self, id: &Id, updater: U) -> Result> where U: Updater, { @@ -128,6 +145,7 @@ impl Service { .storage .get(&id.application, &id.thing) .await + .and_then(|r| r.ok_or(storage::Error::NotFound)) .map_err(Error::Storage)?; let Outcome { @@ -144,7 +162,7 @@ impl Service { if current_thing == new_thing { log::debug!("Thing state not changed. Return early!"); // no change, nothing to do - return Ok(()); + return Ok(current_thing); } OUTBOX_EVENTS.inc_by(outbox.len() as u64); @@ -166,7 +184,7 @@ impl Service { log::debug!("Current outbox size: {}", current_outbox); if current_outbox <= 0 { - // only sending when we had no previous events + // only send when we had no previous events new_thing = self.send_and_ack(new_thing, outbox).await?; } @@ -181,7 +199,7 @@ impl Service { // done - Ok(()) + Ok(new_thing) } /// Add new, scheduled, messages to the outbox, and return the entries to send out. 
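    ///
    /// Entries already queued in the outbox are kept as they are; only the
    /// newly added entries are returned, so the caller sends each event once.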
@@ -215,6 +233,12 @@ impl Service { internal.outbox.extend(add.clone()); + // schedule waker + + if !internal.outbox.is_empty() { + internal.wakeup(Duration::seconds(30), WakerReason::Outbox); + } + // return the added events add @@ -224,7 +248,7 @@ impl Service { &self, mut new_thing: Thing, outbox: Vec, - ) -> Result> { + ) -> Result> { log::debug!("New outbox: {outbox:?}"); if outbox.is_empty() { @@ -239,6 +263,8 @@ impl Service { // ack outbox events if let Some(internal) = &mut new_thing.internal { internal.outbox.clear(); + // we can clear the waker, as we are sure that the outbox was clear initially + internal.clear_wakeup(WakerReason::Outbox); new_thing = self .storage .update(new_thing) diff --git a/core/src/storage/mod.rs b/core/src/storage/mod.rs index 00b86b7..f9a5902 100644 --- a/core/src/storage/mod.rs +++ b/core/src/storage/mod.rs @@ -1,12 +1,15 @@ pub mod postgres; -use crate::model::Thing; +use crate::model::{Metadata, Thing}; use async_trait::async_trait; use std::fmt::Debug; use std::future::Future; #[derive(Debug, thiserror::Error)] pub enum Error { + /// Returned when an option should modify a thing, but it could not be found. + /// + /// Not used, when not finding the things isn't a problem. #[error("Not found")] NotFound, #[error("Not allowed")] @@ -36,11 +39,13 @@ pub trait Storage: Sized + Send + Sync + 'static { type Config: Clone + Debug + Send + Sync + serde::de::DeserializeOwned + 'static; type Error: std::error::Error + Debug; - fn new(config: &Self::Config) -> anyhow::Result; + fn from_config(config: &Self::Config) -> anyhow::Result; - async fn get(&self, application: &str, name: &str) -> Result>; + async fn get(&self, application: &str, name: &str) + -> Result, Error>; async fn create(&self, thing: Thing) -> Result>; async fn update(&self, thing: Thing) -> Result>; + async fn patch( &self, application: &str, @@ -50,6 +55,47 @@ pub trait Storage: Sized + Send + Sync + 'static { where F: FnOnce(Thing) -> Fut + Send + Sync, Fut: Future> + Send + Sync, - E: Send + Sync; - async fn delete(&self, application: &str, name: &str) -> Result<(), Error>; + E: Send + Sync, + { + log::debug!("Updating existing thing: {application} / {name}"); + + let current_thing = self.get(application, name).await?.ok_or(Error::NotFound)?; + // capture current metadata + let Metadata { + name, + application, + uid, + creation_timestamp, + resource_version, + generation, + annotations: _, + labels: _, + } = current_thing.metadata.clone(); + let mut new_thing = f(current_thing.clone()) + .await + .map_err(UpdateError::Mutator)?; + + // override metadata which must not be changed by the caller + new_thing.metadata = Metadata { + name, + application, + uid, + creation_timestamp, + resource_version, + generation, + ..new_thing.metadata + }; + + if current_thing == new_thing { + // no change + return Ok(current_thing); + } + + // perform update + + self.update(new_thing).await.map_err(UpdateError::Service) + } + + /// Delete a thing. Return `true` if the thing was deleted, `false` if it didn't exist. 
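+    ///
+    /// A minimal usage sketch (the storage instance and names are hypothetical):
+    ///
+    /// ```ignore
+    /// if !storage.delete("default", "thing1").await? {
+    ///     log::debug!("thing was already gone");
+    /// }
+    /// ```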
+ async fn delete(&self, application: &str, name: &str) -> Result>; } diff --git a/core/src/storage/postgres/mod.rs b/core/src/storage/postgres/mod.rs index b0ecb4c..05eea2a 100644 --- a/core/src/storage/postgres/mod.rs +++ b/core/src/storage/postgres/mod.rs @@ -1,14 +1,15 @@ mod utils; -use crate::model::{Internal, Schema}; +use crate::model::{Internal, Schema, WakerReason}; use crate::{ model::{DesiredFeature, Metadata, Reconciliation, ReportedFeature, SyntheticFeature, Thing}, - storage::{self, UpdateError}, + storage::{self}, }; use async_trait::async_trait; use chrono::{DateTime, Utc}; use deadpool_postgres::{PoolError, Runtime}; -use std::{collections::BTreeMap, future::Future}; +use postgres_types::Type; +use std::collections::BTreeMap; use tokio_postgres::{ error::SqlState, types::{Json, ToSql}, @@ -32,6 +33,8 @@ pub struct ThingEntity { pub annotations: BTreeMap, pub data: Data, + + pub waker: Option>, } /// The persisted data field @@ -80,6 +83,8 @@ impl TryFrom for ThingEntity { labels: utils::row_to_map(&row, "LABELS")?, annotations: utils::row_to_map(&row, "ANNOTATIONS")?, data: row.try_get::<_, Json<_>>("DATA")?.0, + + waker: row.try_get("WAKER")?, }) } } @@ -114,7 +119,7 @@ impl super::Storage for Storage { type Config = Config; type Error = Error; - fn new(config: &Self::Config) -> anyhow::Result { + fn from_config(config: &Self::Config) -> anyhow::Result { let pool = config.postgres.create_pool( Some(Runtime::Tokio1), postgres_native_tls::MakeTlsConnector::new(native_tls::TlsConnector::new()?), @@ -123,13 +128,17 @@ impl super::Storage for Storage { Ok(Self { application, pool }) } - async fn get(&self, application: &str, name: &str) -> Result { - self.ensure_app(application, || storage::Error::NotFound)?; + async fn get(&self, application: &str, name: &str) -> Result> { + if let Err(storage::Error::NotFound) = + self.ensure_app(application, || storage::Error::NotFound) + { + return Ok(None); + } let con = self.pool.get().await.map_err(Error::Pool)?; - match con - .query_opt( + let stmt = con + .prepare_typed_cached( r#" SELECT UID, @@ -138,7 +147,8 @@ SELECT RESOURCE_VERSION, ANNOTATIONS, LABELS, - DATA + DATA, + WAKER FROM THINGS WHERE @@ -146,14 +156,22 @@ WHERE AND APPLICATION = $2 "#, - &[&name, &application], + &[ + Type::VARCHAR, // name + Type::VARCHAR, // application + ], ) .await + .map_err(Error::Postgres)?; + + match con + .query_opt(&stmt, &[&name, &application]) + .await .map_err(Error::Postgres)? 
{ Some(row) => { let entity: ThingEntity = row.try_into()?; - Ok(Thing { + Ok(Some(Thing { metadata: Metadata { name: name.to_string(), application: application.to_string(), @@ -171,7 +189,7 @@ WHERE synthetic_state: entity.data.synthetic_state, reconciliation: entity.data.reconciliation, internal: entity.data.internal, - }) + })) } - None => Err(storage::Error::NotFound), + None => Ok(None), } } @@ -192,6 +210,8 @@ WHERE thing.metadata.generation = Some(generation as u32); thing.metadata.resource_version = Some(resource_version.to_string()); + let (waker, waker_reasons) = waker_data(&thing); + log::debug!( "Creating new thing: {} / {}", thing.metadata.application, @@ -200,8 +220,9 @@ WHERE let data: Data = (&thing).into(); - con.execute( - r#" + let stmt = con + .prepare_typed_cached( + r#" INSERT INTO things ( NAME, APPLICATION, @@ -211,7 +232,9 @@ INSERT INTO things ( RESOURCE_VERSION, ANNOTATIONS, LABELS, - DATA + DATA, + WAKER, + WAKER_REASONS ) VALUES ( $1, $2, @@ -221,9 +244,30 @@ INSERT INTO things ( $6, $7, $8, - $9 + $9, + $10, + $11 ) "#, + &[ + Type::VARCHAR, // name + Type::VARCHAR, // application + Type::UUID, // uid + Type::TIMESTAMPTZ, // creation timestamp + Type::INT8, // generation + Type::UUID, // resource version + Type::JSON, // annotations + Type::JSONB, // labels + Type::JSON, // data + Type::TIMESTAMPTZ, // waker + Type::JSONB, // waker reasons + ], + ) + .await + .map_err(Error::Postgres)?; + + con.execute( + &stmt, &[ &thing.metadata.name, &thing.metadata.application, @@ -234,6 +278,8 @@ INSERT INTO things ( &Json(&thing.metadata.annotations), &Json(&thing.metadata.labels), &Json(data), + &waker, + &Json(&waker_reasons), ], ) .await @@ -253,6 +299,8 @@ INSERT INTO things ( let name = &thing.metadata.name; let application = &thing.metadata.application; + let (waker, waker_reasons) = waker_data(&thing); + log::debug!("Updating existing thing: {application} / {name}"); let mut stmt = r#" @@ -262,7 +310,9 @@ SET RESOURCE_VERSION = $3, ANNOTATIONS = $4, LABELS = $5, - DATA = $6 + DATA = $6, + WAKER = $7, + WAKER_REASONS = $8 WHERE NAME = $1 AND @@ -274,24 +324,38 @@ WHERE let data = Json(Data::from(&thing)); let annotations = Json(&thing.metadata.annotations); let labels = Json(&thing.metadata.labels); + let waker_reasons = Json(&waker_reasons); + let mut types: Vec = Vec::new(); let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new(); + types.push(Type::VARCHAR); params.push(name); + types.push(Type::VARCHAR); params.push(application); + types.push(Type::UUID); params.push(&resource_version); + types.push(Type::JSON); params.push(&annotations); + types.push(Type::JSONB); params.push(&labels); + types.push(Type::JSON); params.push(&data); + types.push(Type::TIMESTAMPTZ); + params.push(&waker); + types.push(Type::JSONB); + params.push(&waker_reasons); if let Some(resource_version) = &thing.metadata.resource_version { stmt.push_str(&format!( " AND RESOURCE_VERSION::text=${}", params.len() + 1 )); + types.push(Type::TEXT); params.push(resource_version); } if let Some(uid) = &thing.metadata.uid { stmt.push_str(&format!(" AND UID::text=${}", params.len() + 1)); + types.push(Type::TEXT); params.push(uid); } @@ -302,6 +366,11 @@ RETURNING "#, ); + let stmt = con + .prepare_typed_cached(&stmt, &types) + .await + .map_err(Error::Postgres)?; + match con.query_opt(&stmt, &params).await { Ok(None) => Err(storage::Error::PreconditionFailed), Ok(Some(row)) => { @@ -322,79 +391,42 @@ RETURNING } } - async fn patch( - &self, - application: &str, - name: &str, - f: F, - ) -> std::result::Result> - where - F:
FnOnce(Thing) -> Fut + Send + Sync, - Fut: Future> + Send + Sync, - E: Send + Sync, - { - self.ensure_app(application, || storage::Error::NotFound)?; - - log::debug!("Updating existing thing: {application} / {name}"); - - let current_thing = self.get(application, name).await?; - // capture current metadata - let Metadata { - name, - application, - uid, - creation_timestamp, - resource_version, - generation, - annotations: _, - labels: _, - } = current_thing.metadata.clone(); - let mut new_thing = f(current_thing.clone()) - .await - .map_err(UpdateError::Mutator)?; - - // override metadata which must not be changed by the caller - new_thing.metadata = Metadata { - name, - application, - uid, - creation_timestamp, - resource_version, - generation, - ..new_thing.metadata - }; - - if current_thing == new_thing { - // no change - return Ok(current_thing); + async fn delete(&self, application: &str, name: &str) -> Result { + if let Err(storage::Error::NotFound) = + self.ensure_app(application, || storage::Error::NotFound) + { + return Ok(false); } - // perform update - - self.update(new_thing).await.map_err(UpdateError::Service) - } - - async fn delete(&self, application: &str, name: &str) -> Result<()> { - self.ensure_app(application, || storage::Error::NotFound)?; - let con = self.pool.get().await.map_err(Error::Pool)?; log::debug!("Deleting thing: {application} / {name}"); - con.execute( - r#" + let stmt = con + .prepare_typed_cached( + r#" DELETE FROM things WHERE NAME = $1 AND APPLICATION = $2 "#, - &[&name, &application], - ) - .await - .map_err(Error::Postgres)?; + &[ + Type::VARCHAR, // name + Type::VARCHAR, // application + ], + ) + .await + .map_err(Error::Postgres)?; - Ok(()) + // FIXME: add uid and resource version + + let rows = con + .execute(&stmt, &[&name, &application]) + .await + .map_err(Error::Postgres)?; + + Ok(rows > 0) } } @@ -412,3 +444,16 @@ impl Storage { Ok(()) } } + +fn waker_data(thing: &Thing) -> (Option>, Option>) { + // FIXME: use unzip_option once available + match thing + .internal + .as_ref() + .and_then(|i| i.waker.as_ref()) + .map(|w| (w.when, w.why.iter().map(|r| *r).collect::>())) + { + Some(w) => (Some(w.0), Some(w.1)), + None => (None, None), + } +} diff --git a/core/src/waker/mod.rs b/core/src/waker/mod.rs new file mode 100644 index 0000000..10aee3e --- /dev/null +++ b/core/src/waker/mod.rs @@ -0,0 +1,79 @@ +pub mod postgres; + +use crate::model::WakerReason; +use crate::processor::sink::Sink; +use crate::processor::{Event, Message}; +use crate::service::Id; +use async_trait::async_trait; +use std::future::Future; + +#[derive(Clone, Debug)] +pub struct TargetId { + pub id: Id, + pub uid: String, + pub resource_version: String, +} + +#[async_trait] +pub trait Waker: Sized + Send + Sync { + type Config; + + fn from_config(config: Self::Config) -> anyhow::Result; + + /// Run the waker + /// + /// The function provided must wake up the thing. It must only return `Ok` if it was able to do so. + /// It is not necessary to directly reconcile the thing, though.
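+    ///
+    /// A minimal sketch of a compliant callback, assuming a cloneable `sink`
+    /// in scope (this mirrors what the wakeup `Processor` below does):
+    ///
+    /// ```ignore
+    /// waker.run(|target, reasons| {
+    ///     let sink = sink.clone();
+    ///     async move {
+    ///         sink.publish(Event::new(
+    ///             target.id.application,
+    ///             target.id.thing,
+    ///             Message::Wakeup { reasons },
+    ///         ))
+    ///         .await
+    ///     }
+    /// })
+    /// .await?;
+    /// ```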
+ async fn run(self, f: F) -> anyhow::Result<()> + where + F: Fn(TargetId, Vec) -> Fut + Send + Sync, + Fut: Future> + Send; +} + +pub struct Config { + pub waker: W::Config, + pub sink: S::Config, +} + +/// Process wakeups +pub struct Processor { + waker: W, + sink: S, +} + +impl Processor +where + W: Waker, + S: Sink, +{ + pub fn new(waker: W, sink: S) -> Self { + Self { waker, sink } + } + + pub fn from_config(config: Config) -> anyhow::Result { + Ok(Self::new( + W::from_config(config.waker)?, + S::from_config(config.sink)?, + )) + } + + pub async fn run(self) -> anyhow::Result<()> { + let sink = self.sink; + let waker = self.waker; + + waker + .run(|id, reasons| async { + sink.publish(Event::new( + id.id.application, + id.id.thing, + Message::Wakeup { reasons }, + )) + .await?; + + Ok(()) + }) + .await?; + + Ok(()) + } +} diff --git a/core/src/waker/postgres.rs b/core/src/waker/postgres.rs new file mode 100644 index 0000000..1de1cc9 --- /dev/null +++ b/core/src/waker/postgres.rs @@ -0,0 +1,286 @@ +use crate::model::WakerReason; +use crate::service::Id; +use crate::waker::TargetId; +use async_trait::async_trait; +use deadpool_postgres::{Client, Runtime}; +use postgres_types::{Json, Type}; +use std::future::Future; +use std::time::Duration; +use tokio::time::MissedTickBehavior; +use tokio_postgres::Statement; +use uuid::Uuid; + +#[derive(Clone, Debug, serde::Deserialize)] +pub struct Config { + pub application: Option, + pub postgres: deadpool_postgres::Config, + #[serde(with = "humantime_serde", default = "default::waker_delay")] + pub waker_delay: Duration, +} + +mod default { + use std::time::Duration; + + pub const fn waker_delay() -> Duration { + Duration::from_secs(1) + } +} + +pub struct Waker { + application: Option, + pool: deadpool_postgres::Pool, + // waker delay in ISO 8601 duration format + waker_delay: String, +} + +#[async_trait] +impl super::Waker for Waker { + type Config = Config; + + fn from_config(config: Self::Config) -> anyhow::Result { + let pool = config.postgres.create_pool( + Some(Runtime::Tokio1), + postgres_native_tls::MakeTlsConnector::new(native_tls::TlsConnector::new()?), + )?; + + Ok(Self { + pool, + application: config.application, + waker_delay: chrono::Duration::from_std(config.waker_delay)?.to_string(), + }) + } + + async fn run(self, f: F) -> anyhow::Result<()> + where + F: Fn(TargetId, Vec) -> Fut + Send + Sync, + Fut: Future> + Send, + { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + let stmt = self.build_statement(); + + loop { + interval.tick().await; + log::debug!("Ticking ..."); + + match self.pool.get().await { + Err(err) => { + // FIXME: map to liveness status + log::warn!("Failed to prepare for tick: {err}"); + } + Ok(con) => { + if let Err(err) = + WakerRun::new(con, &stmt, &self.application, &self.waker_delay, &f) + .run() + .await + { + // FIXME: map to liveness status + log::warn!("Failed to tick: {err}"); + } + } + } + } + } +} + +impl Waker { + fn build_statement(&self) -> (String, Vec) { + let mut types = vec![Type::VARCHAR]; + + let and_application = match self.application.is_some() { + true => { + types.push(Type::VARCHAR); + r#" + AND + APPLICATION = $2 +"# + } + false => "", + }; + + // We retrieve the next thing that is due for being woken. We fetch only one, and push + // its due time out by a short interval past now. That way others would not see it right away. + // The reconciliation will be processed and should update the waker too.
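+        //
+        // `FOR UPDATE SKIP LOCKED` is what makes this safe to run from several
+        // waker instances concurrently: each instance claims a different due
+        // row instead of blocking on rows another instance has locked.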
+ // + // NOTE: If we plan on scheduling the wakeup through something that buffers (like Kafka) + // then it would not work to just delay for a short amount of time, as we don't know when + // the wakeup would be executed. In this case, we would need to send out the event, and then + // clear the waker. + + let stmt = format!( + r#" +UPDATE + things +SET + WAKER = NOW() + $1::interval +WHERE + UID = ( + SELECT + UID + FROM + things + WHERE + WAKER <= NOW() +{and_application} + + ORDER BY + WAKER ASC + + LIMIT 1 + FOR UPDATE SKIP LOCKED + + ) + +RETURNING + APPLICATION, NAME, UID, RESOURCE_VERSION, WAKER_REASONS +"# + ); + + (stmt, types) + } +} + +struct WakerRun<'r, F, Fut> +where + F: Fn(TargetId, Vec) -> Fut + Send, + Fut: Future> + Send, +{ + con: Client, + stmt: &'r (String, Vec), + application: &'r Option, + // the amount of time to delay the waker by when processing + waker_delay: &'r str, + f: &'r F, +} + +impl<'r, F, Fut> WakerRun<'r, F, Fut> +where + F: Fn(TargetId, Vec) -> Fut + Send, + Fut: Future> + Send, +{ + fn new( + con: Client, + stmt: &'r (String, Vec), + application: &'r Option, + waker_delay: &'r str, + f: &'r F, + ) -> Self { + Self { + con, + stmt, + application, + waker_delay, + f, + } + } + + async fn run(self) -> anyhow::Result<()> { + let stmt = self + .con + .prepare_typed_cached(&self.stmt.0, &self.stmt.1) + .await?; + + loop { + if !self.tick_next(&stmt).await? { + log::debug!("No more for this time"); + break; + } + } + + Ok(()) + } + + async fn tick_next(&self, stmt: &Statement) -> anyhow::Result { + let row = match &self.application { + Some(application) => { + self.con + .query_opt(stmt, &[&self.waker_delay, application]) + .await + } + None => self.con.query_opt(stmt, &[&self.waker_delay]).await, + }?; + + Ok(match row { + Some(row) => { + let application: String = row.try_get("APPLICATION")?; + let thing: String = row.try_get("NAME")?; + let uid: Uuid = row.try_get("UID")?; + let resource_version: Uuid = row.try_get("RESOURCE_VERSION")?; + let reasons: Vec = + row.try_get::<_, Json>>("WAKER_REASONS")?.0; + + // send wakeup + + log::debug!("Wakeup: {application} / {thing} / {uid}"); + (self.f)( + TargetId { + id: Id { + application: application.clone(), + thing: thing.clone(), + }, + uid: uid.to_string(), + resource_version: resource_version.to_string(), + }, + reasons, + ) + .await?; + + // clear waker + + self.clear_waker(application, thing, uid, resource_version) + .await?; + + // done with this entry + + true + } + None => false, + }) + } + + async fn clear_waker( + &self, + + application: String, + thing: String, + uid: Uuid, + resource_version: Uuid, + ) -> anyhow::Result<()> { + // we try to clear the waker. It might already be cleared due to the fact that we woke + // that thing up. But it might also be that the wakeup event is still queued, so we don't + // want to wake it up too often. + + let stmt = self + .con + .prepare_typed_cached( + r#" +UPDATE + things +SET + WAKER = NULL, WAKER_REASONS = NULL +WHERE + APPLICATION = $1 + AND + NAME = $2 + AND + UID = $3 + AND + RESOURCE_VERSION = $4 +"#, + &[Type::VARCHAR, Type::VARCHAR, Type::UUID, Type::UUID], + ) + .await?; + + let result = self + .con + .execute(&stmt, &[&application, &thing, &uid, &resource_version]) + .await?; + + if result == 0 { + log::debug!("Lost oplock clearing waker. 
Don't worry.") + } + + Ok(()) + } +} diff --git a/core/tests/processor.rs b/core/tests/processor.rs new file mode 100644 index 0000000..62a5b55 --- /dev/null +++ b/core/tests/processor.rs @@ -0,0 +1,63 @@ +mod setup; + +use crate::setup::{setup, RunningContext}; +use drogue_doppelgaenger_core::model::Thing; +use drogue_doppelgaenger_core::processor::{Event, Message}; +use drogue_doppelgaenger_core::service::Id; +use serde_json::json; + +#[tokio::test] +async fn test_process() { + let RunningContext { + service, + notifier, + runner, + .. + } = setup().run(); + + let id = Id::new("default", "thing1"); + let thing = service.create(Thing::with_id(&id)).await.unwrap(); + + assert_eq!(notifier.drain().await, vec![thing.clone()]); + + // run a change + + runner + .send_wait(Event::new( + "default", + "thing1", + Message::report_state().state("foo", "bar").build(), + )) + .await + .unwrap(); + + let thing = service.get(&id).await.unwrap().expect("Thing to be found"); + + assert_eq!(notifier.drain().await, vec![thing.clone()]); + + assert_eq!(thing.metadata.generation, Some(2)); + assert_eq!(thing.reported_state.len(), 1); + assert_eq!(thing.reported_state.get("foo").unwrap().value, json!("bar")); + + // run another change, that doesn't change + + runner + .send_wait(Event::new( + "default", + "thing1", + Message::report_state().state("foo", "bar").build(), + )) + .await + .unwrap(); + + let thing = service.get(&id).await.unwrap().expect("Thing to be found"); + + assert_eq!(notifier.drain().await, vec![]); + + assert_eq!(thing.metadata.generation, Some(2)); + assert_eq!(thing.reported_state.len(), 1); + assert_eq!(thing.reported_state.get("foo").unwrap().value, json!("bar")); + + // shutdown runner + runner.shutdown().await.unwrap(); +} diff --git a/core/tests/service.rs b/core/tests/service.rs new file mode 100644 index 0000000..2670f66 --- /dev/null +++ b/core/tests/service.rs @@ -0,0 +1,164 @@ +mod setup; + +use crate::setup::Context; +use drogue_doppelgaenger_core::model::{Metadata, Thing}; +use std::collections::BTreeMap; + +#[tokio::test] +async fn basic() { + let Context { + service, notifier, .. + } = setup::setup(); + + service + .create(Thing::new("default", "thing1")) + .await + .unwrap(); + let thing = service + .get(&("default", "thing1").into()) + .await + .unwrap() + .unwrap(); + + assert_eq!(thing.metadata.application, "default"); + assert_eq!(thing.metadata.name, "thing1"); + + assert_eq!(notifier.drain().await, vec![thing]); +} + +#[tokio::test] +async fn delete() { + let Context { + service, notifier, .. + } = setup::setup(); + + service + .create(Thing::new("default", "thing1")) + .await + .unwrap(); + + let id = ("default", "thing1").into(); + + let thing = service.get(&id).await.unwrap().unwrap(); + + assert_eq!(thing.metadata.application, "default"); + assert_eq!(thing.metadata.name, "thing1"); + + assert_eq!(notifier.drain().await, vec![thing]); + + let found = service.delete(&id).await.unwrap(); + assert_eq!(found, true); + let found = service.delete(&id).await.unwrap(); + assert_eq!(found, false); +} + +#[tokio::test] +async fn update() { + let Context { + service, notifier, .. 
+ } = setup::setup(); + + service + .create(Thing::new("default", "thing1")) + .await + .unwrap(); + + let id = ("default", "thing1").into(); + + let thing = service.get(&id).await.unwrap().unwrap(); + + assert_eq!(thing.metadata.application, "default"); + assert_eq!(thing.metadata.name, "thing1"); + + assert_eq!(notifier.drain().await, vec![thing.clone()]); + + let thing_1 = Thing { + metadata: Metadata { + // try change immutable fields + application: "something".to_string(), + name: "thing2".to_string(), + annotations: { + // make some change + let mut annotations = BTreeMap::new(); + annotations.insert("foo".to_string(), "bar".to_string()); + annotations + }, + ..thing.metadata.clone() + }, + ..thing.clone() + }; + + let thing_1 = service.update(&id, thing_1).await.unwrap(); + + assert_eq!(notifier.drain().await, vec![thing_1.clone()]); + + // immutable metadata must not change + assert_eq!(thing_1.metadata.application, "default"); + assert_eq!(thing_1.metadata.name, "thing1"); + assert_eq!(thing_1.metadata.uid, thing.metadata.uid); + assert_eq!( + thing_1.metadata.creation_timestamp, + thing.metadata.creation_timestamp + ); + assert_eq!( + thing_1.metadata.generation, + thing.metadata.generation.map(|g| g + 1) + ); + assert_ne!( + thing_1.metadata.resource_version, + thing.metadata.resource_version + ); + + // fetching again must return the same result + + let thing_1 = service + .get(&id) + .await + .unwrap() + .expect("Thing must be found"); + + // immutable metadata must not change + assert_eq!(thing_1.metadata.application, "default"); + assert_eq!(thing_1.metadata.name, "thing1"); + assert_eq!(thing_1.metadata.uid, thing.metadata.uid); + assert_eq!( + thing_1.metadata.creation_timestamp, + thing.metadata.creation_timestamp + ); + assert_eq!( + thing_1.metadata.generation, + thing.metadata.generation.map(|g| g + 1) + ); + assert_ne!( + thing_1.metadata.resource_version, + thing.metadata.resource_version + ); +} + +/// Testing the case that a change isn't a change. +#[tokio::test] +async fn update_no_change() { + let Context { + service, notifier, .. 
+    } = setup::setup();
+
+    service
+        .create(Thing::new("default", "thing1"))
+        .await
+        .unwrap();
+
+    let id = ("default", "thing1").into();
+
+    let thing = service.get(&id).await.unwrap().unwrap();
+
+    assert_eq!(thing.metadata.application, "default");
+    assert_eq!(thing.metadata.name, "thing1");
+
+    assert_eq!(notifier.drain().await, vec![thing.clone()]);
+
+    let thing_1 = service.update(&id, thing.clone()).await.unwrap();
+
+    assert_eq!(notifier.drain().await, vec![]);
+
+    assert_eq!(thing_1, thing);
+}
diff --git a/core/tests/setup.rs b/core/tests/setup.rs
new file mode 100644
index 0000000..72ea3b9
--- /dev/null
+++ b/core/tests/setup.rs
@@ -0,0 +1,505 @@
+use async_trait::async_trait;
+use chrono::{DateTime, Utc};
+use drogue_doppelgaenger_core::model::WakerReason;
+use drogue_doppelgaenger_core::service::Id;
+use drogue_doppelgaenger_core::waker::{TargetId, Waker};
+use drogue_doppelgaenger_core::{
+    model::Thing,
+    notifier::Notifier,
+    processor::{sink::Sink, source::Source, Event, Processor},
+    service::Service,
+    storage::{Error, Storage},
+    waker,
+};
+use std::collections::{btree_map::Entry, BTreeMap};
+use std::convert::Infallible;
+use std::future::Future;
+use std::ops::Deref;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::runtime::Handle;
+use tokio::sync::mpsc::{channel, Receiver, Sender};
+use tokio::sync::{oneshot, Mutex, RwLock};
+use tokio::task::JoinHandle;
+use uuid::Uuid;
+
+#[derive(Clone)]
+pub struct MockStorage {
+    pub application: String,
+    pub things: Arc<RwLock<BTreeMap<String, Thing>>>,
+    waker: MockWaker,
+}
+
+impl MockStorage {
+    pub fn new<A: Into<String>>(application: A, waker: MockWaker) -> Self {
+        Self {
+            application: application.into(),
+            things: Default::default(),
+            waker,
+        }
+    }
+}
+
+#[async_trait]
+impl Storage for MockStorage {
+    type Config = ();
+    type Error = Infallible;
+
+    fn from_config(_: &Self::Config) -> anyhow::Result<Self> {
+        panic!("Not supported")
+    }
+
+    async fn get(
+        &self,
+        application: &str,
+        name: &str,
+    ) -> Result<Option<Thing>, Error<Self::Error>> {
+        if application != self.application {
+            return Ok(None);
+        }
+
+        Ok(self.things.read().await.get(name).cloned())
+    }
+
+    async fn create(&self, mut thing: Thing) -> Result<Thing, Error<Self::Error>> {
+        if thing.metadata.application != self.application {
+            return Err(Error::NotAllowed);
+        }
+
+        // fake metadata
+
+        thing.metadata.creation_timestamp = Some(Utc::now());
+        thing.metadata.uid = Some(Uuid::new_v4().to_string());
+        thing.metadata.resource_version = Some(Uuid::new_v4().to_string());
+        thing.metadata.generation = Some(1);
+
+        // store
+
+        let mut lock = self.things.write().await;
+        lock.insert(thing.metadata.name.clone(), thing.clone());
+
+        // update the waker, while still holding the lock
+
+        self.waker.update(&thing).await;
+
+        // done
+
+        Ok(thing)
+    }
+
+    async fn update(&self, mut thing: Thing) -> Result<Thing, Error<Self::Error>> {
+        if thing.metadata.application != self.application {
+            return Err(Error::NotAllowed);
+        }
+
+        let mut things = self.things.write().await;
+
+        let result = match things.entry(thing.metadata.name.clone()) {
+            Entry::Occupied(mut entry) => {
+                // immutable fields
+                thing.metadata.creation_timestamp = entry.get().metadata.creation_timestamp;
+                thing.metadata.uid = entry.get().metadata.uid.clone();
+
+                // updated by the storage
+                thing.metadata.resource_version = Some(Uuid::new_v4().to_string());
+                thing.metadata.generation =
+                    Some(entry.get().metadata.generation.unwrap_or_default() + 1);
+
+                // store
+                entry.insert(thing.clone());
+
+                // update the waker, while still holding the lock
+                self.waker.update(&thing).await;
+
+                // return
+                Ok(thing)
+            }
+            Entry::Vacant(_) => Err(Error::NotFound),
+        };
+
+        result
+    }
+
+    async fn delete(&self, application: &str, name: &str) -> Result<bool, Error<Self::Error>> {
+        if application != self.application {
+            return Ok(false);
+        }
+
+        Ok(self.things.write().await.remove(name).is_some())
+    }
+}
+
+#[derive(Clone)]
+pub struct MockNotifier {
+    pub events: Arc<RwLock<Vec<Thing>>>,
+}
+
+impl MockNotifier {
+    pub fn new() -> Self {
+        Self {
+            events: Default::default(),
+        }
+    }
+
+    pub async fn drain(&self) -> Vec<Thing> {
+        let mut lock = self.events.write().await;
+        lock.drain(..).collect()
+    }
+}
+
+#[async_trait]
+impl Notifier for MockNotifier {
+    type Config = ();
+    type Error = Infallible;
+
+    fn from_config(_: &Self::Config) -> anyhow::Result<Self> {
+        Ok(Self::new())
+    }
+
+    async fn notify(
+        &self,
+        thing: &Thing,
+    ) -> Result<(), drogue_doppelgaenger_core::notifier::Error<Self::Error>> {
+        self.events.write().await.push(thing.clone());
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+struct Item {
+    event: Event,
+    notifier: oneshot::Sender<()>,
+}
+
+#[derive(Clone)]
+pub struct MockSource {
+    tx: Sender<Item>,
+    rx: Arc<Mutex<Option<Receiver<Item>>>>,
+}
+
+impl MockSource {
+    pub fn new() -> Self {
+        let (tx, rx) = channel(100);
+        Self {
+            tx,
+            rx: Arc::new(Mutex::new(Some(rx))),
+        }
+    }
+}
+
+#[async_trait]
+impl Source for MockSource {
+    type Config = ();
+
+    fn from_config(_: Self::Config) -> anyhow::Result<Self> {
+        panic!("Not supported")
+    }
+
+    async fn run<F, Fut>(self, f: F) -> anyhow::Result<()>
+    where
+        F: Fn(Event) -> Fut + Send + Sync,
+        Fut: Future<Output = anyhow::Result<()>> + Send,
+    {
+        // drop our own sender, so that we don't keep the loop running
+        drop(self.tx);
+
+        let mut rx = self
+            .rx
+            .lock()
+            .await
+            .take()
+            .expect("Mock source can only be started once");
+
+        // just forward events
+        while let Some(item) = rx.recv().await {
+            f(item.event).await?;
+            let _ = item.notifier.send(());
+        }
+
+        // once all other senders are closed, we return
+
+        Ok(())
+    }
+}
+
+#[derive(Clone)]
+pub struct MockSink {
+    pub events: Arc<RwLock<Vec<Event>>>,
+    tx: Sender<Event>,
+    rx: Arc<Mutex<Option<Receiver<Event>>>>,
+}
+
+impl MockSink {
+    pub fn new() -> Self {
+        let (tx, rx) = channel(100);
+        Self {
+            events: Arc::default(),
+            tx,
+            rx: Arc::new(Mutex::new(Some(rx))),
+        }
+    }
+
+    async fn take_receiver(&self) -> Option<Receiver<Event>> {
+        self.rx.lock().await.take()
+    }
+}
+
+#[async_trait]
+impl Sink for MockSink {
+    type Config = ();
+
+    fn from_config(_: Self::Config) -> anyhow::Result<Self> {
+        panic!("Not supported")
+    }
+
+    async fn publish(&self, event: Event) -> anyhow::Result<()> {
+        self.tx.send(event.clone()).await?;
+        self.events.write().await.push(event);
+        Ok(())
+    }
+}
+
+pub struct Context {
+    pub sink: MockSink,
+    pub source: MockSourceFeeder,
+    pub notifier: MockNotifier,
+    pub service: Service<MockStorage, MockNotifier, MockSink>,
+    pub processor: Processor<MockStorage, MockNotifier, MockSink, MockSource>,
+    pub waker: waker::Processor<MockWaker, MockSink>,
+}
+
+impl Context {
+    /// Run the context.
+    ///
+    /// When the `feed_events` parameter is true, events sent to the sink will automatically be fed
+    /// into the source again.
+    pub fn run(self, feed_events: bool) -> RunningContext {
+        let processor = self.processor;
+        let waker = self.waker;
+
+        let feed_runner = if feed_events {
+            // forward events from the sink back into the source
+            let sink = self.sink.clone();
+            let source = self.source.clone();
+            Some(Handle::current().spawn(async move {
+                let mut rx = sink.take_receiver().await.expect("Must have receiver");
+                while let Some(event) = rx.recv().await {
+                    source.send(event).await.unwrap();
+                }
+            }))
+        } else {
+            None
+        };
+
+        RunningContext {
+            sink: self.sink,
+            notifier: self.notifier,
+            service: self.service,
+            runner: ContextRunner {
+                source: self.source,
+                processor_runner: Handle::current().spawn(async move { processor.run().await }),
+                waker_runner: Handle::current().spawn(async move { waker.run().await }),
+                feed_runner,
+            },
+        }
+    }
+}
+
+pub struct ContextRunner {
+    source: MockSourceFeeder,
+    processor_runner: JoinHandle<anyhow::Result<()>>,
+    waker_runner: JoinHandle<anyhow::Result<()>>,
+    feed_runner: Option<JoinHandle<()>>,
+}
+
+impl ContextRunner {
+    /// Safely shut down a running context.
+    pub async fn shutdown(mut self) -> anyhow::Result<()> {
+        // abort the feed runner, if we have one
+        if let Some(feed_runner) = self.feed_runner.take() {
+            feed_runner.abort();
+        }
+
+        // we abort the waker
+        self.waker_runner.abort();
+
+        // drop the source to shut down the loop
+        drop(self.source);
+        // await the end of the loop
+        self.processor_runner.await??;
+
+        // done
+        Ok(())
+    }
+}
+
+impl Deref for ContextRunner {
+    type Target = MockSourceFeeder;
+
+    fn deref(&self) -> &Self::Target {
+        &self.source
+    }
+}
+
+pub struct RunningContext {
+    pub sink: MockSink,
+    pub notifier: MockNotifier,
+    pub service: Service<MockStorage, MockNotifier, MockSink>,
+    pub runner: ContextRunner,
+}
+
+#[derive(Clone)]
+pub struct MockSourceFeeder {
+    source: MockSource,
+}
+
+impl MockSourceFeeder {
+    /// Send an event, and wait until it was processed by the processor.
+    ///
+    /// NOTE: This will block indefinitely if the processor is not running!
+    pub async fn send_wait(&self, event: Event) -> anyhow::Result<()> {
+        // await the oneshot notifier, so we only return once the source loop
+        // has actually processed the event
+        self.send(event).await?.await?;
+        Ok(())
+    }
+
+    pub async fn send(&self, event: Event) -> anyhow::Result<oneshot::Receiver<()>> {
+        let (tx, rx) = oneshot::channel();
+        let item = Item {
+            event,
+            notifier: tx,
+        };
+
+        self.source.tx.send(item).await?;
+
+        Ok(rx)
+    }
+}
+
+#[derive(Clone, Default)]
+pub struct MockWaker {
+    wakers: Arc<Mutex<BTreeMap<String, (DateTime<Utc>, Vec<WakerReason>, TargetId)>>>,
+}
+
+impl MockWaker {
+    pub fn new() -> Self {
+        Self {
+            wakers: Default::default(),
+        }
+    }
+
+    pub async fn update(&self, thing: &Thing) {
+        let mut lock = self.wakers.lock().await;
+
+        match thing.internal.as_ref().and_then(|i| i.waker.as_ref()) {
+            Some(waker) => {
+                let id = TargetId {
+                    id: Id::new(
+                        thing.metadata.application.clone(),
+                        thing.metadata.name.clone(),
+                    ),
+                    resource_version: thing
+                        .metadata
+                        .resource_version
+                        .as_ref()
+                        .cloned()
+                        .unwrap_or_default(),
+                    uid: thing.metadata.uid.as_ref().cloned().unwrap_or_default(),
+                };
+                lock.insert(
+                    thing.metadata.name.clone(),
+                    (
+                        waker.when,
+                        waker.why.iter().copied().collect::<Vec<_>>(),
+                        id,
+                    ),
+                );
+            }
+            None => {
+                lock.remove(&thing.metadata.name);
+            }
+        }
+    }
+}
+
+#[async_trait]
+impl Waker for MockWaker {
+    type Config = ();
+
+    fn from_config(_: Self::Config) -> anyhow::Result<Self> {
+        panic!("Not supported");
+    }
+
+    async fn run<F, Fut>(self, f: F) -> anyhow::Result<()>
+    where
+        F: Fn(TargetId, Vec<WakerReason>) -> Fut + Send + Sync,
+        Fut: Future<Output = anyhow::Result<()>> + Send,
+    {
+        let mut interval = tokio::time::interval(Duration::from_secs(1));
+        loop {
+            interval.tick().await;
+
+            log::debug!("MockWaker - ticking");
+
+            // this is a bit messy, might be improved with drain_filter
+
+            let expired = {
+                let mut lock = self.wakers.lock().await;
+
+                let mut expired_keys = Vec::new();
+                for (k, v) in lock.iter() {
+                    if v.0 < Utc::now() {
+                        expired_keys.push(k.clone());
+                    }
+                }
+
+                let mut expired = Vec::new();
+
+                for k in expired_keys {
+                    if let Some(v) = lock.remove(&k) {
+                        expired.push(v);
+                    }
+                }
+
+                expired
+            };
+
+            // call back outside the lock
+
+            for v in expired {
+                log::info!("Mock waker for: {v:?}");
+                let _ = f(v.2, v.1).await;
+            }
+        }
+    }
+}
+
+pub fn setup() -> Context {
+    let _ = env_logger::builder().is_test(true).try_init();
+
+    let sink = MockSink::new();
+    let source = MockSource::new();
+    let notifier = MockNotifier::new();
+
+    let waker = MockWaker::new();
+    let storage = MockStorage::new("default", waker.clone());
+
+    let processor = Processor::new(
+        Service::new(storage.clone(), notifier.clone(), sink.clone()),
+        source.clone(),
+    );
+
+    let waker = waker::Processor::new(waker, sink.clone());
+
+    let service = Service::new(storage.clone(), notifier.clone(), sink.clone());
+
+    let source = MockSourceFeeder { source };
+
+    Context {
+        sink,
+        service,
+        processor,
+        notifier,
+        source,
+        waker,
+    }
+}
diff --git a/core/tests/waker.rs b/core/tests/waker.rs
new file mode 100644
index 0000000..1c62a7f
--- /dev/null
+++ b/core/tests/waker.rs
@@ -0,0 +1,102 @@
+mod setup;
+
+use crate::setup::{setup, RunningContext};
+use chrono::Utc;
+use drogue_doppelgaenger_core::model::{Code, Reconciliation, Thing, WakerReason};
+use drogue_doppelgaenger_core::service::Id;
+use serde_json::json;
+use std::collections::{BTreeMap, BTreeSet};
+use std::time::Duration;
+
+#[tokio::test]
+async fn test_process() {
+    let RunningContext {
+        service,
+        notifier,
+        runner,
+        ..
+    } = setup().run(true);
+
+    let id = Id::new("default", "thing1");
+    let thing = service
+        .create(Thing {
+            reconciliation: Reconciliation {
+                changed: {
+                    let mut changed = BTreeMap::new();
+                    changed.insert(
+                        "change".to_string(),
+                        Code::Script(
+                            r#"
+function wakeup(when) {
+    waker = when;
+}
+
+if (newState.reportedState?.["foo"] === undefined) {
+    if (newState.metadata.annotations?.["test"] === "true") {
+        newState.reportedState = {};
+        newState.reportedState["foo"] = {
+            value: "bar",
+            lastUpdate: new Date().toISOString(),
+        }
+    } else {
+        newState.metadata.annotations = {"test": "true"};
+        wakeup("5s");
+    }
+}
+"#
+                            .to_string(),
+                        )
+                        .into(),
+                    );
+                    changed
+                },
+            },
+            ..Thing::with_id(&id)
+        })
+        .await
+        .unwrap();
+
+    let wakeup = thing.internal.unwrap().waker.unwrap();
+    assert_eq!(wakeup.why, BTreeSet::from([WakerReason::Reconcile]));
+
+    assert!(wakeup.when > Utc::now());
+
+    // it should also have our annotation
+
+    assert_eq!(
+        thing.metadata.annotations.get("test").map(|s| s.as_str()),
+        Some("true"),
+    );
+
+    // wait until the waker should have fired and been processed
+
+    tokio::time::sleep_until(tokio::time::Instant::now() + Duration::from_secs(7)).await;
+
+    // check again
+
+    let thing = service.get(&id).await.unwrap().unwrap();
+
+    assert_eq!(
+        thing
+            .internal
+            .as_ref()
+            .unwrap()
+            .waker
+            .as_ref()
+            .unwrap()
+            .why
+            .clone()
+            .into_iter()
+            .collect::<Vec<_>>(),
+        &[WakerReason::Reconcile]
+    );
+
+    assert_eq!(thing.reported_state.get("foo").unwrap().value, json!("bar"));
+
+    // there are two events: one for the create, and a second one for the timer
+    assert_eq!(notifier.drain().await.len(), 2);
+
+    // shutdown runner
+    runner.shutdown().await.unwrap();
+}
diff --git a/database-migration/migrations/00000000000000_init/up.sql b/database-migration/migrations/00000000000000_init/up.sql
index 8b82f23..6b3898d 100644
--- a/database-migration/migrations/00000000000000_init/up.sql
+++ b/database-migration/migrations/00000000000000_init/up.sql
@@ -16,6 +16,10 @@ CREATE TABLE things (
     -- data
     DATA JSON,
 
+    -- internal state (for easier access)
+    WAKER TIMESTAMP WITH TIME ZONE NULL,
+    WAKER_REASONS JSONB NULL,
+
     -- constraints
     PRIMARY KEY (NAME, APPLICATION)
 );
diff --git a/develop/CHEATSHEET.adoc b/develop/CHEATSHEET.adoc
new file mode 100644
index 0000000..861981d
--- /dev/null
+++ b/develop/CHEATSHEET.adoc
@@ -0,0 +1,15 @@
+= Cheatsheet
+
+== Connect to the local PostgreSQL instance
+
+[source,shell]
+----
+env PGPASSWORD=admin123456 psql -h localhost -U admin drogue
+----
+
+== Update PostgreSQL directly
+
+[source,sql]
+----
+-- update things set data = jsonb_set(to_jsonb(data), '{internal, waker}', NULL);
+----
\ No newline at end of file
diff --git a/develop/docker-compose.yml b/develop/docker-compose.yml
new file mode 100644
index 0000000..2e5b0f1
--- /dev/null
+++ b/develop/docker-compose.yml
@@ -0,0 +1,38 @@
+version: "3.9"
+services:
+
+  postgres:
+    image: docker.io/bitnami/postgresql:14
+    environment:
+      - POSTGRESQL_USERNAME=admin
+      - POSTGRESQL_PASSWORD=admin123456
+      - POSTGRESQL_DATABASE=drogue
+    ports:
+      - "5432:5432"
+    healthcheck:
+      test: ["CMD", "pg_isready"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
+  kafka:
+    image: docker.io/moeenz/docker-kafka-kraft:latest
+    ports:
+      - "9092:9092"
+    environment:
+      - KRAFT_CONTAINER_HOST_NAME=localhost
+
+  keycloak:
+    image: quay.io/keycloak/keycloak:18.0.2
+    command: start-dev
+    environment:
+      - KEYCLOAK_ADMIN=admin
+      - KEYCLOAK_ADMIN_PASSWORD=admin123456
+      - KEYCLOAK_DB=dev-file
+    ports:
+      - "8080:8080"
+    healthcheck:
+      test: ["CMD",
"curl", "-f", "http://localhost:8080"] + interval: 10s + timeout: 5s + retries: 5 diff --git a/examples/10_basic.adoc b/examples/10_basic.adoc index ae681f3..02e75c6 100644 --- a/examples/10_basic.adoc +++ b/examples/10_basic.adoc @@ -4,7 +4,7 @@ [source,shell] ---- -http POST localhost:8080/api/v1alpha1/things metadata:='{"name": "foo", "application":"default"}' +http POST localhost:8080/api/v1alpha1/things metadata:='{"name": "foo", "application": "default"}' ---- == Delete a thing @@ -25,26 +25,26 @@ http GET localhost:8080/api/v1alpha1/things/default/things/foo [source,shell] ---- -http PUT localhost:8080/api/v1alpha1/things metadata:='{"name": "foo", "application":"default"}' +http PUT localhost:8080/api/v1alpha1/things metadata:='{"name": "foo", "application": "default"}' ---- == Update reported state [source,shell] ---- -http PUT localhost:8080/api/v1alpha1/things/default/things/foo/reportedState temperature:=42 +http PUT localhost:8080/api/v1alpha1/things/default/things/foo/reportedStates temperature:=42 ---- == Patch a thing [source,shell] ---- -http PATCH localhost:8080/api/v1alpha1/things/default/things/foo '[0][op]=replace' '[0][path]=/reportedState/temperature/value' '[0][value]=43' +http PATCH localhost:8080/api/v1alpha1/things/default/things/foo content-type:application/json-patch+json '[0][op]=replace' '[0][path]=/reportedState/temperature/value' '[0][value]=43' ---- === Remove an annotation [source,shell] ---- -http PATCH localhost:8080/api/v1alpha1/things/default/things/foo '[0][op]=remove' '[0][path]=/metadata/annotations/condition~1FoverTemp' +http PATCH localhost:8080/api/v1alpha1/things/default/things/foo content-type:application/json-patch+json '[0][op]=remove' '[0][path]=/metadata/annotations/condition~1overTemp' ---- diff --git a/examples/20_reconcile.adoc b/examples/20_reconcile.adoc index 0cfc55f..edce076 100644 --- a/examples/20_reconcile.adoc +++ b/examples/20_reconcile.adoc @@ -10,14 +10,14 @@ include::20_reconcile/recon1.js[] [source,shell] ---- -http PUT localhost:8080/api/v1alpha1/things/default/things/foo/reconciliation 'changed[overTemp][script]=@20_reconcile/recon1.js' +http PUT localhost:8080/api/v1alpha1/things/default/things/foo/reconciliations 'changed[overTemp][script]=@20_reconcile/recon1.js' ---- Set over temp value: [source,shell] ---- -http PUT localhost:8080/api/v1alpha1/things/default/things/foo/reportedState temperature:=62 +http PUT localhost:8080/api/v1alpha1/things/default/things/foo/reportedStates temperature:=62 ---- Reset with a value below `60`. 
@@ -37,5 +37,5 @@ include::20_reconcile/recon2.js[] [source,shell] ---- -http PUT localhost:8080/api/v1alpha1/things/default/things/foo/reconciliation 'changed[overTempGroup][script]=@20_reconcile/recon2.js' +http PUT localhost:8080/api/v1alpha1/things/default/things/foo/reconciliations 'changed[overTempGroup][script]=@20_reconcile/recon2.js' ---- \ No newline at end of file diff --git a/examples/20_reconcile/recon2.js b/examples/20_reconcile/recon2.js index e731fc6..26e7627 100644 --- a/examples/20_reconcile/recon2.js +++ b/examples/20_reconcile/recon2.js @@ -1,3 +1,27 @@ +const WARNING_THRESHOLD = 20; +const ALARM_THRESHOLD = 28; +const PROPERTY = "temp"; + +function updateLabel(key, value) { + if (value !== undefined) { + if (newState.metadata.labels === undefined) { + newState.metadata.labels = {}; + } + newState.metadata.labels[key] = value; + } else { + if (newState.metadata.labels !== undefined) { + delete newState.metadata.labels[key]; + } + } +} + +function flagLabel(key, state) { + updateLabel(key, state ? "" : undefined) +} + +// check over temp +flagLabel("highTemp", newState?.reportedState?.[PROPERTY]?.value > WARNING_THRESHOLD); +flagLabel("overTemp", newState?.reportedState?.[PROPERTY]?.value > ALARM_THRESHOLD); function log(text) { //logs.push(text) @@ -116,8 +140,8 @@ renameReportedState("geoloc", "location"); //log(`Post(renameReportedState): ${JSON.stringify(newState, null, 2)}`); -whenConditionChanged("overTemp", "temp", (value) => { - return value > 20; +whenConditionChanged("overTemp", PROPERTY, (value) => { + return value > ALARM_THRESHOLD; }, (condition) => { log(`Condition change: ${condition}`); if (condition) { @@ -127,4 +151,5 @@ whenConditionChanged("overTemp", "temp", (value) => { } }); + //log(`Post(whenConditionChanged): ${JSON.stringify(newState, null, 2)}`); diff --git a/examples/30_notifications.adoc b/examples/30_notifications.adoc index a5f0023..56709f2 100644 --- a/examples/30_notifications.adoc +++ b/examples/30_notifications.adoc @@ -17,5 +17,5 @@ [source,shell] ---- -echo '{"type": "subscribe", "id": {"application": "default", "thing": "foo"}}' | websocat ws://localhost:8080/api/v1alpha1/things/default/notifications +websocat ws://localhost:8080/api/v1alpha1/things/default/things/foo/notifications ---- diff --git a/examples/30_notifications/index.html b/examples/30_notifications/index.html index 53a6d79..8b95712 100644 --- a/examples/30_notifications/index.html +++ b/examples/30_notifications/index.html @@ -58,8 +58,8 @@
[hunk body garbled in extraction: the HTML markup was stripped, leaving only the "Devices" link text of the removed and added navigation lines]
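Taken together with the 30_notifications.adoc change above, the page switches from sending a "subscribe" message after connecting to encoding the application and thing directly in the WebSocket URL. A minimal sketch of such a client follows; the variable names and handler body are hypothetical, not taken from the patch:

[source,javascript]
----
// Hypothetical sketch: subscribe to per-thing notifications via the new endpoint.
// The old protocol sent {"type": "subscribe", "id": {...}} after connecting; the
// new URL carries the application and thing name directly.
const application = "default";
const thing = "foo";

const ws = new WebSocket(
    `ws://localhost:8080/api/v1alpha1/things/${application}/things/${thing}/notifications`
);

ws.onmessage = (event) => {
    // each notification carries the updated Thing as JSON,
    // as the notifier tests suggest
    const state = JSON.parse(event.data);
    console.log("Thing updated:", state);
};
----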