Skip to content

Commit

Permalink
feat: add desired state to the debugger
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Aug 17, 2022
1 parent 823e797 commit e3aa525
Show file tree
Hide file tree
Showing 15 changed files with 614 additions and 155 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@
* [ ] Allow more fine-grained control over this
* [ ] Implement WASM
* [x] Ensure that reported state "last updated" changes when only the value changes (move logic to machine)
* [ ] Allow unsetting a desired state, without deleting it
3 changes: 2 additions & 1 deletion backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ actix = "0.13"
actix-web = "4"
actix-web-actors = "4"
anyhow = "1"
chrono = "0.4"
chrono = { version = "0.4", features = ["serde"] }
futures = "0.3"
humantime = "2"
humantime-serde = "1"
log = "0.4"
serde = { version = "1", features = ["derive", "rc"] }
serde_json = "1"
Expand Down
11 changes: 5 additions & 6 deletions backend/src/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use drogue_doppelgaenger_core::{
listener::KafkaSource,
model::{Reconciliation, SyntheticType, Thing},
notifier::Notifier,
processor::sink::Sink,
processor::{sink::Sink, SetDesiredValue},
service::{
DesiredStateUpdate, DesiredStateUpdater, DesiredStateValueUpdater, Id, JsonMergeUpdater,
JsonPatchUpdater, Patch, ReportedStateUpdater, Service, SyntheticStateUpdater, UpdateMode,
Expand Down Expand Up @@ -164,14 +164,13 @@ pub async fn things_update_desired_state_value<S: Storage, N: Notifier, Si: Sink

let valid_until = valid_until.or_else(|| valid_for.map(|d| Utc::now() + d));

let mut values = BTreeMap::new();
values.insert(name, SetDesiredValue::WithOptions { value, valid_until });

service
.update(
&Id::new(application, thing),
DesiredStateValueUpdater {
name,
value,
valid_until,
},
DesiredStateValueUpdater(values),
&OPTS,
)
.await?;
Expand Down
75 changes: 64 additions & 11 deletions backend/src/notifier/actix.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
use super::{CLIENT_TIMEOUT, HEARTBEAT_INTERVAL};
use crate::notifier::{Request, Response};
use actix::{Actor, ActorContext, AsyncContext, Handler, SpawnHandle, StreamHandler, WrapFuture};
use crate::notifier::{Request, Response, SetDesiredValue};
use actix::{
Actor, ActorContext, AsyncContext, Handler, ResponseFuture, SpawnHandle, StreamHandler,
WrapFuture,
};
use actix_web_actors::ws::{self, CloseCode, CloseReason};
use drogue_doppelgaenger_core::{
listener::{KafkaSource, Message},
notifier::Notifier,
processor::sink::Sink,
processor::{self, sink::Sink, Event},
service::{Id, Service},
storage::Storage,
};
use futures::StreamExt;
use std::{collections::HashMap, sync::Arc, time::Instant};
use std::{collections::BTreeMap, collections::HashMap, fmt::Display, sync::Arc, time::Instant};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;

mod message {
use crate::notifier::Response;
use actix::Message;
use actix_web_actors::ws::CloseReason;
use drogue_doppelgaenger_core::processor::SetDesiredValue;
use std::collections::BTreeMap;

#[derive(Message)]
#[rtype(result = "()")]
Expand All @@ -29,6 +34,9 @@ mod message {
pub struct Event(pub Response);
#[derive(Message)]
#[rtype(result = "()")]
pub struct SetDesiredValues(pub String, pub BTreeMap<String, SetDesiredValue>);
#[derive(Message)]
#[rtype(result = "()")]
pub struct Close(pub Option<CloseReason>);
}

Expand Down Expand Up @@ -93,15 +101,41 @@ impl<S: Storage, N: Notifier, Si: Sink> WebSocketHandler<S, N, Si> {
}));
ctx.stop();
}
Ok(Request::SetDesiredValues { thing, values }) => match Self::convert_set(values) {
Ok(values) => {
ctx.address()
.do_send(message::SetDesiredValues(thing, values));
}
Err(err) => {
Self::close_err(ctx, err);
}
},
Err(err) => {
ctx.close(Some(CloseReason {
code: CloseCode::Protocol,
description: Some(err.to_string()),
}));
ctx.stop();
Self::close_err(ctx, err);
}
}
}

