Skip to content

Commit

Permalink
feat: added desired state processing
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Aug 17, 2022
1 parent 13327aa commit c34286c
Show file tree
Hide file tree
Showing 19 changed files with 1,128 additions and 370 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
* [ ] RBAC
* [ ] Allow a way to modify the thing, overriding a non-empty outbox
* [ ] Implement WASM
* [ ] Ensure that reported state "last updated" changes when only the value changes (move logic to machine)
4 changes: 4 additions & 0 deletions backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ actix = "0.13"
actix-web = "4"
actix-web-actors = "4"
anyhow = "1"
chrono = "0.4"
futures = "0.3"
humantime = "2"
log = "0.4"
serde = { version = "1", features = ["derive", "rc"] }
serde_json = "1"
time = "0.1"
thiserror = "1"
tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1", features = ["sync"] }

Expand Down
68 changes: 65 additions & 3 deletions backend/src/endpoints.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
use crate::{notifier::actix::WebSocketHandler, Instance};
use crate::{
notifier::actix::WebSocketHandler,
utils::{to_datetime, to_duration},
Instance,
};
use actix_web::{web, HttpRequest, HttpResponse};
use actix_web_actors::ws;
use chrono::Utc;
use drogue_doppelgaenger_core::{
listener::KafkaSource,
model::{Reconciliation, SyntheticType, Thing},
notifier::Notifier,
processor::sink::Sink,
service::{
Id, JsonMergeUpdater, JsonPatchUpdater, Patch, ReportedStateUpdater, Service,
SyntheticStateUpdater, UpdateMode, UpdateOptions,
DesiredStateUpdate, DesiredStateUpdater, DesiredStateValueUpdater, Id, JsonMergeUpdater,
JsonPatchUpdater, Patch, ReportedStateUpdater, Service, SyntheticStateUpdater, UpdateMode,
UpdateOptions,
},
storage::Storage,
};
Expand Down Expand Up @@ -117,6 +123,62 @@ pub async fn things_update_synthetic_state<S: Storage, N: Notifier, Si: Sink>(
Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_update_desired_state<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
path: web::Path<(String, String, String)>,
payload: web::Json<DesiredStateUpdate>,
) -> Result<HttpResponse, actix_web::Error> {
let (application, thing, state) = path.into_inner();
let payload = payload.into_inner();

service
.update(
&Id::new(application, thing),
DesiredStateUpdater(state, payload),
&OPTS,
)
.await?;

Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_update_desired_state_value<S: Storage, N: Notifier, Si: Sink>(
request: HttpRequest,
service: web::Data<Service<S, N, Si>>,
path: web::Path<(String, String, String)>,
payload: web::Json<Value>,
) -> Result<HttpResponse, actix_web::Error> {
let (application, thing, name) = path.into_inner();
let value = payload.into_inner();

let valid_until = request
.headers()
.get("valid-until")
.map(to_datetime)
.transpose()?;
let valid_for = request
.headers()
.get("valid-for")
.map(to_duration)
.transpose()?;

let valid_until = valid_until.or_else(|| valid_for.map(|d| Utc::now() + d));

service
.update(
&Id::new(application, thing),
DesiredStateValueUpdater {
name,
value,
valid_until,
},
&OPTS,
)
.await?;

Ok(HttpResponse::NoContent().json(json!({})))
}

pub async fn things_update_reconciliation<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
path: web::Path<Id>,
Expand Down
17 changes: 17 additions & 0 deletions backend/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod endpoints;
mod notifier;
mod utils;

use actix_web::{guard, web, App, HttpServer};
use anyhow::anyhow;
Expand Down Expand Up @@ -86,6 +87,22 @@ pub fn configure<S: Storage, N: Notifier, Si: Sink>(
Si,
>)),
);
ctx.service(
web::resource(
"/api/v1alpha1/things/{application}/things/{thing}/desiredStates/{name}",
)
.route(web::put().to(endpoints::things_update_desired_state::<
S,
N,
Si,
>)),
);
ctx.service(
web::resource(
"/api/v1alpha1/things/{application}/things/{thing}/desiredStates/{name}/value",
)
.route(web::put().to(endpoints::things_update_desired_state_value::<S, N, Si>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}/reconciliations")
.route(web::put().to(endpoints::things_update_reconciliation::<S, N, Si>)),
Expand Down
37 changes: 37 additions & 0 deletions backend/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use actix_web::body::BoxBody;
use actix_web::http::header::{HeaderValue, ToStrError};
use actix_web::{HttpResponse, ResponseError};
use chrono::{DateTime, Duration, ParseError, Utc};
use drogue_doppelgaenger_core::error::ErrorInformation;
use humantime::DurationError;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Header Value: {0}")]
Value(#[from] ToStrError),
#[error("Parse: {0}")]
Parse(#[from] ParseError),
#[error("Out of range: {0}")]
OutOfRange(#[from] time::OutOfRangeError),
#[error("Duration: {0}")]
Duration(#[from] DurationError),
}

impl ResponseError for Error {
fn error_response(&self) -> HttpResponse<BoxBody> {
HttpResponse::BadRequest().json(ErrorInformation {
error: "InvalidFormat".to_string(),
message: Some(self.to_string()),
})
}
}

pub fn to_duration(value: &HeaderValue) -> Result<Duration, Error> {
Ok(Duration::from_std(humantime::parse_duration(
value.to_str()?,
)?)?)
}

pub fn to_datetime(value: &HeaderValue) -> Result<DateTime<Utc>, Error> {
Ok(DateTime::parse_from_rfc3339(value.to_str()?)?.into())
}
7 changes: 4 additions & 3 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,25 @@ humantime-serde = "1"
indexmap = "1.9"
json-patch = { version = "0.2", default-features = false }
jsonschema = "0.16"
lazy_static = "1"
log = "0.4"
opentelemetry = { version = "0.17", features = ["rt-tokio"] }
postgres-types = "0.2"
prometheus = { version = "0.13" }
rdkafka = { version = "0.28", features = ["sasl", "ssl"] }
rustls = "0.20"
rustls-native-certs = "0.6"
schemars = { version = "0.8", features = ["bytes", "chrono", "indexmap"] }
serde = "1"
serde = { version = "1", features = ["rc"] }
serde_json = "1"
thiserror = "1"
time = "0.1"
tokio = "1"
tokio-stream = { version = "0.1", features = ["sync"] }
tracing = "0.1"
tracing-opentelemetry = "0.17"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1", features = ["v4"] }
lazy_static = "1"
prometheus = { version = "0.13" }

opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"], optional = true }

Expand Down
Loading

0 comments on commit c34286c

Please sign in to comment.