diff --git a/core/src/machine/mod.rs b/core/src/machine/mod.rs index 834a350..26b384d 100644 --- a/core/src/machine/mod.rs +++ b/core/src/machine/mod.rs @@ -182,9 +182,11 @@ async fn reconcile(current_thing: Arc, mut new_thing: Thing) -> Result = Vec::new(); let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new(); @@ -342,8 +336,6 @@ WHERE params.push(&data); types.push(Type::TIMESTAMPTZ); params.push(&waker); - types.push(Type::JSONB); - params.push(&waker_reasons); if let Some(resource_version) = &thing.metadata.resource_version { stmt.push_str(&format!( @@ -445,15 +437,10 @@ impl Storage { } } -fn waker_data(thing: &Thing) -> (Option>, Option>) { - // FIXME: use unzip_option once available - match thing +fn waker_data(thing: &Thing) -> Option> { + thing .internal .as_ref() .and_then(|i| i.waker.as_ref()) - .map(|w| (w.when, w.why.iter().map(|r| *r).collect::>())) - { - Some(w) => (Some(w.0), Some(w.1)), - None => (None, None), - } + .map(|w| w.when) } diff --git a/core/src/waker/postgres.rs b/core/src/waker/postgres.rs index 34e6550..9cd70ef 100644 --- a/core/src/waker/postgres.rs +++ b/core/src/waker/postgres.rs @@ -1,7 +1,8 @@ use crate::model::WakerReason; use crate::service::Id; +use crate::storage::postgres::Data; use crate::waker::TargetId; -use anyhow::{anyhow, bail}; +use anyhow::bail; use async_trait::async_trait; use deadpool_postgres::{Client, Runtime, Transaction}; use postgres_types::{Json, Type}; @@ -94,7 +95,7 @@ SELECT NAME, UID, RESOURCE_VERSION, - WAKER_REASONS + DATA FROM things @@ -178,8 +179,15 @@ where let thing: String = row.try_get("NAME")?; let uid: Uuid = row.try_get("UID")?; let resource_version: Uuid = row.try_get("RESOURCE_VERSION")?; - let reasons: Vec = - row.try_get::<_, Json>>("WAKER_REASONS")?.0; + let data = row.try_get::<_, Json>("DATA")?.0; + + let reasons = data + .internal + .as_ref() + .and_then(|i| i.waker.as_ref()) + .map(|w| &w.why) + .map(|r| r.iter().map(|r| *r).collect::>()) + .unwrap_or_default(); // send wakeup @@ -199,7 +207,7 @@ where // clear waker - Self::clear_waker(tx, application, thing, uid, resource_version).await?; + Self::clear_waker(tx, application, thing, uid, resource_version, data).await?; // done with this entry @@ -215,32 +223,49 @@ where thing: String, uid: Uuid, resource_version: Uuid, + mut data: Data, ) -> anyhow::Result<()> { // we clear the waker and commit the transaction. The oplock should hold, as we have locked // the record. + if let Some(internal) = &mut data.internal { + internal.waker = None; + } + let stmt = tx .prepare_typed_cached( r#" UPDATE things SET - WAKER = NULL, WAKER_REASONS = NULL + WAKER = NULL, + DATA = $1 WHERE - APPLICATION = $1 + APPLICATION = $2 AND - NAME = $2 + NAME = $3 AND - UID = $3 + UID = $4 AND - RESOURCE_VERSION = $4 + RESOURCE_VERSION = $5 "#, - &[Type::VARCHAR, Type::VARCHAR, Type::UUID, Type::UUID], + &[ + Type::JSON, + Type::VARCHAR, + Type::VARCHAR, + Type::UUID, + Type::UUID, + ], ) .await?; + let data = Json(&data); + let result = tx - .execute(&stmt, &[&application, &thing, &uid, &resource_version]) + .execute( + &stmt, + &[&data, &application, &thing, &uid, &resource_version], + ) .await?; if result == 0 { diff --git a/database-migration/migrations/00000000000000_init/up.sql b/database-migration/migrations/00000000000000_init/up.sql index 6b3898d..f1e19d8 100644 --- a/database-migration/migrations/00000000000000_init/up.sql +++ b/database-migration/migrations/00000000000000_init/up.sql @@ -18,7 +18,6 @@ CREATE TABLE things ( -- internal state (for easier access) WAKER TIMESTAMP WITH TIME ZONE NULL, - WAKER_REASONS JSONB NULL, -- constraints PRIMARY KEY (NAME, APPLICATION)