fn convert_set(
value: BTreeMap<String, SetDesiredValue>,
) -> anyhow::Result<BTreeMap<String, processor::SetDesiredValue>> {
value
.into_iter()
.map(|(key, value)| {
let value: processor::SetDesiredValue = value.try_into()?;
Ok((key, value))
})
.collect::<Result<BTreeMap<String, processor::SetDesiredValue>, _>>()
}

fn close_err<E: Display>(ctx: &mut ws::WebsocketContext<Self>, err: E) {
log::info!("Closing websocket due to: {err}");
ctx.close(Some(CloseReason {
code: CloseCode::Protocol,
description: Some(err.to_string()),
}));
ctx.stop();
}
}

impl<S: Storage, N: Notifier, Si: Sink> Actor for WebSocketHandler<S, N, Si> {
Expand Down Expand Up @@ -138,10 +172,11 @@ impl<S: Storage, N: Notifier, Si: Sink> StreamHandler<Result<ws::Message, ws::Pr
self.heartbeat = Instant::now();
}
Ok(ws::Message::Binary(data)) => {
self.handle_protocol_message(ctx, serde_json::from_slice::<super::Request>(&data));
self.handle_protocol_message(ctx, serde_json::from_slice(&data));
}
Ok(ws::Message::Text(data)) => {
self.handle_protocol_message(ctx, serde_json::from_str::<super::Request>(&data));
log::debug!("Message: {data}");
self.handle_protocol_message(ctx, serde_json::from_str(&data));
}
Ok(ws::Message::Close(reason)) => {
log::debug!("Client disconnected - reason: {:?}", reason);
Expand Down Expand Up @@ -251,6 +286,24 @@ impl<S: Storage, N: Notifier, Si: Sink> Handler<message::Unsubscribe>
}
}

impl<S: Storage, N: Notifier, Si: Sink> Handler<message::SetDesiredValues>
for WebSocketHandler<S, N, Si>
{
type Result = ResponseFuture<()>;

fn handle(&mut self, msg: message::SetDesiredValues, _ctx: &mut Self::Context) -> Self::Result {
let application = self.application.clone();
let sink = self.service.sink().clone();

Box::pin(async move {
let thing = msg.0;
let message = processor::Message::SetDesiredValue { values: msg.1 };

let _ = sink.publish(Event::new(application, thing, message)).await;
})
}
}

impl<S: Storage, N: Notifier, Si: Sink> Handler<message::Event> for WebSocketHandler<S, N, Si> {
type Result = Result<(), serde_json::Error>;

Expand Down
70 changes: 68 additions & 2 deletions backend/src/notifier/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,84 @@
pub mod actix;

use chrono::{DateTime, Utc};
use drogue_doppelgaenger_core::model::Thing;
use drogue_doppelgaenger_core::processor;
use serde_json::Value;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;

const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);

#[derive(Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
#[serde(untagged)]
pub enum SetDesiredValue {
#[serde(rename_all = "camelCase")]
WithOptions {
value: Value,
#[serde(default, skip_serializing_if = "Option::is_none")]
valid_until: Option<DateTime<Utc>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[serde(with = "humantime_serde")]
valid_for: Option<Duration>,
},
Value(Value),
}

