Skip to content

Commit

Permalink
fix: reset the timer after waking up the thing
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Aug 17, 2022
1 parent cac30c6 commit 2e24440
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 35 deletions.
2 changes: 2 additions & 0 deletions core/src/machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,11 @@ async fn reconcile(current_thing: Arc<Thing>, mut new_thing: Thing) -> Result<Ou
// FIXME: record error (if any)

new_thing = outgoing.new_thing;
// record the log
if let Some(rec) = new_thing.reconciliation.changed.get_mut(&name) {
rec.last_log = outgoing.log;
}
// schedule the waker
if let Some(duration) = outgoing.waker {
new_thing.wakeup(duration, WakerReason::Reconcile);
}
Expand Down
31 changes: 9 additions & 22 deletions core/src/storage/postgres/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod utils;

use crate::model::{Internal, Schema, WakerReason};
use crate::model::{Internal, Schema};
use crate::{
model::{DesiredFeature, Metadata, Reconciliation, ReportedFeature, SyntheticFeature, Thing},
storage::{self},
Expand Down Expand Up @@ -210,7 +210,7 @@ WHERE
thing.metadata.generation = Some(generation as u32);
thing.metadata.resource_version = Some(resource_version.to_string());

let (waker, waker_reasons) = waker_data(&thing);
let waker = waker_data(&thing);

log::debug!(
"Creating new thing: {} / {}",
Expand All @@ -233,8 +233,7 @@ INSERT INTO things (
ANNOTATIONS,
LABELS,
DATA,
WAKER,
WAKER_REASONS
WAKER
) VALUES (
$1,
$2,
Expand All @@ -245,8 +244,7 @@ INSERT INTO things (
$7,
$8,
$9,
$10,
$11
$10
)
"#,
&[
Expand All @@ -260,7 +258,6 @@ INSERT INTO things (
Type::JSONB, // labels
Type::JSON, // data
Type::TIMESTAMPTZ, // waker
Type::JSONB, // waker reasons
],
)
.await
Expand All @@ -279,7 +276,6 @@ INSERT INTO things (
&Json(&thing.metadata.labels),
&Json(data),
&waker,
&Json(&waker_reasons),
],
)
.await
Expand All @@ -299,7 +295,7 @@ INSERT INTO things (
let name = &thing.metadata.name;
let application = &thing.metadata.application;

let (waker, waker_reasons) = waker_data(&thing);
let waker = waker_data(&thing);

log::debug!("Updating existing thing: {application} / {name}");

Expand All @@ -311,8 +307,7 @@ SET
ANNOTATIONS = $4,
LABELS = $5,
DATA = $6,
WAKER = $7,
WAKER_REASONS = $8
WAKER = $7
WHERE
NAME = $1
AND
Expand All @@ -324,7 +319,6 @@ WHERE
let data = Json(Data::from(&thing));
let annotations = Json(&thing.metadata.annotations);
let labels = Json(&thing.metadata.labels);
let waker_reasons = Json(&waker_reasons);

let mut types: Vec<Type> = Vec::new();
let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new();
Expand All @@ -342,8 +336,6 @@ WHERE
params.push(&data);
types.push(Type::TIMESTAMPTZ);
params.push(&waker);
types.push(Type::JSONB);
params.push(&waker_reasons);

if let Some(resource_version) = &thing.metadata.resource_version {
stmt.push_str(&format!(
Expand Down Expand Up @@ -445,15 +437,10 @@ impl Storage {
}
}

fn waker_data(thing: &Thing) -> (Option<DateTime<Utc>>, Option<Vec<WakerReason>>) {
// FIXME: use unzip_option once available
match thing
fn waker_data(thing: &Thing) -> Option<DateTime<Utc>> {
thing
.internal
.as_ref()
.and_then(|i| i.waker.as_ref())
.map(|w| (w.when, w.why.iter().map(|r| *r).collect::<Vec<_>>()))
{
Some(w) => (Some(w.0), Some(w.1)),
None => (None, None),
}
.map(|w| w.when)
}
49 changes: 37 additions & 12 deletions core/src/waker/postgres.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::model::WakerReason;
use crate::service::Id;
use crate::storage::postgres::Data;
use crate::waker::TargetId;
use anyhow::{anyhow, bail};
use anyhow::bail;
use async_trait::async_trait;
use deadpool_postgres::{Client, Runtime, Transaction};
use postgres_types::{Json, Type};
Expand Down Expand Up @@ -94,7 +95,7 @@ SELECT
NAME,
UID,
RESOURCE_VERSION,
WAKER_REASONS
DATA
FROM
things
Expand Down Expand Up @@ -178,8 +179,15 @@ where
let thing: String = row.try_get("NAME")?;
let uid: Uuid = row.try_get("UID")?;
let resource_version: Uuid = row.try_get("RESOURCE_VERSION")?;
let reasons: Vec<WakerReason> =
row.try_get::<_, Json<Vec<WakerReason>>>("WAKER_REASONS")?.0;
let data = row.try_get::<_, Json<Data>>("DATA")?.0;

let reasons = data
.internal
.as_ref()
.and_then(|i| i.waker.as_ref())
.map(|w| &w.why)
.map(|r| r.iter().map(|r| *r).collect::<Vec<_>>())
.unwrap_or_default();

// send wakeup

Expand All @@ -199,7 +207,7 @@ where

// clear waker

Self::clear_waker(tx, application, thing, uid, resource_version).await?;
Self::clear_waker(tx, application, thing, uid, resource_version, data).await?;

// done with this entry

Expand All @@ -215,32 +223,49 @@ where
thing: String,
uid: Uuid,
resource_version: Uuid,
mut data: Data,
) -> anyhow::Result<()> {
// we clear the waker and commit the transaction. The oplock should hold, as we have locked
// the record.

if let Some(internal) = &mut data.internal {
internal.waker = None;
}

let stmt = tx
.prepare_typed_cached(
r#"
UPDATE
things
SET
WAKER = NULL, WAKER_REASONS = NULL
WAKER = NULL,
DATA = $1
WHERE
APPLICATION = $1
APPLICATION = $2
AND
NAME = $2
NAME = $3
AND
UID = $3
UID = $4
AND
RESOURCE_VERSION = $4
RESOURCE_VERSION = $5
"#,
&[Type::VARCHAR, Type::VARCHAR, Type::UUID, Type::UUID],
&[
Type::JSON,
Type::VARCHAR,
Type::VARCHAR,
Type::UUID,
Type::UUID,
],
)
.await?;

let data = Json(&data);

let result = tx
.execute(&stmt, &[&application, &thing, &uid, &resource_version])
.execute(
&stmt,
&[&data, &application, &thing, &uid, &resource_version],
)
.await?;

if result == 0 {
Expand Down
1 change: 0 additions & 1 deletion database-migration/migrations/00000000000000_init/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ CREATE TABLE things (

-- internal state (for easier access)
WAKER TIMESTAMP WITH TIME ZONE NULL,
WAKER_REASONS JSONB NULL,

-- constraints
PRIMARY KEY (NAME, APPLICATION)
Expand Down

0 comments on commit 2e24440

Please sign in to comment.