Skip to content

Commit

Permalink
feat: implement core of wakeup stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Aug 17, 2022
1 parent c15e273 commit c6c59f3
Show file tree
Hide file tree
Showing 43 changed files with 2,114 additions and 376 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions backend/src/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ use crate::notifier::actix::WebSocketHandler;
use crate::Instance;
use actix_web::{web, HttpRequest, HttpResponse};
use actix_web_actors::ws;
use drogue_doppelgaenger_core::processor::source::Sink;
use drogue_doppelgaenger_core::service::JsonMergeUpdater;
use drogue_doppelgaenger_core::processor::sink::Sink;
use drogue_doppelgaenger_core::{
listener::KafkaSource,
model::{Reconciliation, Thing},
notifier::Notifier,
service::{Id, JsonPatchUpdater, Patch, ReportedStateUpdater, Service, UpdateMode},
service::{
Id, JsonMergeUpdater, JsonPatchUpdater, Patch, ReportedStateUpdater, Service, UpdateMode,
},
storage::Storage,
};
use serde_json::{json, Value};
Expand All @@ -18,7 +19,7 @@ pub async fn things_get<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
path: web::Path<Id>,
) -> Result<HttpResponse, actix_web::Error> {
let result = service.get(path.into_inner()).await?;
let result = service.get(&path.into_inner()).await?;

Ok(HttpResponse::Ok().json(result))
}
Expand Down Expand Up @@ -106,7 +107,7 @@ pub async fn things_delete<S: Storage, N: Notifier, Si: Sink>(
service: web::Data<Service<S, N, Si>>,
path: web::Path<Id>,
) -> Result<HttpResponse, actix_web::Error> {
service.delete(path.into_inner()).await?;
service.delete(&path.into_inner()).await?;

Ok(HttpResponse::NoContent().json(json!({})))
}
Expand Down
40 changes: 19 additions & 21 deletions backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,24 @@ mod notifier;

use actix_web::{guard, web, App, HttpServer};
use anyhow::anyhow;
use drogue_doppelgaenger_core::listener::KafkaSource;
use drogue_doppelgaenger_core::notifier::{kafka, Notifier};
use drogue_doppelgaenger_core::processor::source::{EventStream, Sink};
use drogue_doppelgaenger_core::service::Service;
use drogue_doppelgaenger_core::storage::postgres;
use drogue_doppelgaenger_core::{app::run_main, processor, service, storage::Storage};
use futures::future::LocalBoxFuture;
use futures::{FutureExt, TryFutureExt};
use drogue_doppelgaenger_core::{
app::run_main,
listener::KafkaSource,
notifier::{kafka, Notifier},
processor::sink::{self, Sink},
service::{self, Service},
storage::{postgres, Storage},
};
use futures::{future::LocalBoxFuture, FutureExt, TryFutureExt};

#[derive(Clone, Debug, serde::Deserialize)]
pub struct Config<S: Storage, N: Notifier> {
pub struct Config<S: Storage, N: Notifier, Si: Sink> {
pub application: Option<String>,
// serde(bound) required as S isn't serializable: https://github.com/serde-rs/serde/issues/1296
#[serde(bound = "")]
pub service: service::Config<S, N>,
pub service: service::Config<S, N, Si>,

pub listener: kafka::Config,

// FIXME: fix up sink configuration
pub sink: processor::source::kafka::Config,
}

#[derive(Clone, Debug)]
Expand All @@ -31,14 +29,12 @@ pub struct Instance {
}

pub fn configure<S: Storage, N: Notifier, Si: Sink>(
config: Config<S, N>,
config: Config<S, N, Si>,
) -> anyhow::Result<(
impl Fn(&mut web::ServiceConfig) + Send + Sync + Clone,
LocalBoxFuture<'static, anyhow::Result<()>>,
)> {
let (_, sink) = processor::source::kafka::EventStream::new(config.sink)?;

let service = Service::new(config.service, sink)?;
let service = Service::from_config(config.service)?;
let service = web::Data::new(service);

let (source, runner) = KafkaSource::new(config.listener)?;
Expand Down Expand Up @@ -77,11 +73,11 @@ pub fn configure<S: Storage, N: Notifier, Si: Sink>(
),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}/reportedState")
web::resource("/api/v1alpha1/things/{application}/things/{thing}/reportedStates")
.route(web::put().to(endpoints::things_update_reported_state::<S, N, Si>)),
);
ctx.service(
web::resource("/api/v1alpha1/things/{application}/things/{thing}/reconciliation")
web::resource("/api/v1alpha1/things/{application}/things/{thing}/reconciliations")
.route(web::put().to(endpoints::things_update_reconciliation::<S, N, Si>)),
);
ctx.service(
Expand All @@ -97,8 +93,10 @@ pub fn configure<S: Storage, N: Notifier, Si: Sink>(
))
}