#[derive(Debug, thiserror::Error)]
pub enum SetDesiredValueError {
#[error("Duration out of range: {0}")]
OutOfRange(#[from] time::OutOfRangeError),
#[error("Invalid combination: only one of valid for or until must be present")]
Invalid,
}

impl TryFrom<SetDesiredValue> for processor::SetDesiredValue {
type Error = SetDesiredValueError;

fn try_from(value: SetDesiredValue) -> Result<Self, Self::Error> {
Ok(match value {
SetDesiredValue::Value(value) => Self::Value(value),
SetDesiredValue::WithOptions {
value,
valid_until,
valid_for: None,
} => Self::WithOptions { value, valid_until },
SetDesiredValue::WithOptions {
value,
valid_until: None,
valid_for: Some(valid_for),
} => {
let valid_until = Utc::now() + chrono::Duration::from_std(valid_for)?;
Self::WithOptions {
value,
valid_until: Some(valid_until),
}
}
SetDesiredValue::WithOptions {
value,
valid_until: Some(_),
valid_for: Some(_),
} => return Err(SetDesiredValueError::Invalid),
})
}
}

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "type")]
pub enum Request {
Subscribe { thing: String },
Unsubscribe { thing: String },
Subscribe {
thing: String,
},
Unsubscribe {
thing: String,
},
SetDesiredValues {
thing: String,
values: BTreeMap<String, SetDesiredValue>,
},
}

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
Expand Down
6 changes: 6 additions & 0 deletions core/src/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ mod test {
last_attempt: Some(Utc.ymd(2022, 1, 1).and_hms(1, 2, 4)),
},
method: DesiredFeatureMethod::Manual,
mode: DesiredMode::Sync,
},
);
thing.desired_state.insert(
Expand All @@ -619,6 +620,7 @@ mod test {
when: Utc.ymd(2022, 1, 1).and_hms(1, 2, 4),
},
method: DesiredFeatureMethod::Manual,
mode: DesiredMode::Sync,
},
);
thing.desired_state.insert(
Expand All @@ -632,6 +634,7 @@ mod test {
reason: Some("The dog ate my command".to_string()),
},
method: DesiredFeatureMethod::Manual,
mode: DesiredMode::Sync,
},
);
thing.desired_state.insert(
Expand All @@ -644,6 +647,7 @@ mod test {
last_attempt: Some(Utc.ymd(2022, 1, 1).and_hms(1, 2, 4)),
},
method: DesiredFeatureMethod::Code(Code::JavaScript("true".to_string())),
mode: DesiredMode::Sync,
},
);
thing.desired_state.insert(
Expand All @@ -659,6 +663,7 @@ mod test {
channel: "set-feature".to_string(),
payload: DesiredFeatureCommandPayload::Raw,
},
mode: DesiredMode::Sync,
},
);
thing.desired_state.insert(
Expand All @@ -671,6 +676,7 @@ mod test {
last_attempt: Some(Utc.ymd(2022, 1, 1).and_hms(1, 2, 4)),
},
method: DesiredFeatureMethod::External,
mode: DesiredMode::Sync,
},
);
assert_eq!(
Expand Down
23 changes: 22 additions & 1 deletion core/src/processor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub mod sink;
pub mod source;

use crate::service::UpdateOptions;
use crate::service::{DesiredStateValueUpdater, DesiredStateValueUpdaterError, UpdateOptions};
use crate::{
model::{Thing, WakerReason},
notifier::Notifier,
Expand Down Expand Up @@ -58,6 +58,18 @@ impl Event {
}
}

#[derive(Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
#[serde(untagged)]
pub enum SetDesiredValue {
#[serde(rename_all = "camelCase")]
WithOptions {
value: Value,
#[serde(default, skip_serializing_if = "Option::is_none")]
valid_until: Option<DateTime<Utc>>,
},
Value(Value),
}

#[derive(Clone, Debug, PartialEq, serde::Deserialize, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub enum Message {
Expand All @@ -66,6 +78,10 @@ pub enum Message {
#[serde(default)]
partial: bool,
},
SetDesiredValue {
#[serde(default)]
values: BTreeMap<String, SetDesiredValue>,
},
Patch(Patch),
Merge(Value),
Wakeup {
Expand Down Expand Up @@ -259,6 +275,8 @@ pub enum MessageError {
Patch(#[from] PatchError),
#[error("Failed to apply JSON merge: {0}")]
Merge(#[from] MergeError),
#[error("Failed to apply desired value: {0}")]
SetDesiredValues(#[from] DesiredStateValueUpdaterError),
}

impl Updater for Message {
Expand All @@ -280,6 +298,9 @@ impl Updater for Message {
// don't do any real change, this will just reconcile and process what is necessary
Ok(thing)
}
Message::SetDesiredValue { values } => {
Ok(DesiredStateValueUpdater(values).update(thing)?)
}
}
}
}
4 changes: 4 additions & 0 deletions core/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ impl<St: Storage, No: Notifier, Si: Sink> Service<St, No, Si> {
}
}

pub fn sink(&self) -> &Si {
&self.sink
}

pub async fn create(&self, thing: Thing) -> Result<Thing, Error<St, No>> {
let Outcome {
mut new_thing,
Expand Down
Loading

0 comments on commit e3aa525

Please sign in to comment.