pub async fn run(config: Config<postgres::Storage, kafka::Notifier>) -> anyhow::Result<()> {
let (configurator, runner) = configure::<_, _, processor::source::kafka::Sink>(config)?;
pub async fn run(
config: Config<postgres::Storage, kafka::Notifier, sink::kafka::Sink>,
) -> anyhow::Result<()> {
let (configurator, runner) = configure::<_, _, _>(config)?;

let http = HttpServer::new(move || App::new().configure(|ctx| configurator(ctx)))
.bind("[::]:8080")?
Expand Down
62 changes: 39 additions & 23 deletions backend/src/notifier/actix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use super::{CLIENT_TIMEOUT, HEARTBEAT_INTERVAL};
use crate::notifier::{Request, Response};
use actix::{Actor, ActorContext, AsyncContext, Handler, SpawnHandle, StreamHandler, WrapFuture};
use actix_web_actors::ws::{self, CloseCode, CloseReason};
use drogue_doppelgaenger_core::processor::source::Sink;
use drogue_doppelgaenger_core::{
listener::{KafkaSource, Message},
notifier::Notifier,
processor::sink::Sink,
service::{Id, Service},
storage::Storage,
};
Expand Down Expand Up @@ -174,37 +174,53 @@ impl<S: Storage, N: Notifier, Si: Sink> Handler<message::Subscribe> for WebSocke

let service = self.service.clone();

// subscribe first
let mut source = self.source.subscribe(id.clone());

let addr = ctx.address();
let i = id.clone();
let task = ctx.spawn(
async move {
// read the initial state
// FIXME: filter out "not found"
if let Ok(thing) = service.get(id).await {
let initial_generation = thing.metadata.generation;
// send initial
addr.do_send(message::Event(Response::Change {
thing: Arc::new(thing),
}));

while let Some(msg) = source.next().await {
match msg {
Ok(Message::Change(thing)) => {
if thing.metadata.generation > initial_generation {
// prevent initial duplicates
addr.do_send(message::Event(Response::Change { thing }))
} else {
log::info!("Suppressing duplicate generation change");
}
}
Err(BroadcastStreamRecvError::Lagged(lag)) => {
addr.do_send(message::Event(Response::Lag { lag }))
// now read the initial state
let initial_generation = match service.get(&id).await {
Ok(Some(thing)) => {
let initial_generation = thing.metadata.generation;
// send initial
addr.do_send(message::Event(Response::Initial {
thing: Arc::new(thing),
}));
initial_generation
}
Ok(None) => Some(0),
Err(err) => {
// stream closed
addr.do_send(message::Close(Some(CloseReason {
code: CloseCode::Abnormal,
description: Some("Failed to read initial state".to_string()),
})));

log::warn!("Failed to read initial state: {err}");
return;
}
};

// and run the loop
while let Some(msg) = source.next().await {
match msg {
Ok(Message::Change(thing)) => {
if thing.metadata.generation > initial_generation {
// prevent initial duplicates
addr.do_send(message::Event(Response::Change { thing }))
} else {
log::info!("Suppressing duplicate generation change");
}
}
Err(BroadcastStreamRecvError::Lagged(lag)) => {
addr.do_send(message::Event(Response::Lag { lag }))
}
}
log::warn!("Listener loop exited");
}
log::warn!("Listener loop exited");

// stream closed
addr.do_send(message::Close(Some(CloseReason {
Expand Down
1 change: 1 addition & 0 deletions backend/src/notifier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub enum Request {
#[serde(rename_all = "camelCase")]
#[serde(tag = "type")]
pub enum Response {
Initial { thing: Arc<Thing> },
Change { thing: Arc<Thing> },
Lag { lag: u64 },
}
2 changes: 2 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ config = "0.13"
dotenv = "0.15"
env_logger = "0.9"
futures = "0.3"
humantime = "2"
humantime-serde = "1"
json-patch = { version = "0.2", default-features = false }
jsonschema = "0.16"
log = "0.4"
opentelemetry = { version = "0.17", features = ["rt-tokio"] }
postgres-types = "0.2"
rdkafka = { version = "0.28", features = ["sasl", "ssl"] }
rustls = "0.20"
rustls-native-certs = "0.6"
Expand Down
3 changes: 1 addition & 2 deletions core/examples/gen_schema.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use schemars::gen::SchemaSettings;
use schemars::schema_for;

fn main() {
let schema = schemars::gen::SchemaGenerator::from(SchemaSettings::openapi3())
.into_root_schema_for::<drogue_doppelgaenger_common::model::Thing>();
.into_root_schema_for::<drogue_doppelgaenger_core::model::Thing>();
println!("{}", serde_yaml::to_string(&schema).unwrap());
}
2 changes: 1 addition & 1 deletion core/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use futures::future::LocalBoxFuture;
use futures::stream::FuturesUnordered;
use std::time::Duration;

#[cfg(feature = "jaeger")]
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
Expand Down Expand Up @@ -112,6 +111,7 @@ where
{
use futures::FutureExt;
use prometheus::{Encoder, TextEncoder};
use std::time::Duration;

futures.push(
async move {
Expand Down
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ pub mod processor;
pub mod service;
pub mod storage;
pub mod version;
pub mod waker;
8 changes: 4 additions & 4 deletions core/src/listener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ impl KafkaSource {

let topic = config.topic;

let config: rdkafka::ClientConfig = KafkaProperties(config.properties).into();
let mut config: rdkafka::ClientConfig = KafkaProperties(config.properties).into();

config.set("enable.partition.eof", "false");

let consumer: StreamConsumer = config.create().context("Creating consumer")?;

consumer.subscribe(&[&topic]).context("Start subscribe")?;
Expand All @@ -89,9 +92,6 @@ impl KafkaSource {
consumer,
inner: inner.clone(),
};
//let consumer = Self::run(consumer, inner.clone());

//let task = Handle::current().spawn(consumer);

Ok((Self { inner }, runner))
}
Expand Down
39 changes: 39 additions & 0 deletions core/src/machine/deno.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::Outgoing;
use crate::model::Thing;
use anyhow::bail;
use chrono::Duration;
use deno_core::{serde_v8, v8, Extension, JsRuntime, RuntimeOptions};
use serde_json::Value;
use std::sync::Arc;
Expand All @@ -16,6 +17,7 @@ const KEY_CURRENT_STATE: &str = "currentState";
const KEY_NEW_STATE: &str = "newState";
const KEY_OUTBOX: &str = "outbox";
const KEY_LOGS: &str = "logs";
const KEY_WAKER: &str = "waker";

/// Run a deno script
///
Expand Down Expand Up @@ -144,9 +146,46 @@ fn extract_context(runtime: &mut JsRuntime) -> anyhow::Result<Outgoing> {
}
};

let waker = {
let key = serde_v8::to_v8(&mut scope, KEY_WAKER)?;
match global.get(scope, key) {
Some(value) => to_duration(serde_v8::from_v8(scope, value)?)?,
None => None,
}
};

Ok(Outgoing {
new_thing,
outbox,
log,
waker,
})
}

/// convert a JavaScript value into a duration
fn to_duration(value: Value) -> anyhow::Result<Option<Duration>> {
Ok(match value {
Value::String(time) => {
let duration = humantime::parse_duration(&time)?;
Some(Duration::from_std(duration)?)
}
Value::Number(seconds) => {
if let Some(seconds) = seconds.as_i64() {
if seconds > 0 {
return Ok(Some(Duration::seconds(seconds)));
}
} else if let Some(_) = seconds.as_u64() {
// we can be sure it doesn't fit into an i64
return Ok(Some(Duration::seconds(i64::MAX)));
} else if let Some(seconds) = seconds.as_f64() {
if seconds > i64::MAX as f64 {
return Ok(Some(Duration::seconds(i64::MAX)));
} else if seconds > 0f64 {
return Ok(Some(Duration::seconds(seconds as i64)));
}
}
None
}
_ => None,
})
}
Loading

0 comments on commit c6c59f3

Please sign in to comment.